1#' Create a PSOCK Cluster of R Workers for Parallel Processing
2#'
3#' The `makeClusterPSOCK()` function creates a cluster of \R workers
4#' for parallel processing.  These \R workers may be background \R sessions
5#' on the current machine, \R sessions on external machines (local or remote),
6#' or a mix of such. For external workers, the default is to use SSH to connect
7#' to those external machines.  This function works similarly to
8#' \code{\link[parallel:makeCluster]{makePSOCKcluster}()} of the
#' \pkg{parallel} package, but provides additional and more flexible options
10#' for controlling the setup of the system calls that launch the background
11#' \R workers, and how to connect to external machines.
12#'
13#' @param workers The hostnames of workers (as a character vector) or the number
14#' of localhost workers (as a positive integer).
15#'
16#' @param makeNode A function that creates a `"SOCKnode"` or
17#' `"SOCK0node"` object, which represents a connection to a worker.
18#'
19#' @param port The port number of the master used for communicating with all
20#' the workers (via socket connections).  If an integer vector of ports, then a
#' random one among those is chosen.  If `"random"`, then a random port
#' is chosen from `11000:11999`, or from the range specified by
23#' environment variable \env{R_PARALLELLY_RANDOM_PORTS}.
24#' If `"auto"` (default), then the default (single) port is taken from
25#' environment variable \env{R_PARALLEL_PORT}, otherwise `"random"` is
26#' used.
27#' _Note, do not use this argument to specify the port number used by
28#' `rshcmd`, which typically is an SSH client.  Instead, if the SSH daemon
29#' runs on a different port than the default 22, specify the SSH port by
30#' appending it to the hostname, e.g. `"remote.server.org:2200"` or via
31#' SSH options `-p`, e.g. `rshopts = c("-p", "2200")`._
32#'
33#' @param \dots Optional arguments passed to
34#' `makeNode(workers[i], ..., rank = i)` where `i = seq_along(workers)`.
35#'
36#' @param autoStop If TRUE, the cluster will be automatically stopped
37#' using \code{\link[parallel:makeCluster]{stopCluster}()} when it is
38#' garbage collected, unless already stopped.  See also [autoStopCluster()].
39#'
40#' @param tries,delay Maximum number of attempts done to launch each node
41#' with `makeNode()` and the delay (in seconds) in-between attempts.
42#' If argument `port` specifies more than one port, e.g. `port = "random"`
43#' then a random port will be drawn and validated at most `tries` times.
#' Arguments `tries` and `delay` are used only when `setup_strategy == "sequential"`.
45#'
46#' @param validate If TRUE (default), after the nodes have been created, they are all
47#' validated that they work by inquiring about their session information,
48#' which is saved in attribute `session_info` of each node.
49#'
50#' @param verbose If TRUE, informative messages are outputted.
51#'
52#' @return An object of class `c("RichSOCKcluster", "SOCKcluster", "cluster")`
53#' consisting of a list of `"SOCKnode"` or `"SOCK0node"` workers (that also
54#' inherit from `RichSOCKnode`).
55#'
56#' @example incl/makeClusterPSOCK.R
57#'
58#' @importFrom parallel stopCluster
59#' @importFrom utils packageVersion
60#' @export
makeClusterPSOCK <- function(workers, makeNode = makeNodePSOCK,
                             port = c("auto", "random"), ...,
                             autoStop = FALSE,
                             tries = getOption2("parallelly.makeNodePSOCK.tries", 3L),
                             delay = getOption2("parallelly.makeNodePSOCK.tries.delay", 15.0),
                             validate = getOption2("parallelly.makeNodePSOCK.validate", TRUE),
                             verbose = getOption2("parallelly.debug", FALSE)) {
  ## Outline:
  ##  1. Validate the arguments and pick the port
  ##  2. Collect the setup options of each node via
  ##     makeNode(..., action = "options")
  ##  3. Settle on one common setup strategy ("parallel" or "sequential")
  ##  4. Launch the nodes and connect to them
  ##  5. Optionally validate the nodes and enable automatic stopping
  ## If an error occurs along the way, the nodes created so far are stopped.

  localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost")

  ## A numeric 'workers' is the number of localhost workers; expand it into
  ## a character vector with one hostname per worker
  if (is.numeric(workers)) {
    if (length(workers) != 1L) {
      stopf("When numeric, argument 'workers' must be a single value: %s", length(workers))
    }
    workers <- as.integer(workers)
    if (is.na(workers) || workers < 1L) {
      stopf("Number of 'workers' must be one or greater: %s", workers)
    }
    workers <- rep(localhostHostname, times = workers)
  }

  tries <- as.integer(tries)
  stop_if_not(length(tries) == 1L, is.integer(tries), !is.na(tries), tries >= 1L)

  delay <- as.numeric(delay)
  stop_if_not(length(delay) == 1L, is.numeric(delay), !is.na(delay), delay >= 0)

  validate <- as.logical(validate)
  stop_if_not(length(validate) == 1L, is.logical(validate), !is.na(validate))

  ## If we are sure that each node requires a connection, then ...
  if (identical(makeNode, makeNodePSOCK)) {
    ## ... can we create that many workers?
    free <- freeConnections()
    ## One connection is held back when the nodes are to be validated
    if (validate) free <- free - 1L
    if (length(workers) > free) {
      stopf("Cannot create %d parallel PSOCK nodes. Each node needs one connection but there are only %d connections left out of the maximum %d available on this R installation", length(workers), free, availableConnections())
    }
  }

  verbose_prefix <- "[local output] "

  if (verbose) {
    mdebugf("%sWorkers: [n = %d] %s", verbose_prefix,
                    length(workers), hpaste(sQuote(workers)))
  }

  if (length(port) == 0L) {
    stop("Argument 'port' must be of length one or more: 0")
  }
  port <- freePort(port)
  if (verbose) mdebugf("%sBase port: %d", verbose_prefix, port)


  ## Collect the setup options of each node without launching anything yet
  n <- length(workers)
  nodeOptions <- vector("list", length = n)
  if (verbose) mdebugf("%sGetting setup options for %d cluster nodes ...", verbose_prefix, n)
  for (ii in seq_len(n)) {
    if (verbose) mdebugf("%s - Node %d of %d ...", verbose_prefix, ii, n)
    options <- makeNode(workers[[ii]], port = port, ..., rank = ii, action = "options", verbose = verbose)
    stop_if_not(inherits(options, "makeNodePSOCKOptions"))
    nodeOptions[[ii]] <- options
  }
  if (verbose) mdebugf("%sGetting setup options for %d cluster nodes ... done", verbose_prefix, n)

  ## Returns the setup strategy of each node as a character vector, where
  ## nodes that do not specify one default to "sequential"
  getSetupStrategies <- function(nodeOptions) {
    values <- lapply(nodeOptions, FUN = function(options) {
      value <- options$setup_strategy
      if (is.null(value)) value <- "sequential"
      stop_if_not(is.character(value), length(value) == 1L)
      value
    })
    unlist(values, use.names = FALSE)
  }

  ## Is a 'parallel' setup strategy requested and possible?
  setup_strategy <- getSetupStrategies(nodeOptions)
  is_parallel <- (setup_strategy == "parallel")
  force_sequential <- FALSE
  if (any(is_parallel)) {
    if (verbose) mdebugf("%s - Parallel setup requested for some PSOCK nodes", verbose_prefix)

    if (!all(is_parallel)) {
      if (verbose) mdebugf("%s - Parallel setup requested only for some PSOCK nodes; will revert to a sequential setup for all", verbose_prefix)
      force_sequential <- TRUE
    } else {
      ## Force setup_strategy = "sequential"?
      affected <- affected_by_bug18119()
      if (!is.na(affected) && affected) {
        if (verbose) mdebugf("%s - Parallel setup requested but not supported on this version of R: %s", verbose_prefix, getRversion())
        force_sequential <- TRUE
      }
    }
  }

  if (force_sequential) {
    ## Force all nodes to be setup using the 'sequential' setup strategy
    setup_strategy <- "sequential"

    ## Regenerate the options of the nodes that asked for a parallel setup
    for (ii in which(is_parallel)) {
      if (verbose) mdebugf("%s - Node %d of %d ...", verbose_prefix, ii, n)
      args <- list(workers[[ii]], port = port, ..., rank = ii, action = "options", verbose = verbose)
      args$setup_strategy <- "sequential"
      options <- do.call(makeNode, args = args)
      stop_if_not(inherits(options, "makeNodePSOCKOptions"))
      nodeOptions[[ii]] <- options
    }
  }

  ## Sanity check: all nodes must now agree on one setup strategy
  setup_strategy <- unique(getSetupStrategies(nodeOptions))
  stop_if_not(length(setup_strategy) == 1L)

  cl <- vector("list", length = length(nodeOptions))
  class(cl) <- c("RichSOCKcluster", "SOCKcluster", "cluster")

  ## If an error occurred, make sure to clean up before exiting, i.e.
  ## stop each node
  on.exit({
    nodes <- vapply(cl, FUN = inherits, c("SOCKnode", "SOCK0node"),
                        FUN.VALUE = FALSE)
    stopCluster(cl[nodes])
    cl <- NULL
  })

  if (setup_strategy == "parallel") {
    ## To please R CMD check on R (< 4.0.0); the dummy assignments are
    ## never reached because of the error signaled just before them
    if (getRversion() < "4.0.0") {
      stopf("Parallel setup of PSOCK cluster nodes is not supported in R %s", getRversion())
      socketAccept <- serverSocket <- function(...) NULL
    }

    sendCall <- importParallel("sendCall")
    recvResult <- importParallel("recvResult")

    ## AD HOC: Use (port, timeout, useXDR) from the options of the first node
    options <- nodeOptions[[1]]
    if (verbose) {
      mdebugf("%sSetting up PSOCK nodes in parallel", verbose_prefix)
      mstr(options)
    }
    port <- options[["port"]]
    connectTimeout <- options[["connectTimeout"]]
    timeout <- options[["timeout"]]
    useXDR <- options[["useXDR"]]
    nodeClass <- c("RichSOCKnode", if(useXDR) "SOCKnode" else "SOCK0node")
    cmd <- options[["cmd"]]

    if (verbose) {
      mdebugf("%sSystem call to launch all workers:", verbose_prefix)
      mdebugf("%s%s", verbose_prefix, cmd)
    }

    ## FIXME: Add argument, option, environment variable for this

    ## Start listening and start workers.
    if (verbose) mdebugf("%sStarting PSOCK main server", verbose_prefix)
    socket <- serverSocket(port = port)
    on.exit(if (!is.null(socket)) close(socket), add = TRUE)

    if (.Platform$OS.type == "windows") {
      for (ii in seq_along(cl)) {
        ## See parallel::newPSOCKnode() for the input = ""
        system(cmd, wait = FALSE, input = "")
      }
    } else {
      ## Asynchronous lists are defined by POSIX
      cmd <- paste(rep(cmd, times = length(cl)), collapse = " & ")
      system(cmd, wait = FALSE)
    }

    if (verbose) mdebugf("%sWorkers launched", verbose_prefix)

    ## Accept connections and send the first command as initial
    ## handshake.  The handshake makes TCP synchronization detect and
    ## err on half-opened connections, which arise during parallel setup
    ## of client-server connections (due to internal timeouts, limited
    ## length of the listen backlog queue, race in timing out on
    ## creating a connection and probably more).
    ##
    ## The handshake looks like a regular server command followed by
    ## client response, which is compatible with older versions of R.
    ready <- 0L
    pending <- list()
    on.exit({
      lapply(pending, FUN = function(x) close(x$con))
      cl <- NULL
    }, add = TRUE)

    if (verbose) mdebugf("%sWaiting for workers to connect back", verbose_prefix)

    t0 <- Sys.time()
    while (ready < length(cl)) {
      if (verbose) mdebugf("%s%d workers out of %d ready", verbose_prefix, ready, length(cl))

      cons <- lapply(pending, FUN = function(x) x$con)

      if (difftime(Sys.time(), t0, units="secs") > connectTimeout + 5) {
          ## The workers will give up after connectTimeout, so there is
          ## no point waiting for them much longer.
          failed <- length(cl) - ready
          ## The message templates contain '%d' placeholders, so they
          ## must be formatted sprintf-style; a plain stop() would only
          ## paste the numbers onto the unformatted template
          stopf(ngettext(failed,
                "Cluster setup failed. %d worker of %d failed to connect.",
                "Cluster setup failed. %d of %d workers failed to connect."),
                failed, length(cl))
      }
      ## First element is for the server socket (a new worker is
      ## connecting), the remaining ones for the pending handshakes
      a <- socketSelect(append(list(socket), cons), write = FALSE, timeout = connectTimeout)
      canAccept <- a[1]
      canReceive <- seq_along(pending)[a[-1]]

      if (canAccept) {
        con <- socketAccept(socket = socket, blocking = TRUE, open = "a+b", timeout = timeout)
        scon <- structure(list(con = con, host = localhostHostname, rank = ready), class = nodeClass)
        res <- tryCatch({
          sendCall(scon, eval, list(quote(Sys.getpid())))
        }, error = identity)
        pending <- append(pending, list(scon))
      }

      for (scon in pending[canReceive]) {
        pid <- tryCatch({
          recvResult(scon)
        }, error = identity)
        ## A worker is ready once it has answered the handshake with
        ## an integer; anything else is dropped
        if (is.integer(pid)) {
          ready <- ready + 1L
          cl[[ready]] <- scon
        } else {
          close(scon$con)
        }
      }
      if (length(canReceive) > 0L) pending <- pending[-canReceive]
    } ## while()
  } else if (setup_strategy == "sequential") {
    retryPort <- getOption2("parallelly.makeNodePSOCK.tries.port", "same")
    for (ii in seq_along(cl)) {
      if (verbose) {
        mdebugf("%sCreating node %d of %d ...", verbose_prefix, ii, n)
        mdebugf("%s- setting up node", verbose_prefix)
      }

      options <- nodeOptions[[ii]]

      for (kk in seq_len(tries)) {
        if (verbose) {
          mdebugf("%s- attempt #%d of %d", verbose_prefix, kk, tries)
        }
        node <- tryCatch({
          makeNode(options, verbose = verbose)
        }, error = identity)
        ## Success or an error that is not a connection error?
        if (!inherits(node, "PSOCKConnectionError")) break

        if (kk < tries) {
          if (verbose) message(conditionMessage(node))

          ## Retry with another port?  This must be done regardless of
          ## 'verbose'.
          ## NOTE(review): confirm that makeNode() honors a modified
          ## 'options$port', e.g. that the launch command stored in the
          ## options is not already bound to the previous port.
          if (retryPort == "next") {
            ## Step to the next port, but never beyond the largest valid one
            options$port <- min(options$port + 1L, 65535L)
          } else if (retryPort == "available") {
            options$port <- freePort()
          }

          if (verbose) {
            mdebugf("%s- waiting %g seconds before trying again",
                    verbose_prefix, delay)
          }
          Sys.sleep(delay)
        }
      }
      if (inherits(node, "error")) {
        ex <- node
        if (inherits(node, "PSOCKConnectionError")) {
          if (verbose) {
            mdebugf("%s  Failed %d attempts with %g seconds delay",
                    verbose_prefix, tries, delay)
          }
          ex$message <- sprintf("%s\n * Number of attempts: %d (%gs delay)",
                                conditionMessage(ex), tries, delay)
        } else {
          ## Report the error as coming from this function
          ex$call <- sys.call()
        }
        stop(ex)
      }
      cl[[ii]] <- node

      if (verbose) {
        mdebugf("%sCreating node %d of %d ... done", verbose_prefix, ii, n)
      }
    }
  }

  ## Cleanup; this function assigns 'socket' only in the parallel branch
  ## above, hence the try()
  try(close(socket), silent = TRUE)
  socket <- NULL

  if (validate) {
    ## Attaching session information for each worker.  This is done to assert
    ## that we have a working cluster already here.  It will also collect
    ## useful information otherwise not available, e.g. the PID.
    if (verbose) {
      mdebugf("%s- collecting session information", verbose_prefix)
    }
    for (ii in seq_along(cl)) {
      cl[ii] <- add_cluster_session_info(cl[ii])
    }
  }

  if (autoStop) cl <- autoStopCluster(cl)

  ## Success, remove automatic cleanup of nodes
  on.exit()

  cl
} ## makeClusterPSOCK()
369
370
371#' @param worker The hostname or IP number of the machine where the worker
372#' should run.
373#'
374#' @param master The hostname or IP number of the master / calling machine, as
375#' known to the workers.  If NULL (default), then the default is
376#' `Sys.info()[["nodename"]]` unless `worker` is _localhost_ or
377#' `revtunnel = TRUE` in case it is `"localhost"`.
378#'
379#' @param connectTimeout The maximum time (in seconds) allowed for each socket
380#' connection between the master and a worker to be established (defaults to
381#' 2 minutes). _See note below on current lack of support on Linux and
382#' macOS systems._
383#'
384#' @param timeout The maximum time (in seconds) allowed to pass without the
#' master and a worker communicating with each other (defaults to 30 days).
386#'
387#' @param rscript,homogeneous The system command for launching \command{Rscript}
388#' on the worker and whether it is installed in the same path as the calling
389#' machine or not.  For more details, see below.
390#'
391#' @param rscript_args Additional arguments to \command{Rscript} (as a character
392#' vector).  This argument can be used to customize the \R environment of the
#' workers before they launch.
394#' For instance, use `rscript_args = c("-e", shQuote('setwd("/path/to")'))`
395#' to set the working directory to \file{/path/to} on _all_ workers.
396#'
#' @param rscript_envs A named character vector of environment variables to
398#' set or unset on worker at startup, e.g.
399#' `rscript_envs = c(FOO = "3.14", "HOME", "UNKNOWN", UNSETME = NA_character_)`.
400#' If an element is not named, then the value of that variable will be used as
401#' the name and the value will be the value of `Sys.getenv()` for that
402#' variable.  Non-existing environment variables will be dropped.
403#' These variables are set using `Sys.setenv()`.
#' A named element with value `NA_character_` will cause that variable to be
405#' unset, which is done via `Sys.unsetenv()`.
406#'
407#' @param rscript_libs A character vector of \R library paths that will be
408#' used for the library search path of the \R workers.  An asterisk
409#' (`"*"`) will be resolved to the default `.libPaths()` _on the
410#' worker_. That is, to `prepend` a folder, instead of replacing the
411#' existing ones, use `rscript_libs = c("new_folder", "*")`.
412#' To pass down a non-default library path currently set _on the main \R
413#' session_ to the workers, use `rscript_libs = .libPaths()`.
414#'
415#' @param rscript_startup An \R expression or a character vector of \R code,
416#' or a list with a mix of these, that will be evaluated on the \R worker
417#' prior to launching the worker's event loop.
418#' For instance, use `rscript_startup = 'setwd("/path/to")'`
419#' to set the working directory to \file{/path/to} on _all_ workers.
420#'
421#' @param rscript_sh The type of shell used where `rscript` is launched,
#' which should be `"sh"` if launched via a POSIX shell and `"cmd"` if
423#' launched via an MS Windows shell.  This controls how shell command-line
424#' options are quoted, via
425#' \code{\link[base:shQuote]{shQuote(..., type = rscript_sh)}}.
426#' If `"auto"` (default), and the cluster node is launched locally, then it
427#' is set to `"sh"` or `"cmd"` according to the current platform.  If launched
428#' remotely, then it is set to `"sh"` based on the assumption remote machines
429#' typically launch commands via SSH in a POSIX shell.
430#'
431#' @param default_packages A character vector or NULL that controls which R
432#' packages are attached on each cluster node during startup.  An asterisk
433#' (`"*"`) resolves to `getOption("defaultPackages")` _on the current machine_.
#' If NULL, then the default set of R packages is attached.
435#'
436#' @param methods If TRUE (default), then the \pkg{methods} package is also
#' loaded. This argument exists for legacy reasons due to how
438#' \command{Rscript} worked in R (< 3.5.0).
439#'
#' @param useXDR If FALSE (default), the communication between master and workers, which is binary, will use little-endian (faster), otherwise big-endian ("XDR"; slower).
441#'
442#' @param socketOptions A character string that sets \R option
443#' \option{socketOptions} on the worker.
444#'
445#' @param outfile Where to direct the \link[base:showConnections]{stdout} and
446#' \link[base:showConnections]{stderr} connection output from the workers.
447#' If NULL, then no redirection of output is done, which means that the
448#' output is relayed in the terminal on the local computer.  On Windows, the
449#' output is only relayed when running \R from a terminal but not from a GUI.
450#'
451#' @param renice A numerical 'niceness' (priority) to set for the worker
452#' processes.
453#'
454#' @param rank A unique one-based index for each worker (automatically set).
455#'
456#' @param rshcmd,rshopts The command (character vector) to be run on the master
457#' to launch a process on another host and any additional arguments (character
#' vector).  These arguments are only applied if `worker` is not
459#' _localhost_.  For more details, see below.
460#'
461#' @param rshlogfile (optional) If a filename, the output produced by the
#' `rshcmd` call is logged to this file, or if TRUE, then it is logged
463#' to a temporary file.  The log file name is available as an attribute
464#' as part of the return node object.
465#' _Warning: This only works with SSH clients that support option
466#' `-E out.log`_.  For example, PuTTY's \command{plink} does _not_ support
467#' this option, and any attempts to specify `rshlogfile` will cause the SSH
468#' connection to fail.
469#'
470#' @param user (optional) The user name to be used when communicating with
471#' another host.
472#'
473#' @param revtunnel If TRUE, a reverse SSH tunnel is set up for each worker such
474#' that the worker \R process sets up a socket connection to its local port
475#' `(port - rank + 1)` which then reaches the master on port `port`.
476#' If FALSE, then the worker will try to connect directly to port `port` on
477#' `master`.  For more details, see below.
478#'
479#' @param manual If TRUE the workers will need to be run manually. The command
480#' to run will be displayed.
481#'
482#' @param dryrun If TRUE, nothing is set up, but a message suggesting how to
483#' launch the worker from the terminal is outputted.  This is useful for
484#' troubleshooting.
485#'
486#' @param quiet If TRUE, then no output will be produced other than that from
487#' using `verbose = TRUE`.
488#'
#' @param setup_strategy If `"parallel"` (default), the workers are set up
#' concurrently.  If `"sequential"`, they are set up one after the other.
#' If only some of the workers request `"parallel"`, all are set up sequentially.
492#'
493#' @param action This is an internal argument.
494#'
495#' @return `makeNodePSOCK()` returns a `"SOCKnode"` or
496#' `"SOCK0node"` object representing an established connection to a worker.
497#'
498#' @section Definition of _localhost_:
499#' A hostname is considered to be _localhost_ if it equals:
500#' \itemize{
501#'   \item `"localhost"`,
502#'   \item `"127.0.0.1"`, or
503#'   \item `Sys.info()[["nodename"]]`.
504#' }
505#' It is also considered _localhost_ if it appears on the same line
506#' as the value of `Sys.info()[["nodename"]]` in file \file{/etc/hosts}.
507#'
508#' @section Default SSH client and options (arguments `rshcmd` and `rshopts`):
509#' Arguments `rshcmd` and `rshopts` are only used when connecting
510#' to an external host.
511#'
512#' The default method for connecting to an external host is via SSH and the
513#' system executable for this is given by argument `rshcmd`.  The default
514#' is given by option \option{parallelly.makeNodePSOCK.rshcmd}.  If that is not
515#' set, then the default is to use \command{ssh} on Unix-like systems,
516#' including macOS as well as Windows 10.  On older MS Windows versions, which
#' do not have a built-in \command{ssh} client, the default is to use
518#' (i) \command{plink} from the \href{https://www.putty.org/}{\command{PuTTY}}
519#' project, and then (ii) the \command{ssh} client that is distributed with
520#' RStudio.
521#'
522#' PuTTY puts itself on Windows' system \env{PATH} when installed, meaning this
523#' function will find PuTTY automatically if installed.  If not, to manually
#' set PuTTY as the SSH client, specify the absolute pathname of
525#' \file{plink.exe} in the first element and option \command{-ssh} in the
526#' second as in `rshcmd = c("C:/Path/PuTTY/plink.exe", "-ssh")`.
527#' This is because all elements of `rshcmd` are individually "shell"
528#' quoted and element `rshcmd[1]` must be on the system \env{PATH}.
529#'
530#' Furthermore, when running \R from RStudio on Windows, the \command{ssh}
531#' client that is distributed with RStudio will also be considered.
532#' This client, which is from \href{https://osdn.net/projects/mingw/}{MinGW}
533#' MSYS, is searched for in the folder given by the \env{RSTUDIO_MSYS_SSH}
534#' environment variable - a variable that is (only) set when running RStudio.
535#' To use this SSH client outside of RStudio, set \env{RSTUDIO_MSYS_SSH}
536#' accordingly.
537#'
538#' You can override the default set of SSH clients that are searched for
539#' by specifying them in argument `rshcmd` or via option
540#' \option{parallelly.makeNodePSOCK.rshcmd} using the format `<...>`, e.g.
541#' `rshcmd = c("<rstudio-ssh>", "<putty-plink>", "<ssh>")`.  See
542#' below for examples.
543#'
544#' If no SSH-client is found, an informative error message is produced.
545#'
546#' Additional SSH options may be specified via argument `rshopts`, which
547#' defaults to option \option{parallelly.makeNodePSOCK.rshopts}. For instance, a
548#' private SSH key can be provided as
549#' `rshopts = c("-i", "~/.ssh/my_private_key")`.  PuTTY users should
550#' specify a PuTTY PPK file, e.g.
551#' `rshopts = c("-i", "C:/Users/joe/.ssh/my_keys.ppk")`.
552#' Contrary to `rshcmd`, elements of `rshopts` are not quoted.
553#'
#' @section Accessing external machines that prompt for a password:
#' _IMPORTANT: With one exception, it is not possible for these
#' functions to log in and launch \R workers on external machines that require
557#' a password to be entered manually for authentication._
558#' The only known exception is the PuTTY client on Windows for which one can
559#' pass the password via command-line option `-pw`, e.g.
560#' `rshopts = c("-pw", "MySecretPassword")`.
561#'
562#' Note, depending on whether you run \R in a terminal or via a GUI, you might
563#' not even see the password prompt.  It is also likely that you cannot enter
564#' a password, because the connection is set up via a background system call.
565#'
566#' The poor man's workaround for setup that requires a password is to manually
#' log into each of the external machines and launch the \R workers by hand.
568#' For this approach, use `manual = TRUE` and follow the instructions
569#' which include cut'n'pasteable commands on how to launch the worker from the
570#' external machine.
571#'
572#' However, a much more convenient and less tedious method is to set up
573#' key-based SSH authentication between your local machine and the external
#' machine(s), as explained below.
575#'
576#' @section Accessing external machines via key-based SSH authentication:
577#' The best approach to automatically launch \R workers on external machines
578#' over SSH is to set up key-based SSH authentication.  This will allow you
#' to log into the external machine without having to enter a password.
580#'
581#' Key-based SSH authentication is taken care of by the SSH client and not \R.
582#' To configure this, see the manuals of your SSH client or search the web
583#' for "ssh key authentication".
584#'
585#' @section Reverse SSH tunneling:
586#' The default is to use reverse SSH tunneling (`revtunnel = TRUE`) for
587#' workers running on other machines.  This avoids the complication of
588#' otherwise having to configure port forwarding in firewalls, which often
589#' requires static IP address as well as privileges to edit the firewall
590#' on your outgoing router, something most users don't have.
591#' It also has the advantage of not having to know the internal and / or the
592#' public IP address / hostname of the master.
593#' Yet another advantage is that there will be no need for a DNS lookup by the
594#' worker machines to the master, which may not be configured or is disabled
595#' on some systems, e.g. compute clusters.
596#'
597#' @section Argument `rscript`:
598#' If `homogeneous` is FALSE, the `rscript` defaults to `"Rscript"`, i.e. it
599#' is assumed that the \command{Rscript} executable is available on the
600#' \env{PATH} of the worker.
601#' If `homogeneous` is TRUE, the `rscript` defaults to
602#' `file.path(R.home("bin"), "Rscript")`, i.e. it is basically assumed that
603#' the worker and the caller share the same file system and \R installation.
604#'
605#' When specified, argument `rscript` should be a character vector with one or
606#' more elements.  Any asterisk (`"*"`) will be resolved to the above default
607#' `homogeneous`-dependent `Rscript` path.
608#' All elements are automatically shell quoted using [base::shQuote()], except
609#' those that are of format `<ENVVAR>=<VALUE>`, that is, the ones matching the
610#' regular expression '\samp{^[[:alpha:]_][[:alnum:]_]*=.*}'.
611#' Another exception is when `rscript` inherits from 'AsIs'.
612#'
613#' @section Default value of argument `homogeneous`:
614#' The default value of `homogeneous` is TRUE if and only if either
615#' of the following is fulfilled:
616#' \itemize{
617#'  \item `worker` is _localhost_
618#'  \item `revtunnel` is FALSE and `master` is _localhost_
619#'  \item `worker` is neither an IP number nor a fully qualified domain
620#'        name (FQDN).  A hostname is considered to be a FQDN if it contains
621#'        one or more periods
622#' }
623#' In all other cases, `homogeneous` defaults to FALSE.
624#'
625#' @section Connection time out:
#' Argument `connectTimeout` does _not_ work properly on Unix and
#' macOS due to a limitation in \R itself.  For more details on this, please see
628#' R-devel thread 'BUG?: On Linux setTimeLimit() fails to propagate timeout
629#' error when it occurs (works on Windows)' on 2016-10-26
630#' (\url{https://stat.ethz.ch/pipermail/r-devel/2016-October/073309.html}).
631#' When used, the timeout will eventually trigger an error, but it won't happen
632#' until the socket connection timeout `timeout` itself happens.
633#'
634#' @section Communication time out:
635#' If there is no communication between the master and a worker within the
636#' `timeout` limit, then the corresponding socket connection will be
637#' closed automatically.  This will eventually result in an error in code
638#' trying to access the connection.
639#'
640#' @section Failing to set up local workers:
641#' When setting up a cluster of localhost workers, that is, workers running
642#' on the same machine as the master \R process, occasionally a connection
643#' to a worker ("cluster node") may fail to be set up.
644#' When this occurs, an informative error message with troubleshooting
645#' suggestions will be produced.
646#' The most common reason for such localhost failures is due to port
647#' clashes.  Retrying will often resolve the problem.
648#'
649#' @section Failing to set up remote workers:
650#' A cluster of remote workers runs \R processes on external machines. These
651#' external \R processes are launched over, typically, SSH to the remote
652#' machine.  For this to work, each of the remote machines needs to have
653#' \R installed, which preferably is of the same version as what is on the
#' main machine.  It is also required that one can SSH to the
655#' remote machines.  Ideally, the SSH connections use authentication based
656#' on public-private SSH keys such that the set up of the remote workers can
657#' be fully automated (see above).  If `makeClusterPSOCK()` fails to set
658#' up one or more remote \R workers, then an informative error message is
659#' produced.
660#' There are a few reasons for failing to set up remote workers.  If this
661#' happens, start by asserting that you can SSH to the remote machine and
662#' launch \file{Rscript} by calling something like:
663#' \preformatted{
664#' {local}$ ssh -l alice remote.server.org
665#' {remote}$ Rscript --version
666#' R scripting front-end version 3.6.1 (2019-07-05)
667#' {remote}$ logout
668#' {local}$
669#' }
670#' When you have confirmed the above to work, then confirm that you can achieve
671#' the same in a single command-line call;
672#' \preformatted{
673#' {local}$ ssh -l alice remote.server.org Rscript --version
674#' R scripting front-end version 3.6.1 (2019-07-05)
675#' {local}$
676#' }
677#' The latter will assert that you have proper startup configuration also for
678#' _non-interactive_ shell sessions on the remote machine.
679#'
#' Another reason for failing to set up remote workers could be that they are
681#' running an \R version that is not compatible with the version that your main
682#' \R session is running.  For instance, if we run R (>= 3.6.0) locally and the
683#' workers run R (< 3.5.0), we will get:
684#' `Error in unserialize(node$con) : error reading from connection`.
685#' This is because R (>= 3.6.0) uses serialization format version 3 by default
686#' whereas R (< 3.5.0) only supports version 2.  We can see the version of the
687#' \R workers by adding `rscript_args = c("-e", shQuote("getRversion()"))` when
688#' calling `makeClusterPSOCK()`.
689#'
690#' @rdname makeClusterPSOCK
691#' @importFrom tools pskill
692#' @importFrom utils flush.console
693#' @export
makeNodePSOCK <- function(
    worker = getOption2("parallelly.localhost.hostname", "localhost"),
    master = NULL,
    port,
    connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60),
    timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60),
    rscript = NULL,
    homogeneous = NULL,
    rscript_args = NULL,
    rscript_envs = NULL,
    rscript_libs = NULL,
    rscript_startup = NULL,
    rscript_sh = c("auto", "cmd", "sh"),
    default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"),
    methods = TRUE,
    socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"),
    useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE),
    outfile = "/dev/null",
    renice = NA_integer_,
    rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL),
    user = NULL,
    revtunnel = TRUE,
    rshlogfile = NULL,
    rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL),
    rank = 1L,
    manual = FALSE,
    dryrun = FALSE,
    quiet = FALSE,
    setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"),
    action = c("launch", "options"),
    verbose = FALSE
) {
  ## Overview:
  ##  1. Validate and normalize all arguments.
  ##  2. Assemble the command line ('cmd') that starts the R worker, and, for
  ##     non-local workers, the remote-shell call ('local_cmd') that wraps it.
  ##  3. Collect everything in a 'makeNodePSOCKOptions' object, which is
  ##     either returned as-is (action = "options") or passed on to
  ##     launchNodePSOCK() (action = "launch").

  verbose <- as.logical(verbose)
  stop_if_not(length(verbose) == 1L, !is.na(verbose))

  ## Already a set of prepared options? Then launch right away
  if (inherits(worker, "makeNodePSOCKOptions")) {
    return(launchNodePSOCK(options = worker, verbose = verbose))
  }

  localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost")
  localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1"))

  ## Could it be that the worker specifies the name of the localhost?
  ## Note, this approach preserves worker == "127.0.0.1" if that is given.
  if (!localMachine) {
    localMachine <- is_localhost(worker)
    if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost")
  }
  attr(worker, "localhost") <- localMachine

  stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, !anyNA(rscript_sh))
  rscript_sh <- rscript_sh[1]
  if (rscript_sh == "auto") {
    if (localMachine) {
      rscript_sh <- if (.Platform$OS.type == "windows") "cmd" else "sh"
    } else {
      ## Assume remote machine uses a POSIX shell
      rscript_sh <- "sh"
    }
  }

  manual <- as.logical(manual)
  stop_if_not(length(manual) == 1L, !is.na(manual))

  dryrun <- as.logical(dryrun)
  stop_if_not(length(dryrun) == 1L, !is.na(dryrun))

  setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel"))

  quiet <- as.logical(quiet)
  stop_if_not(length(quiet) == 1L, !is.na(quiet))

  ## Locate a default SSH client?
  ## (rshcmd = NULL means "search for one" further below)
  if (identical(rshcmd, "")) rshcmd <- NULL
  if (!is.null(rshcmd)) {
    rshcmd <- as.character(rshcmd)
    stop_if_not(length(rshcmd) >= 1L)
  }

  if (identical(rshopts, "")) rshopts <- NULL
  rshopts <- as.character(rshopts)

  ## Note: as.character(NULL) is character(0), i.e. "no user specified"
  user <- as.character(user)
  stop_if_not(length(user) <= 1L)

  port <- as.integer(port)
  assertPort(port)

  revtunnel <- as.logical(revtunnel)
  stop_if_not(length(revtunnel) == 1L, !is.na(revtunnel))

  ## rshlogfile: NULL/FALSE (no log file), TRUE (temporary log file), or
  ## a pathname
  if (!is.null(rshlogfile)) {
    if (is.logical(rshlogfile)) {
      stop_if_not(!is.na(rshlogfile))
      if (rshlogfile) {
        rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log")
      } else {
        rshlogfile <- NULL
      }
    } else {
      rshlogfile <- as.character(rshlogfile)
      rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE)
    }
  }

  ## Default master hostname: with a local worker or a reverse tunnel, the
  ## worker connects via the localhost name, otherwise via this machine's
  ## node name
  if (is.null(master)) {
    if (localMachine || revtunnel) {
      master <- localhostHostname
    } else {
      master <- Sys.info()[["nodename"]]
    }
  }
  stop_if_not(!is.null(master))

  timeout <- as.numeric(timeout)
  stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0)

  ## FIXME: This is really legacy code there. It stems from R (< 3.5.0), where
  ## 'Rscript' did *not* attach the 'methods' package by default, whereas 'R'
  ## did.  Since R 3.5.0, 'R' and 'Rscript' attach the same set of packages.
  ## NOTE: 'methods' must be validated before 'default_packages' is first
  ## evaluated, because the default value of the latter refers to it.
  methods <- as.logical(methods)
  stop_if_not(length(methods) == 1L, !is.na(methods))

  if (!is.null(default_packages)) {
    default_packages <- as.character(default_packages)
    stop_if_not(!anyNA(default_packages))
    ## An asterisk ("*") is a placeholder for getOption("defaultPackages")
    is_asterisk <- (default_packages == "*")
    if (any(is_asterisk)) {
      pkgs <- getOption("defaultPackages")
      if (length(pkgs) == 0) {
        ## Nothing to expand to; drop the placeholder(s).
        ## BUG FIX: The result of this subsetting used to be discarded,
        ## which left "*" in place and triggered a false "invalid package
        ## names" error below.
        default_packages <- default_packages[!is_asterisk]
      } else {
        pkgs <- paste(pkgs, collapse=",")
        default_packages[is_asterisk] <- pkgs
        default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE))
      }
    }
    default_packages <- unique(default_packages)
    pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name)
    invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE)
    if (length(invalid) > 0) {
      stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", ")))
    }
  }

  ## Unless specified, guess whether the worker has the same R setup
  ## as the master
  if (is.null(homogeneous)) {
    homogeneous <- {
      localMachine ||
      (!revtunnel && is_localhost(master)) ||
      (!is_ip_number(worker) && !is_fqdn(worker))
    }
  }
  homogeneous <- as.logical(homogeneous)
  stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous))

  ## Is a parallel setup strategy possible?
  if (setup_strategy == "parallel") {
    if (getRversion() < "4.0.0" ||
        manual || dryrun || !homogeneous || !localMachine) {
      setup_strategy <- "sequential"
    }
  }

  ## The R executable to launch; an asterisk ("*") in 'rscript' is
  ## a placeholder for the default one
  bin <- "Rscript"
  if (homogeneous) bin <- file.path(R.home("bin"), bin)
  if (is.null(rscript)) {
    rscript <- bin
  } else {
    if (!is.character(rscript)) rscript <- as.character(rscript)
    stop_if_not(length(rscript) >= 1L)
    rscript[rscript == "*"] <- bin
    bin <- rscript[1]
    if (homogeneous && !inherits(bin, "AsIs")) {
      bin <- Sys.which(bin)
      if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE)
      rscript[1] <- bin
    }
  }

  ## Is rscript[1] referring to Rscript, or R/Rterm?
  name <- sub("[.]exe$", "", basename(bin))
  is_Rscript <- (tolower(name) == "rscript")

  rscript_args <- as.character(rscript_args)

  ## Coerce startup code to character strings and assert that they parse
  if (length(rscript_startup) > 0L) {
    if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup)
    rscript_startup <- lapply(rscript_startup, FUN = function(init) {
      if (is.language(init)) {
        init <- deparse(init, width.cutoff = 500L)
        ## We cannot use newline between statements because
        ## it needs to be passed as a one line string via -e <code>
        init <- paste(init, collapse = ";")
      }
      init <- as.character(init)
      if (length(init) == 0L) return(NULL)
      tryCatch({
        parse(text = init)
      }, error = function(ex) {
        stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex))
      })
      init
    })
    rscript_startup <- unlist(rscript_startup, use.names = FALSE)
  }

  if (!is.null(rscript_libs)) {
    rscript_libs <- as.character(rscript_libs)
    stop_if_not(!anyNA(rscript_libs))
  }

  useXDR <- as.logical(useXDR)
  stop_if_not(length(useXDR) == 1L, !is.na(useXDR))

  ## The string "NULL" is accepted as an alias for NULL
  if (!is.null(socketOptions)) {
    stop_if_not(is.character(socketOptions),length(socketOptions) == 1L,
                !is.na(socketOptions), nzchar(socketOptions))
    if (socketOptions == "NULL") socketOptions <- NULL
  }

  stop_if_not(is.null(outfile) || is.character(outfile))

  renice <- as.integer(renice)
  stop_if_not(length(renice) == 1L)

  rank <- as.integer(rank)
  stop_if_not(length(rank) == 1L, !is.na(rank))

  action <- match.arg(action, choices = c("launch", "options"))

  verbose_prefix <- "[local output] "

  ## Shell quote the Rscript executable?
  ## (elements of the form 'NAME=value' are left unquoted)
  if (!inherits(rscript, "AsIs")) {
    idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE)
    rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh)
  }

  ## Command-line arguments added by this function, as opposed to
  ## the user-specified 'rscript_args'
  rscript_args_internal <- character(0L)

  ## Can we get the worker's PID during launch?
  if (localMachine && !dryrun) {
    res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh, verbose = verbose)
    pidfile <- res$pidfile
    rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal)
  } else {
    pidfile <- NULL
  }

  ## Add Rscript "label"?
  rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL)
  if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) {
    if (isTRUE(as.logical(rscript_label))) {
      script <- grep("[.]R$", commandArgs(), value = TRUE)[1]
      if (is.na(script)) script <- "UNKNOWN"
      rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]])
    }
    rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh), rscript_args_internal)
  }

  ## In contrast to default_packages=character(0), default_packages = NULL
  ## skips --default-packages/R_DEFAULT_PACKAGES completely.
  if (!is.null(default_packages)) {
    pkgs <- paste(unique(default_packages), collapse = ",")
    if (is_Rscript) {
      arg <- sprintf("--default-packages=%s", pkgs)
      rscript_args_internal <- c(arg, rscript_args_internal)
    } else {
      ## FIXME: Make 'rscript_envs' work this way so they are applied sooner
      ## in the R startup process, instead via -e 'Sys.setenv(FOO="1")'.
      arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs)

      ## Is the cluster node launched in a MS Windows machine?
      on_MSWindows <- (rscript_sh %in% c("cmd", "cmd2"))
      if (on_MSWindows) {
        ## On MS Windows, we have to use special '/path/to/R FOO=1 ...'
        rscript_args <- c(arg, rscript_args)
      } else {
        ## Everywhere else, we can use 'FOO=1 /path/to/R ...'
        rscript <- c(arg, rscript)
      }
    }
  }

  ## Port that the Rscript should use to connect back to the master
  if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) {
    rscript_port <- assertPort(port + (rank - 1L))
    if (verbose) {
      mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank-1L, rscript_port)
    }
  } else {
    rscript_port <- port
    if (verbose) {
      mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port)
    }
  }

  if (length(socketOptions) == 1L) {
    code <- sprintf("options(socketOptions = \"%s\")", socketOptions)
    rscript_expr <- c("-e", shQuote(code, type = rscript_sh))
    rscript_args_internal <- c(rscript_args_internal, rscript_expr)
  }

  ## Pass each startup expression as a separate '-e <code>' argument
  if (length(rscript_startup) > 0L) {
    rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "")
    rscript_startup <- shQuote(rscript_startup, type = rscript_sh)
    rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value))
    rscript_startup <- unlist(rscript_startup, use.names = FALSE)
    rscript_args_internal <- c(rscript_args_internal, rscript_startup)
  }

  if (length(rscript_envs) > 0L) {
    ## Non-named elements give the names of environment variables whose
    ## current values should be copied to the worker
    names <- names(rscript_envs)
    if (is.null(names)) {
      copy <- seq_along(rscript_envs)
    } else {
      copy <- which(nchar(names) == 0L)
    }
    if (length(copy) > 0L) {
      missing <- NULL
      for (idx in copy) {
        name <- rscript_envs[idx]
        if (!nzchar(name)) {
          stop("Argument 'rscript_envs' contains an empty non-named environment variable")
        }
        value <- Sys.getenv(name, NA_character_)
        if (!is.na(value)) {
          rscript_envs[idx] <- value
          names(rscript_envs)[idx] <- name
        } else {
          missing <- c(missing, name)
        }
      }
      if (length(missing) > 0L) {
        warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", "))
      }
      ## Drop the ones that could not be copied (they are still non-named)
      names <- names(rscript_envs)
      rscript_envs <- rscript_envs[nzchar(names)]
      names <- names(rscript_envs)
    }
    ## Any environment variables to unset?
    ## (named elements with a missing value)
    if (length(unset <- which(is.na(rscript_envs))) > 0L) {
      names <- names(rscript_envs[unset])
      code <- sprintf("\"%s\"", names)
      code <- paste(code, collapse = ", ")
      code <- paste0("Sys.unsetenv(c(", code, "))")
      tryCatch({
        parse(text = code)
      }, error = function(ex) {
        stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", "))
      })
      rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh))
      rscript_envs <- rscript_envs[-unset]
      names <- names(rscript_envs)
    }

    ## Any environment variables to set?
    if (length(names) > 0L) {
      code <- sprintf('"%s"="%s"', names, rscript_envs)
      code <- paste(code, collapse = ", ")
      code <- paste0("Sys.setenv(", code, ")")
      tryCatch({
        parse(text = code)
      }, error = function(ex) {
        stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ", "))
      })
      rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh))
    }
  }

  if (length(rscript_libs) > 0L) {
    ## Make sure to preserve backslashes, e.g. in Windows network drives
    rscript_libs <- gsub("\\\\", "\\\\\\\\", rscript_libs, fixed = TRUE)
    code <- paste0('"', rscript_libs, '"')
    ## An asterisk ("*") is a placeholder for the worker's own .libPaths()
    code[rscript_libs == "*"] <- ".libPaths()"
    code <- paste(code, collapse = ",")
    code <- paste0('.libPaths(c(', code, '))')
    tryCatch({
      parse(text = code)
    }, error = function(ex) {
      stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", "))
    })
    rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh))
  }

  ## .{slave,work}RSOCK() command already specified?
  if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) {
    ## In R (>= 4.1.0), parallel:::.slaveRSOCK() was renamed to .workRSOCK()
    cmd <- "workRSOCK <- tryCatch(parallel:::.workRSOCK, error=function(e) parallel:::.slaveRSOCK); workRSOCK()"
    rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh))
  }

  ## Append or inject rscript_args_internal?
  ## (a single asterisk in 'rscript_args' marks where to inject them)
  idx <- which(rscript_args == "*")
  if (length(idx) == 0L) {
    rscript_args <- c(rscript_args, rscript_args_internal)
  } else if (length(idx) == 1L) {
    n <- length(rscript_args)
    if (idx == 1L) {
      rscript_args <- c(rscript_args_internal, rscript_args[-1])
    } else if (idx == n) {
      rscript_args <- c(rscript_args[-n], rscript_args_internal)
    } else {
      rscript_args <- c(rscript_args[1:(idx-1)], rscript_args_internal,
                        rscript_args[(idx+1):n])
    }
  } else {
    stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " ")))
  }

  ## Assemble the command line that starts the worker
  rscript <- paste(rscript, collapse = " ")
  rscript_args <- paste(rscript_args, collapse = " ")
  envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR,
                    " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy)

  cmd <- paste(rscript, rscript_args, envvars)

  ## Renice?
  if (!is.na(renice) && renice > 0L) {
    cmd <- sprintf("nice --adjustment=%d %s", renice, cmd)
  }

  if (!localMachine) {
    ## Find default SSH client
    find <- is.null(rshcmd)
    if (find) {
      which <- NULL
      if (verbose) {
        mdebugf("%sWill search for all 'rshcmd' available\n",
                verbose_prefix)
      }
    } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) {
      ## 'rshcmd' gives types of clients to search for, as '<type>' elements
      find <- TRUE
      if (verbose) {
        mdebugf("%sWill search for specified 'rshcmd' types: %s\n",
                verbose_prefix, paste(sQuote(rshcmd), collapse = ", "))
      }
      which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd)
    }

    if (find) {
      rshcmd <- find_rshcmd(which = which,
                            must_work = !localMachine && !manual && !dryrun)
      if (verbose) {
        s <- unlist(lapply(rshcmd, FUN = function(r) {
          sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version")))
        }))
        s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n")
        mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s)
      }
      ## Use the first one found
      rshcmd <- rshcmd[[1]]
    } else {
      if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- "<unknown>"
      if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "<unknown>"
    }

    ## Holds a pathname with an optional set of command-line options
    stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L)

    s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version")))
    rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s)

    if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label)

    ## User?
    if (length(user) == 1L) rshopts <- c("-l", user, rshopts)

    ## Reverse tunneling?
    if (revtunnel) {
      ## WORKAROUND: The Windows 10 loopback resolution uses IPv6 by default
      ## and the server is not listening for "localhost".  The solution is
      ## to use "127.0.0.1" instead, or force IPv4 by using ssh option '-4'.
      ## For more details, see
      ## https://github.com/PowerShell/Win32-OpenSSH/issues/1265#issuecomment-855234326
      if (is_localhost(master) && .Platform$OS.type == "windows" && (
           isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) ||
           basename(rshcmd[1]) == "ssh"
         )) {
        master <- "127.0.0.1"
      }
      rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts)
    }

    ## SSH log file?
    if (is.character(rshlogfile)) {
      rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts)
    }

    rshopts <- paste(rshopts, collapse = " ")

    ## Local commands
    rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker)
    local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh))
  } else {
    rshcmd_label <- NULL
    rsh_call <- NULL
    local_cmd <- cmd
  }
  stop_if_not(length(local_cmd) == 1L)

  options <- structure(list(
    local_cmd = local_cmd,
    worker = worker,
    rank = rank,
    rshlogfile = rshlogfile,
    port = port,
    connectTimeout = connectTimeout,
    timeout = timeout,
    useXDR = useXDR,
    pidfile = pidfile,
    setup_strategy = setup_strategy,
    ## For messages, warnings, and errors:
    outfile = outfile,
    rshcmd_label = rshcmd_label,
    rsh_call = rsh_call,
    cmd = cmd,
    localMachine = localMachine,
    manual = manual,
    dryrun = dryrun,
    quiet = quiet,
    rshcmd = rshcmd,
    revtunnel = revtunnel
  ), class = c("makeNodePSOCKOptions", "makeNodeOptions"))

  ## Return options?
  if (action == "options") return(options)

  launchNodePSOCK(options, verbose = verbose)
}
1192
1193
launchNodePSOCK <- function(options, verbose = FALSE) {
  ## Launches one R worker as described by a 'makeNodePSOCKOptions' object
  ## and waits for it to connect back on a server socket.
  ##
  ## Arguments:
  ##  options: A 'makeNodePSOCKOptions' object.
  ##  verbose: If TRUE, debug messages are outputted.
  ##
  ## Returns a 'RichSOCKnode' object that also inherits from 'SOCKnode' or
  ## 'SOCK0node' (depending on 'useXDR'), or NULL if 'dryrun' is TRUE.
  ## If the connection cannot be established, an error of class
  ## 'PSOCKConnectionError' with troubleshooting details is signaled.
  stop_if_not(inherits(options, "makeNodePSOCKOptions"))

  local_cmd <- options[["local_cmd"]]
  worker <- options[["worker"]]
  rank <- options[["rank"]]
  rshlogfile <- options[["rshlogfile"]]
  port <- options[["port"]]
  connectTimeout <- options[["connectTimeout"]]
  timeout <- options[["timeout"]]
  pidfile <- options[["pidfile"]]
  useXDR <- options[["useXDR"]]
  ## For messages, warnings, and errors:
  outfile <- options[["outfile"]]
  rshcmd_label <- options[["rshcmd_label"]]
  rsh_call <- options[["rsh_call"]]
  cmd <- options[["cmd"]]
  localMachine <- options[["localMachine"]]
  manual <- options[["manual"]]
  dryrun <- options[["dryrun"]]
  quiet <- options[["quiet"]]
  rshcmd <- options[["rshcmd"]]
  revtunnel <- options[["revtunnel"]]
  setup_strategy <- options[["setup_strategy"]]

  if (setup_strategy == "parallel") {
    stop("INTERNAL ERROR: launchNodePSOCK() called with setup_strategy='parallel', which should never occur")
  }

  verbose <- as.logical(verbose)
  stop_if_not(length(verbose) == 1L, !is.na(verbose))

  verbose_prefix <- "[local output] "

  is_worker_output_visible <- is.null(outfile)

  if (manual || dryrun) {
    ## Display instructions instead of launching the worker
    if (!quiet) {
      msg <- c("----------------------------------------------------------------------")
      if (localMachine) {
        msg <- c(msg, sprintf("Manually, start worker #%s on local machine %s with:", rank, sQuote(worker)), sprintf("\n  %s\n", cmd))
      } else {
        msg <- c(msg, sprintf("Manually, (i) login into external machine %s:", sQuote(worker)),
                 sprintf("\n  %s\n", rsh_call))
        msg <- c(msg, sprintf("and (ii) start worker #%s from there:", rank),
                 sprintf("\n  %s\n", cmd))
        msg <- c(msg, sprintf("Alternatively, start worker #%s from the local machine by combining both step in a single call:", rank),
                 sprintf("\n  %s\n", local_cmd))
      }
      msg <- paste(c(msg, ""), collapse = "\n")
      cat(msg)
      flush.console()
    }
    ## With manual = TRUE (and dryrun = FALSE), we still wait below for
    ## the manually started worker to connect
    if (dryrun) return(NULL)
  } else {
    if (verbose) {
      mdebugf("%sStarting worker #%s on %s: %s", verbose_prefix, rank, sQuote(worker), local_cmd)
    }
    input <- if (.Platform$OS.type == "windows") "" else NULL
    res <- system(local_cmd, wait = FALSE, input = input)
    if (verbose) {
      mdebugf("%s- Exit code of system() call: %s", verbose_prefix, res)
    }
    if (res != 0) {
      warnf("system(%s) had a non-zero exit code: %d", local_cmd, res)
    }
  }

  if (verbose) {
    mdebugf("%sWaiting for worker #%s on %s to connect back", verbose_prefix, rank, sQuote(worker))
    if (is_worker_output_visible) {
      if (.Platform$OS.type == "windows") {
        mdebugf("%s- Detected 'outfile=NULL' on Windows: this will make the output from the background worker visible when running R from a terminal, but it will most likely not be visible when using a GUI.", verbose_prefix)
      } else {
        mdebugf("%s- Detected 'outfile=NULL': this will make the output from the background worker visible", verbose_prefix)
      }
    }
  }


  con <- local({
    ## Apply connection time limit "only to the rest of the current computation".
    ## NOTE: Regardless of transient = TRUE / FALSE, it still seems we need to
    ##       undo it manually :/  /HB 2016-11-05
    setTimeLimit(elapsed = connectTimeout)
    on.exit(setTimeLimit(elapsed = Inf))

    localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost")

    ## Warnings from socketConnection() are recorded, so they can be
    ## reported if the connection attempt ends in an error
    warnings <- list()
    tryCatch({
      withCallingHandlers({
        socketConnection(localhostHostname, port = port, server = TRUE,
                         blocking = TRUE, open = "a+b", timeout = timeout)
      }, warning = function(w) {
        if (verbose) {
          mdebugf("%sDetected a warning from socketConnection(): %s", verbose_prefix, sQuote(conditionMessage(w)))
        }
        warnings <<- c(warnings, list(w))
      })
    }, error = function(ex) {
      setTimeLimit(elapsed = Inf)

      ## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      ## Post-mortem analysis
      ## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      machineType <- if (localMachine) "local" else "remote"
      msg <- sprintf("Failed to launch and connect to R worker on %s machine %s from local machine %s.\n", machineType, sQuote(worker), sQuote(Sys.info()[["nodename"]]))

      ## Inspect and report on the error message
      cmsg <- conditionMessage(ex)
      if (grepl(gettext("reached elapsed time limit"), cmsg)) {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s (which suggests that the connection timeout of %.0f seconds (argument 'connectTimeout') kicked in)\n", sQuote(cmsg), connectTimeout))
      } else {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s\n", sQuote(cmsg)))
      }

      ## Inspect and report on any warnings
      if (length(warnings) > 0) {
        msg <- c(msg, sprintf(" * In addition, socketConnection() produced %d warning(s):\n", length(warnings)))
        for (kk in seq_along(warnings)) {
          cmsg <- conditionMessage(warnings[[kk]])
          if (grepl("port [0-9]+ cannot be opened", cmsg)) {
            msg <- c(msg, sprintf("   - Warning #%d: %s (which suggests that this port is either already occupied by another process or blocked by the firewall on your local machine)\n", kk, sQuote(cmsg)))
          } else {
            msg <- c(msg, sprintf("   - Warning #%d: %s\n", kk, sQuote(cmsg)))
          }
        }
      }

      ## Report on how the local socket connect was setup
      msg <- c(msg, sprintf(" * The localhost socket connection that failed to connect to the R worker used port %d using a communication timeout of %.0f seconds and a connection timeout of %.0f seconds.\n", port, timeout, connectTimeout))

      ## Report on how the worker was launched
      msg <- c(msg, sprintf(" * Worker launch call: %s.\n", local_cmd))

      ## Do we know the PID of the worker? If so, try to kill it to avoid
      ## leaving a stray process behind
      ## Comment: readWorkerPID() must be done *after* socketConnection()
      ## on R 3.4.4, otherwise socketConnection() will fail. Not sure why.
      ## /HB 2019-01-24
      pid <- readWorkerPID(pidfile)
      if (!is.null(pid)) {
        if (verbose) mdebugf("Killing worker process (PID %d) if still alive", pid)
        ## WARNING: pid_kill() calls pid_exists() [twice] and on Windows
        ## pid_exists() uses system('tasklist') which can be very very slow
        ## /HB 2019-01-24
        success <- pid_kill(pid)
        if (verbose) mdebugf("Worker (PID %d) was successfully killed: %s", pid, success)
        msg <- c(msg, sprintf(" * Worker (PID %d) was successfully killed: %s\n", pid, success))
      } else if (localMachine) {
        msg <- c(msg, " * Failed to kill local worker because its PID could not be identified.\n")
      }

      ## Propose further troubleshooting methods
      suggestions <- NULL

      ## Enable verbose=TRUE?
      if (!verbose) {
        suggestions <- c(suggestions, "Set 'verbose=TRUE' to see more details.")
      }

      ## outfile=NULL?
      if (.Platform$OS.type == "windows") {
        if (!is_worker_output_visible) {
          ## Worker output is discarded; suggest how to make it visible
          suggestions <- c(suggestions, "On Windows, to see output from worker, set 'outfile=NULL' and run R from a terminal (not a GUI).")
        } else {
          ## 'outfile=NULL' is already used; explain why output may be missing
          suggestions <- c(suggestions, "On Windows, output from worker when using 'outfile=NULL' is only visible when running R from a terminal (not a GUI).")
        }
      } else {
        if (!is_worker_output_visible) {
          suggestions <- c(suggestions, "Set 'outfile=NULL' to see output from worker.")
        }
      }

      ## Log file?
      if (is.character(rshlogfile)) {
        smsg <- sprintf("Inspect the content of log file %s for %s.", sQuote(rshlogfile), paste(sQuote(rshcmd), collapse = " "))
        lmsg <- tryCatch(readLines(rshlogfile, n = 15L, warn = FALSE), error = function(ex) NULL)
        if (length(lmsg) > 0) {
          lmsg <- sprintf("     %2d: %s", seq_along(lmsg), lmsg)
          smsg <- sprintf("%s The first %d lines are:\n%s", smsg, length(lmsg), paste(lmsg, collapse = "\n"))
        }
        suggestions <- c(suggestions, smsg)
      } else if (!localMachine) {
        ## Only relevant for remote workers; for local workers no 'rshcmd'
        ## is used (it is NULL), which would produce a suggestion with
        ## an empty command name
        suggestions <- c(suggestions, sprintf("Set 'rshlogfile=TRUE' to enable logging for %s.", paste(sQuote(rshcmd), collapse = " ")))
      }

      ## Special: Windows 10 ssh client may not support reverse tunneling. /2018-11-10
      ## https://github.com/PowerShell/Win32-OpenSSH/issues/1265
      if (!localMachine && revtunnel && isTRUE(attr(rshcmd, "OpenSSH_for_Windows"))) {
        suggestions <- c(suggestions, sprintf("The 'rshcmd' (%s) used may not support reverse tunneling (revtunnel = TRUE). See ?parallelly::makeClusterPSOCK for alternatives.\n", rshcmd_label))
      }

      if (length(suggestions) > 0) {
        suggestions <- sprintf("   - Suggestion #%d: %s\n", seq_along(suggestions), suggestions)
        msg <- c(msg, " * Troubleshooting suggestions:\n", suggestions)
      }

      msg <- paste(msg, collapse = "")
      ex$message <- msg

      ## Re-signal as an PSOCKConnectionError error
      class(ex) <- c("PSOCKConnectionError", class(ex))

      ## Relay error and temporarily avoid truncating the error message
      ## in case it is too long
      local({
        oopts <- options(warning.length = 2000L)
        on.exit(options(oopts))
        stop(ex)
      })
    })
  })
  setTimeLimit(elapsed = Inf)

  if (verbose) {
    mdebugf("%sConnection with worker #%s on %s established", verbose_prefix, rank, sQuote(worker))
  }

  structure(list(con = con, host = worker, rank = rank, rshlogfile = rshlogfile),
            class = c("RichSOCKnode", if (useXDR) "SOCKnode" else "SOCK0node"))
} ## launchNodePSOCK()
1416
1417
## Checks if a given worker is the same as the localhost.  It is, iff:
##
## * worker == "localhost"
## * worker == "127.0.0.1"
## * worker == hostname
## * worker == getOption2("parallelly.localhost.hostname"), if that is
##   a character string
## * worker and hostname appear next to each other (in either order and
##   separated only by whitespace) at the end of a line in /etc/hosts
##
## This should cover cases such as:
## * Calling is_localhost("n3") from machine n3
## * Calling is_localhost("n3.myserver.org") from machine n3[.myserver.org]
##
## Both positive and negative results are remembered between calls.
## Calling is_localhost(NULL, hostname = NULL) resets what is remembered
## and returns NA.
##
## References:
## * https://en.wikipedia.org/wiki/Hostname
#' @importFrom utils file_test
is_localhost <- local({
  localhosts <- c("localhost", "127.0.0.1")
  non_localhosts <- character(0L)

  ## Escapes regular-expression metacharacters such that 'x' is matched
  ## literally, e.g. the dots in "n3.myserver.org" must only match dots
  escape_regex <- function(x) {
    gsub("([.\\\\|()\\[\\]{}^$*+?])", "\\\\\\1", x, perl = TRUE)
  }

  function(worker, hostname = Sys.info()[["nodename"]], pathnames = "/etc/hosts") {
    ## INTERNAL: Clear list of known local hosts?
    if (is.null(worker) && is.null(hostname)) {
      localhosts <<- c("localhost", "127.0.0.1")
      non_localhosts <<- character(0L)
      return(NA)
    }

    stop_if_not(length(worker) == 1, length(hostname) == 1)

    ## Already known to be a localhost, or known not to be one?
    if (worker %in% localhosts) return(TRUE)
    if (worker %in% non_localhosts) return(FALSE)

    if (worker == hostname) {
      ## Add worker to the list of known local hosts.
      localhosts <<- unique(c(localhosts, worker))
      return(TRUE)
    }

    alias <- getOption2("parallelly.localhost.hostname")
    if (is.character(alias) && worker == alias) {
      ## Add worker to the list of known local hosts.
      localhosts <<- unique(c(localhosts, worker))
      return(TRUE)
    }

    ## Scan known "hosts" files
    ## (if none exists, the negative answer is not remembered)
    pathnames <- pathnames[file_test("-f", pathnames)]
    if (length(pathnames) == 0L) return(FALSE)

    ## Search for (hostname, worker) and (worker, hostname)
    ## occurring on the same line and separated by one or
    ## more whitespace symbols (but nothing else).  The names are
    ## escaped so that they are matched literally.
    hostname_rx <- escape_regex(hostname)
    worker_rx <- escape_regex(worker)
    pattern <- sprintf("^((|.*[[:space:]])%s[[:space:]]+%s([[:space:]]+|)|(|.*[[:space:]])%s[[:space:]]+%s([[:space:]]+|))$", hostname_rx, worker_rx, worker_rx, hostname_rx)

    for (pathname in pathnames) {
      bfr <- readLines(pathname, warn = FALSE)
      if (any(grepl(pattern, bfr, ignore.case = TRUE))) {
        ## Add worker to the list of known local hosts.
        localhosts <<- unique(c(localhosts, worker))
        return(TRUE)
      }
    }

    ## Add worker to the list of known non-local hosts.
    non_localhosts <<- unique(c(non_localhosts, worker))

    FALSE
  }
}) ## is_localhost()
1487
1488
## Checks if a worker is specified by its IP number, that is, as four
## dot-separated groups of one to three decimal digits, each in [0, 255].
## Only the first element of 'worker' is considered.  Returns TRUE or FALSE.
is_ip_number <- function(worker) {
  host <- worker[1L]

  ## Require digits-only groups up front; this avoids coercion warnings
  ## for names such as "a.b.c.d" and rejects forms like "1e2.0.0.1",
  ## "0x10.0.0.1", and "1.2.3.4." that as.integer()/strsplit() would
  ## otherwise let through
  if (!grepl("^[0-9]{1,3}([.][0-9]{1,3}){3}$", host)) return(FALSE)

  ip <- as.integer(strsplit(host, split = ".", fixed = TRUE)[[1]])
  all(0L <= ip & ip <= 255L)
}
1497
## Tests whether a worker name looks like a fully qualified domain
## name (FQDN).  The only criterion used is that the name contains at
## least one literal dot.  Returns one logical value per element.
is_fqdn <- function(worker) {
  has_dot <- grepl(".", x = worker, fixed = TRUE)
  has_dot
}
1502
1503
#' Locate SSH clients available on the current system
#'
#' @param which A character vector naming the types of SSH clients to look
#' for.  Recognized types are `"ssh"`, `"putty-plink"`, and `"rstudio-ssh"`;
#' any other value results in an error.  If NULL, a default set is used,
#' which is all three types on Windows and only `"ssh"` elsewhere.
#'
#' @param first If TRUE, the first client located is returned right away,
#' otherwise all located clients are returned.
#'
#' @param must_work If TRUE and no client was found, then an error
#' is produced, otherwise only a warning.
#'
#' @return A named list of pathnames to all located SSH clients.
#' The pathnames may be followed by zero or more command-line options,
#' i.e. the elements of the returned list are character vectors of length
#' one or more.
#' If `first = TRUE`, only the first one is returned.
#' Attribute `type` holds the type of the client and attribute `version`
#' the output from querying the executable for its version (via
#' command-line option `-V`).
#' If no client was located and `must_work = FALSE`, then `"ssh"` is
#' returned after a warning has been issued.
#'
#' @keywords internal
find_rshcmd <- function(which = NULL, first = FALSE, must_work = TRUE) {
  ## Calls '<bin> <args>' and collapses all of its output (standard
  ## output and standard error) into a single string
  version_of <- function(bin, args = "-V") {
    v <- suppressWarnings(system2(bin, args = args, stdout = TRUE, stderr = TRUE))
    v <- paste(v, collapse = "; ")
    stop_if_not(length(v) == 1L)
    v
  }

  ## 'ssh' as found on the PATH
  locate_ssh <- function() {
    exe <- Sys.which("ssh")
    if (!nzchar(exe)) return(NULL)
    attr(exe, "type") <- "ssh"
    version <- version_of(exe, args = "-V")
    attr(exe, "version") <- version
    if (any(grepl("OpenSSH_for_Windows", version))) {
      attr(exe, "OpenSSH_for_Windows") <- TRUE
    }
    exe
  }

  ## PuTTY's 'plink' as found on the PATH; used with option '-ssh'
  locate_putty_plink <- function() {
    exe <- Sys.which("plink")
    if (!nzchar(exe)) return(NULL)
    cmd <- c(exe, "-ssh")
    attr(cmd, "type") <- "putty-plink"
    attr(cmd, "version") <- version_of(exe, args = "-V")
    cmd
  }

  ## 'ssh' located in the folder given by env var RSTUDIO_MSYS_SSH
  locate_rstudio_ssh <- function() {
    dir <- Sys.getenv("RSTUDIO_MSYS_SSH")
    if (!file_test("-d", dir)) return(NULL)
    dir <- normalizePath(dir)
    old_path <- Sys.getenv("PATH")
    on.exit(Sys.setenv(PATH = old_path))

    ## Temporarily make RSTUDIO_MSYS_SSH the one and only folder on the
    ## PATH, so that no other client with the same name is picked up.
    ## Comment: In RStudio, RSTUDIO_MSYS_SSH is appended
    ## to the PATH, see PATH in 'Tools -> Shell ...'.
    Sys.setenv(PATH = dir)
    exe <- Sys.which("ssh")
    if (!nzchar(exe)) return(NULL)
    attr(exe, "type") <- "rstudio-ssh"
    attr(exe, "version") <- version_of(exe, args = "-V")
    exe
  }

  ## Maps each supported client type to the function locating it
  locators <- list(
    "ssh"         = locate_ssh,
    "putty-plink" = locate_putty_plink,
    "rstudio-ssh" = locate_rstudio_ssh
  )

  if (is.null(which)) {
    which <- if (.Platform$OS.type == "windows") {
      c("ssh", "putty-plink", "rstudio-ssh")
    } else {
      c("ssh")
    }
  } else {
    stop_if_not(is.character(which), length(which) >= 1L, !anyNA(which))
  }
  stop_if_not(is.logical(first), length(first) == 1L, !is.na(first))
  stop_if_not(is.logical(must_work), length(must_work) == 1L, !is.na(must_work))

  found <- list()
  for (name in which) {
    locate <- locators[[name]]
    if (is.null(locate)) stopf("Unknown 'rshcmd' type: %s", sQuote(name))

    client <- locate()
    if (is.null(client)) next

    if (first) return(client)
    found[[name]] <- client
  }

  if (length(found) > 0) return(found)

  ## Nothing was located: fail, or warn and fall back to plain "ssh"
  msg <- sprintf("Failed to locate a default SSH client (checked: %s). Please specify one via argument 'rshcmd'.", paste(sQuote(which), collapse = ", ")) #nolint
  if (must_work) stop(msg)

  fallback <- "ssh"
  msg <- sprintf("%s Will still try with %s.", msg, sQuote(paste(fallback, collapse = " ")))
  warning(msg)
  fallback
}
1609
1610
## Collects information on the current R session: R version and OS type,
## system information, library paths, optionally the installed packages
## per library path, the working directory, and the process ID.
##
## NOTE: add_cluster_session_info() re-uses this function with its
## environment set to the 'utils' namespace, so the body should only rely
## on functions available from there.
#' @importFrom utils installed.packages
session_info <- function(pkgs = getOption2("parallelly.makeNodePSOCK.sessionInfo.pkgs", FALSE)) {
  lib_paths <- .libPaths()

  ## Only when explicitly requested: "<package>_<version>" for all
  ## packages installed in each of the library paths
  pkg_listing <- NULL
  if (isTRUE(pkgs)) {
    pkg_listing <- lapply(lib_paths, FUN = function(lib.loc) {
      installed <- installed.packages(lib.loc = lib.loc)
      if (length(installed) == 0) return(NULL)
      paste0(installed[, "Package"], "_", installed[, "Version"])
    })
    names(pkg_listing) <- lib_paths
  }

  list(
    r       = c(R.version, os.type = .Platform$OS.type),
    system  = as.list(Sys.info()),
    libs    = lib_paths,
    pkgs    = pkg_listing,
    pwd     = getwd(),
    process = list(pid = Sys.getpid())
  )
}
1630
1631
## Queries each node of a cluster for its session information and stores
## the result in the node's 'session_info' element.  Nodes that are NULL or
## that already carry session information are left untouched.  Returns the
## updated cluster.
#' @importFrom utils capture.output
#' @importFrom parallel clusterCall
add_cluster_session_info <- local({
  ## A copy of session_info() to be called on the workers: its default
  ## for 'pkgs' is replaced by FALSE and its environment is set to the
  ## 'utils' namespace
  get_session_info <- session_info
  formals(get_session_info)$pkgs <- FALSE
  environment(get_session_info) <- getNamespace("utils")

  function(cl) {
    stop_if_not(inherits(cl, "cluster"))

    for (idx in seq_along(cl)) {
      node <- cl[[idx]]

      ## Skip empty nodes (happens with dryrun = TRUE) and nodes for
      ## which session information has already been collected
      if (is.null(node) || !is.null(node$session_info)) next

      include_pkgs <- getOption2("parallelly.makeNodePSOCK.sessionInfo.pkgs", FALSE)
      replies <- clusterCall(cl[idx], fun = get_session_info, pkgs = include_pkgs)
      node$session_info <- replies[[1]]

      ## Sanity check, iff possible: the last space-separated token of
      ## what print(node) outputs is compared with the PID reported by
      ## the worker itself
      if (inherits(node, c("SOCK0node", "SOCKnode"))) {
        pid <- capture.output(print(node))
        pid <- as.integer(gsub(".* ", "", pid))
        stop_if_not(node$session_info$process$pid == pid)
      }

      cl[[idx]] <- node
    }

    cl
  }
}) ## add_cluster_session_info()
1665
1666
## Gets the Windows build version, e.g. '10.0.17134.523' (Windows 10 v1803)
## and '10.0.17763.253' (Windows 10 v1809).
##
## The version is parsed from the "Microsoft" line(s) output by shell
## command 'ver' and returned as a 'numeric_version' object.  Returns NULL
## on non-Windows platforms, or if the version cannot be inferred.
windows_build_version <- local({
  if (.Platform$OS.type == "windows") {
    function() {
      output <- shell("ver", intern = TRUE)
      if (length(output) == 0) return(NULL)

      output <- grep("Microsoft", output, value = TRUE)
      if (length(output) == 0) return(NULL)

      version <- gsub(".*Version ([0-9.]+).*", "\\1", output)
      tryCatch(numeric_version(version), error = function(ex) NULL)
    }
  } else {
    function() NULL
  }
})
1682
1683
## Sets up what is needed for a worker to report its process ID (PID)
## via a temporary PID file.
##
## Returns an empty list if getOption2("parallelly.makeNodePSOCK.autoKill")
## is not TRUE.  Otherwise, a list with elements 'pidfile' (the pathname of
## the PID file) and 'rscript_pid_args' (the '-e <code>' arguments that
## make an Rscript process write its PID to that file).
##
## Unless a previous test with the same 'rscript' command succeeded (and
## 'force' is FALSE), the 'rscript' command is launched once to test
## whether the PID file gets created.  The outcome of that test is
## remembered per 'rscript' command.
useWorkerPID <- local({
  parent_pid <- NULL
  .cache <- list()

  ## Creates the PID-file pathname and matching Rscript arguments
  new_pid_spec <- function(rank, rscript_sh) {
    if (is.null(parent_pid)) parent_pid <<- Sys.getpid()
    prefix <- sprintf("worker.rank=%d.parallelly.parent=%d.", rank, parent_pid)
    pidfile <- tempfile(pattern = prefix, fileext = ".pid")
    pidfile <- normalizePath(pidfile, winslash = "/", mustWork = FALSE)
    code <- sprintf('try(suppressWarnings(cat(Sys.getpid(),file="%s")), silent = TRUE)', pidfile)
    list(
      pidfile = pidfile,
      rscript_pid_args = c("-e", shQuote(code, type = rscript_sh))
    )
  }

  function(rscript, rank, rscript_sh, force = FALSE, verbose = FALSE) {
    autoKill <- getOption2("parallelly.makeNodePSOCK.autoKill", TRUE)
    if (!isTRUE(as.logical(autoKill))) return(list())

    spec <- new_pid_spec(rank, rscript_sh = rscript_sh)

    ## Nothing more to do if this command is known to work
    key <- paste(rscript, collapse = "\t")
    if (!force && isTRUE(.cache[[key]])) return(spec)

    ## Command that writes the PID file and then reports whether it exists
    exists_code <- sprintf('file.exists("%s")', spec$pidfile)
    test_args <- c(
      rscript,
      spec$rscript_pid_args,
      "-e", shQuote(exists_code, type = rscript_sh)
    )
    test_cmd <- paste(test_args, collapse = " ")
    if (verbose) {
      mdebugf("Testing if worker's PID can be inferred: %s", sQuote(test_cmd))
    }

    ## AD HOC: 'singularity exec ... Rscript' requires input="".  If not,
    ## they will be terminated because they try to read from non-existing
    ## standard input. /HB 2019-02-14
    uses_singularity <- any(grepl("singularity", rscript, ignore.case = TRUE))
    input <- if (uses_singularity) "" else NULL

    res <- system(test_cmd, intern = TRUE, input = input)
    status <- attr(res, "status")
    suppressWarnings(file.remove(spec$pidfile))

    ## Success requires a zero (or unset) exit status and "TRUE" in the output
    works <- (is.null(status) || status == 0L) && any(grepl("TRUE", res))
    .cache[[key]] <<- works
    if (verbose) mdebugf("- Possible to infer worker's PID: %s", works)

    spec
  }
})
1734
1735
## Reads the process ID (PID) that a worker has written to a PID file.
##
## Arguments:
##  pidfile:  Pathname of the PID file.  If NULL, NULL is returned.
##  wait:     Number of seconds to sleep between attempts.
##  maxTries: Controls how many times to wait for the file to appear
##            (the loop sleeps while 'tries <= maxTries') and the maximum
##            number of attempts to read it (at least one).
##  verbose:  If TRUE, debug messages are produced.
##
## Returns the PID as an integer, or NULL if the PID file never showed up,
## could not be read, was empty, did not hold an integer, or held the PID
## of the current process (the latter two also produce a warning).
## The PID file is removed after it has been read.
readWorkerPID <- function(pidfile, wait = 0.5, maxTries = 8L, verbose = FALSE) {
  if (is.null(pidfile)) return(NULL)

  if (verbose) mdebug("Attempting to infer PID for worker process ...")
  pid <- NULL

  ## Wait for PID file
  tries <- 0L
  while (!file.exists(pidfile) && tries <= maxTries) {
    Sys.sleep(wait)
    tries <- tries + 1L
  }

  if (file.exists(pidfile)) {
    pid0 <- NULL
    ## seq_len() instead of 1:maxTries, which would iterate over c(1, 0)
    ## for maxTries = 0.  Always make at least one attempt.
    for (attempt in seq_len(max(maxTries, 1L))) {
      pid0 <- tryCatch(readLines(pidfile, warn = FALSE), error = identity)
      if (!inherits(pid0, "error")) break
      pid0 <- NULL
      Sys.sleep(wait)
    }
    file.remove(pidfile)

    if (length(pid0) > 0L) {
      ## Use last one, if more than one ("should not happen")
      value <- pid0[length(pid0)]
      ## A failed coercion is reported by the explicit warning below
      pid <- suppressWarnings(as.integer(value))
      if (verbose) mdebugf(" - pid: %s", pid)
      if (is.na(pid)) {
        ## Report only the value that was parsed, not all lines read
        warnf("Worker PID is a non-integer: %s", value)
        pid <- NULL
      } else if (pid == Sys.getpid()) {
        warnf("Hmm... worker PID and parent PID are the same: %s", pid)
        pid <- NULL
      }
    }
  }

  if (verbose) mdebug("Attempting to infer PID for worker process ... done")

  pid
} ## readWorkerPID()
1777
1778
1779
## Summarizes a cluster node as a one-row data frame with columns 'host',
## 'r_version', 'platform', 'pwd', and 'pid'.  Information that is not
## available, e.g. because session information was never collected for
## the node, is reported as a missing value.
#' @export
summary.RichSOCKnode <- function(object, ...) {
  ## Falls back to 'default' when 'value' is NULL.  Assigning NULL to a
  ## list element would drop it, and thereby a column of the result,
  ## which in turn breaks row-binding of node summaries.
  value_or <- function(value, default) {
    if (is.null(value)) default else value
  }

  ## Extracting from NULL gives NULL, so this also covers the case where
  ## 'object' or its session information is NULL
  session_info <- object[["session_info"]]
  r_info <- session_info[["r"]]

  res <- list(
    host      = value_or(object[["host"]], NA_character_),
    r_version = value_or(r_info[["version.string"]], NA_character_),
    platform  = value_or(r_info[["platform"]], NA_character_),
    pwd       = value_or(session_info[["pwd"]], NA_character_),
    pid       = value_or(session_info[["process"]][["pid"]], NA_integer_)
  )
  as.data.frame(res, stringsAsFactors = FALSE)
}
1800
## Summarizes a cluster as a data frame with one row per node, by
## row-binding the per-node summaries.  Row names are dropped.
#' @export
summary.RichSOCKcluster <- function(object, ...) {
  ## NULL nodes carry no class to dispatch on, so the node method is
  ## called directly for those
  summarize_node <- function(node) {
    if (is.null(node)) summary.RichSOCKnode(node) else summary(node)
  }

  rows <- lapply(object, FUN = summarize_node)
  combined <- do.call(rbind, args = rows)
  rownames(combined) <- NULL
  combined
}
1811
## Prints a one-line description of a cluster: the total number of nodes
## and, for each unique combination of host, R version, and platform, how
## many nodes there are, with the most common combination listed first.
## Returns 'x' invisibly.
#' @export
print.RichSOCKcluster <- function (x, ...) {
  info <- summary(x)

  ## Per-node description: host followed by R version and platform
  r_version <- info[["r_version"]]
  specs <- sprintf("(%s, platform %s)", r_version, info[["platform"]])
  specs[is.na(r_version)] <- "(R version and platform not queried)"
  hosts <- sprintf("host %s", sQuote(info[["host"]]))
  desc <- paste(hosts, specs, sep = " ")

  ## Tally identical descriptions, most frequent one first
  counts <- table(desc)
  counts <- counts[order(counts, decreasing = TRUE)]
  n <- as.integer(counts)
  verb <- ifelse(n == 1L, "node is", "nodes are")
  parts <- sprintf("%d %s on %s", n, verb, names(counts))

  msg <- sprintf("Socket cluster with %d nodes where %s", length(x), paste(parts, collapse = ", "))

  ## Report on autoStop?
  if (!is.null(attr(x, "gcMe"))) {
    msg <- sprintf("%s. This cluster is registered to be automatically stopped by the garbage collector", msg)
  }

  cat(msg, "\n", sep = "")
  invisible(x)
}
1834