# File src/library/parallel/R/snow.R # Part of the R package, https://www.R-project.org # # Copyright (C) 1995-2020 The R Core Team # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # A copy of the GNU General Public License is available at # https://www.R-project.org/Licenses/ ## Derived from snow 0.3-6 by Luke Tierney .reg <- new.env() assign("default", NULL, envir = .reg) defaultCluster <- function(cl = NULL) { if(is.null(cl)) cl <- get("default", envir = .reg) if(is.null(cl)) stop("no cluster 'cl' supplied and none is registered") checkCluster(cl) cl } setDefaultCluster <- function(cl = NULL) { if(!is.null(cl)) checkCluster(cl) assign("default", cl, envir = .reg) } getDefaultCluster <- function() get("default", envir = .reg) # # Checking and subsetting # checkCluster <- function(cl) if (!inherits(cl, "cluster")) stop("not a valid cluster"); `[.cluster` <- function(cl, ...) { v <- NextMethod() class(v) <- class(cl) v } # # Higher-Level Node Functions # closeNode <- function(node) UseMethod("closeNode") closeNode.default <- function(node) {} ## These have SOCK methods sendData <- function(node, data) UseMethod("sendData") recvData <- function(node) UseMethod("recvData") recvOneData <- function(cl) UseMethod("recvOneData") postNode <- function(con, type, value = NULL, tag = NULL) sendData(con, list(type = type, data = value, tag = tag)) stopNode <- function(n) { postNode(n, "DONE") closeNode(n) } # # Cluster Creation and Destruction # defaultClusterOptions <- NULL #**** check valid cluster option initDefaultClusterOptions <- function(libname) { rscript <- file.path(R.home("bin"), "Rscript") port <- Sys.getenv("R_PARALLEL_PORT") port <- if (identical(port, "random")) NA else as.integer(port) if (is.na(port)) { seed <- .GlobalEnv$.Random.seed ran1 <- sample.int(.Machine$integer.max - 1L, 1L) / .Machine$integer.max port <- 11000 + 1000 * ((ran1 + unclass(Sys.time()) / 300) %% 1) if(is.null(seed)) ## there was none, initially rm( ".Random.seed", envir = .GlobalEnv, inherits = FALSE) else # reset assign(".Random.seed", seed, envir = .GlobalEnv, inherits = FALSE) } Sys.i <- Sys.info() options <- list(port = as.integer(port), setup_timeout = 60 * 2, # 2 minutes timeout = 60 * 60 * 24 * 30, # 30 days setup_strategy = "parallel", master = Sys.i[["nodename"]], homogeneous = TRUE, type = "PSOCK", outfile = "/dev/null", rscript = rscript, rscript_args = character(), user = Sys.i[["user"]], rshcmd = "ssh", manual = FALSE, methods = TRUE, renice = NA_integer_, ## rest are unused in parallel rhome = R.home(), rlibs = Sys.getenv("R_LIBS"), scriptdir = file.path(libname, "parallel"), rprog = file.path(R.home("bin"), "R"), snowlib = .libPaths()[1], useRscript = TRUE, # for use by snow clusters useXDR = TRUE) defaultClusterOptions <<- addClusterOptions(emptyenv(), options) } addClusterOptions <- function(options, new) { if (!is.null(new)) { options <- new.env(parent = options) names <- names(new) for (i in seq_along(new)) assign(names[i], new[[i]], envir = options) } options } getClusterOption <- function(name, options = defaultClusterOptions) get(name, envir = options) setDefaultClusterOptions <- function(...) { list <- list(...) names <- names(list) for (i in seq_along(list)) assign(names[i], list[[i]], envir = defaultClusterOptions) } makeCluster <- function (spec, type = getClusterOption("type"), ...) { switch(type, PSOCK = makePSOCKcluster(names = spec, ...), FORK = makeForkCluster(nnodes = spec, ...), SOCK = snow::makeSOCKcluster(names = spec, ...), MPI = snow::makeMPIcluster(count = spec, ...), NWS = snow::makeNWScluster(names = spec, ...), stop("unknown cluster type")) } stopCluster <- function(cl = NULL) { cl <- defaultCluster(cl) if(identical(cl, get("default", envir = .reg))) assign("default", NULL, envir = .reg) UseMethod("stopCluster") } stopCluster.default <- function(cl) for (n in cl) stopNode(n) # # Cluster Functions # sendCall <- function (con, fun, args, return = TRUE, tag = NULL) { timing <- .snowTimingData$running() if (timing) start <- proc.time()[3L] postNode(con, "EXEC", list(fun = fun, args = args, return = return, tag = tag)) if (timing) .snowTimingData$enterSend(con$rank, start, proc.time()[3L]) NULL } recvResult <- function(con) { if (.snowTimingData$running()) { start <- proc.time()[3L] r <- recvData(con) end <- proc.time()[3L] .snowTimingData$enterRecv(con$rank, start, end, r$time[3L]) } else r <- recvData(con) r$value } checkForRemoteErrors <- function(val) { count <- 0 firstmsg <- NULL for (v in val) { if (inherits(v, "try-error")) { count <- count + 1 if (count == 1) firstmsg <- v } } ## These will not translate if (count == 1) stop("one node produced an error: ", firstmsg, domain = NA) else if (count > 1) stop(count, " nodes produced errors; first error: ", firstmsg, domain = NA) val } recvOneResult <- function (cl) { if (.snowTimingData$running()) { start <- proc.time()[3] v <- recvOneData(cl) end <- proc.time()[3] .snowTimingData$enterRecv(v$node, start, end, v$value$time[3]) } else v <- recvOneData(cl) list(value = v$value$value, node = v$node, tag = v$value$tag) } findRecvOneTag <- function(cl, anytag) { rtag <- NULL for (node in cl) { if (is.null(rtag)) rtag <- node$RECVTAG else if (rtag != node$RECVTAG) { rtag <- anytag break; } } rtag } ### ========== snow support =========== ## place holder for now. .snowTimingData <- list(running = function() FALSE, enterSend = function(...) {}, enterRecv = function(...) {}) closeNode.NWSnode <- function(node) snow::closeNode.NWSnode(node) recvData.MPInode <- function(node) snow::recvData.MPInode(node) recvData.NWSnode <- function(node) snow::recvData.NWSnode(node) recvOneData.MPIcluster <- function(cl) snow::recvOneData.MPIcluster(cl) recvOneData.NWScluster <- function(cl) snow::recvOneData.NWScluster(cl) sendData.MPInode <- function(node, data) snow::sendData.MPInode(node, data) sendData.NWSnode <- function(node, data) snow::sendData.NWSnode(node, data) ## these use NextMethod() so need copies. stopCluster.MPIcluster <- function(cl) { NextMethod() snow::setMPIcluster(NULL) } stopCluster.spawnedMPIcluster <- function(cl) { comm <- 1 NextMethod() Rmpi::mpi.comm.disconnect(comm) } stopCluster.NWScluster <- function(cl) { NextMethod() nws::nwsDeleteWs(cl[[1]]$wsServer, nws::nwsWsName(cl[[1]]$ws)) close(cl[[1]]$wsServer) }