#  File src/library/parallel/R/snow.R
#  Part of the R package, https://www.R-project.org
#
#  Copyright (C) 1995-2020 The R Core Team
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  A copy of the GNU General Public License is available at
#  https://www.R-project.org/Licenses/

## Derived from snow 0.3-6 by Luke Tierney

## Private registry; its single entry "default" holds the registered
## default cluster, or NULL when none is registered.
.reg <- new.env()
assign("default", NULL, envir = .reg)

## Resolve the cluster to operate on.
##   cl: a cluster object, or NULL to use the registered default.
## Returns the validated cluster.  Signals an error if 'cl' is NULL and no
## default is registered, or if the result does not inherit from "cluster".
defaultCluster <- function(cl = NULL)
{
    if(is.null(cl)) cl <- get("default", envir = .reg)
    if(is.null(cl)) stop("no cluster 'cl' supplied and none is registered")
    checkCluster(cl)
    cl
}

## Register 'cl' as the default cluster; NULL clears the registration.
## A non-NULL 'cl' is validated with checkCluster() first.
setDefaultCluster <- function(cl = NULL)
{
    if(!is.null(cl)) checkCluster(cl)
    assign("default", cl, envir = .reg)
}

## Return the registered default cluster (NULL if none).
getDefaultCluster <-
function()
    get("default", envir = .reg)

#
# Checking and subsetting
#

## Signal an error unless 'cl' inherits from class "cluster".
checkCluster <- function(cl)
    if (!inherits(cl, "cluster")) stop("not a valid cluster")

## Subsetting a cluster: delegate to the next method, then put the full
## class vector of the original back on the result.
`[.cluster` <- function(cl, ...)
{
    v <- NextMethod()
    class(v) <- class(cl)
    v
}


#
# Higher-Level Node Functions
#

## Generic for closing a node; the default method does nothing.
closeNode <- function(node) UseMethod("closeNode")
closeNode.default <- function(node) {}

## These have SOCK methods
sendData <- function(node, data) UseMethod("sendData")
recvData <- function(node) UseMethod("recvData")
recvOneData <- function(cl) UseMethod("recvOneData")

## Send a message to node 'con' as a list with components
## 'type', 'data' (taken from 'value') and 'tag'.
postNode <- function(con, type, value = NULL, tag = NULL)
    sendData(con, list(type = type, data = value, tag = tag))

## Post a "DONE" message to node 'n' and then close it.
stopNode <- function(n) {
    postNode(n, "DONE")
    closeNode(n)
}



#
# Cluster Creation and Destruction
#

## Environment of default options; filled in by initDefaultClusterOptions().
defaultClusterOptions <- NULL

#**** check valid cluster option

## Build the default option set and store it in 'defaultClusterOptions'.
##   libname: library directory; only used to form the 'scriptdir' option.
## The port is read from the environment variable R_PARALLEL_PORT.  If that
## is "random", or does not convert to an integer, a port in the range
## 11000..11999 is derived from one RNG draw combined with the current time.
initDefaultClusterOptions <- function(libname)
{
    rscript <- file.path(R.home("bin"), "Rscript")
    port <- Sys.getenv("R_PARALLEL_PORT")
    port <- if (identical(port, "random")) NA else as.integer(port)
    if (is.na(port)) {
        ## Remember the RNG state so that the draw below leaves the
        ## user-visible random stream exactly as it was.
        seed <- .GlobalEnv$.Random.seed
        ran1 <- sample.int(.Machine$integer.max - 1L, 1L) / .Machine$integer.max
        port <- 11000 + 1000 * ((ran1 + unclass(Sys.time()) / 300) %% 1)
        if(is.null(seed)) ## there was none, initially
            rm( ".Random.seed", envir = .GlobalEnv, inherits = FALSE)
        else # reset
            assign(".Random.seed", seed, envir = .GlobalEnv, inherits = FALSE)
    }
    Sys.i <- Sys.info()
    options <- list(port = as.integer(port),
                    setup_timeout = 60 * 2,      # 2 minutes
                    timeout = 60 * 60 * 24 * 30, # 30 days
                    setup_strategy = "parallel",
                    master = Sys.i[["nodename"]],
                    homogeneous = TRUE,
                    type = "PSOCK",
                    outfile = "/dev/null",
                    rscript = rscript,
                    rscript_args = character(),
                    user = Sys.i[["user"]],
                    rshcmd = "ssh",
                    manual = FALSE,
                    methods = TRUE,
                    renice = NA_integer_,
                    ## rest are unused in parallel
                    rhome = R.home(),
                    rlibs = Sys.getenv("R_LIBS"),
                    scriptdir = file.path(libname, "parallel"),
                    rprog = file.path(R.home("bin"), "R"),
                    snowlib = .libPaths()[1],
                    useRscript = TRUE, # for use by snow clusters
                    useXDR = TRUE)
    defaultClusterOptions <<- addClusterOptions(emptyenv(), options)
}

## Layer the named list 'new' on top of the option environment 'options'.
## Returns a fresh environment whose parent is 'options' and which holds one
## binding per element of 'new'; returns 'options' itself if 'new' is NULL.
addClusterOptions <- function(options, new) {
    if (!is.null(new)) {
        options <- new.env(parent = options)
        names <- names(new)
        for (i in seq_along(new))
            assign(names[i], new[[i]], envir = options)
    }
    options
}

## Look up option 'name' in the option environment 'options'
## (by default the package-wide defaults).
getClusterOption <- function(name, options = defaultClusterOptions)
    get(name, envir = options)

## Overwrite default cluster options in place.
##   ...: option values, each supplied as name = value.
## All names are checked before anything is assigned, so that a call with
## an unnamed argument fails without leaving the defaults partly updated.
setDefaultClusterOptions <- function(...) {
    opts <- list(...)
    nms <- names(opts)
    if (length(opts) &&
        (is.null(nms) || anyNA(nms) || !all(nzchar(nms))))
        stop("all arguments to 'setDefaultClusterOptions' must be named")
    for (i in seq_along(opts))
        assign(nms[i], opts[[i]], envir = defaultClusterOptions)
}


## Create a cluster of the requested 'type', forwarding 'spec' and '...'
## to the matching constructor.  Signals an error for an unknown type.
makeCluster <-
    function (spec, type = getClusterOption("type"), ...)
{
    switch(type,
           PSOCK = makePSOCKcluster(names = spec, ...),
           FORK = makeForkCluster(nnodes = spec, ...),
           SOCK = snow::makeSOCKcluster(names = spec, ...),
           MPI = snow::makeMPIcluster(count = spec, ...),
           NWS = snow::makeNWScluster(names = spec, ...),
           stop("unknown cluster type"))
}


## Generic for shutting down a cluster (the registered default if 'cl' is
## NULL).  If the cluster being stopped is the registered default, the
## registration is cleared before dispatching.
stopCluster <- function(cl = NULL)
{
    cl <- defaultCluster(cl)
    if(identical(cl, get("default", envir = .reg)))
        assign("default", NULL, envir = .reg)
    UseMethod("stopCluster")
}

## Default method: stop every node of the cluster in turn.
stopCluster.default <- function(cl) for (n in cl) stopNode(n)


#
# Cluster Functions
#

## Post an "EXEC" request to node 'con' asking it to call 'fun' with the
## argument list 'args'.  'return' and 'tag' are passed along in the request.
## Always returns NULL.
sendCall <- function (con, fun, args, return = TRUE, tag = NULL)
{
    timing <- .snowTimingData$running()
    if (timing)
        start <- proc.time()[3L]
    postNode(con, "EXEC",
             list(fun = fun, args = args, return = return, tag = tag))
    if (timing)
        .snowTimingData$enterSend(con$rank, start, proc.time()[3L])
    NULL
}

## Receive one reply from node 'con' and return its 'value' component.
recvResult <- function(con)
{
    if (.snowTimingData$running()) {
        start <- proc.time()[3L]
        r <- recvData(con)
        end <- proc.time()[3L]
        .snowTimingData$enterRecv(con$rank, start, end, r$time[3L])
    }
    else r <- recvData(con)
    r$value
}

## Inspect a list of results for elements of class "try-error".
## Returns 'val' unchanged if there are none; otherwise signals an error
## that reports how many there were and includes the first one.
checkForRemoteErrors <- function(val)
{
    count <- 0
    firstmsg <- NULL
    for (v in val) {
        if (inherits(v, "try-error")) {
            count <- count + 1
            if (count == 1) firstmsg <- v
        }
    }
    ## These will not translate
    if (count == 1)
        stop("one node produced an error: ", firstmsg, domain = NA)
    else if (count > 1)
        stop(count, " nodes produced errors; first error: ", firstmsg, domain = NA)
    val
}

## Receive one reply via recvOneData() on cluster 'cl'.
## Returns a list with components 'value', 'node' and 'tag'.
recvOneResult <- function (cl) {
    if (.snowTimingData$running()) {
        start <- proc.time()[3L]
        v <- recvOneData(cl)
        end <- proc.time()[3L]
        .snowTimingData$enterRecv(v$node, start, end, v$value$time[3L])
    }
    else v <- recvOneData(cl)
    list(value = v$value$value, node = v$node, tag = v$value$tag)
}

## Return the RECVTAG shared by all nodes of 'cl', 'anytag' as soon as two
## nodes disagree, or NULL if 'cl' has no nodes.
findRecvOneTag <- function(cl, anytag) {
    rtag <- NULL
    for (node in cl) {
        if (is.null(rtag))
            rtag <- node$RECVTAG
        else if (rtag != node$RECVTAG) {
            rtag <- anytag
            break
        }
    }
    rtag
}

### ========== snow support ===========

## place holder for now.
## Timing is reported as not running, and the recording hooks do nothing.
.snowTimingData <-
    list(running = function() FALSE,
         enterSend = function(...) {},
         enterRecv = function(...) {})


## Methods for snow's NWS and MPI classes, forwarded to package snow.
closeNode.NWSnode <- function(node) snow::closeNode.NWSnode(node)

recvData.MPInode <- function(node) snow::recvData.MPInode(node)
recvData.NWSnode <- function(node) snow::recvData.NWSnode(node)

recvOneData.MPIcluster <- function(cl) snow::recvOneData.MPIcluster(cl)
recvOneData.NWScluster <- function(cl) snow::recvOneData.NWScluster(cl)

sendData.MPInode <- function(node, data) snow::sendData.MPInode(node, data)
sendData.NWSnode <- function(node, data) snow::sendData.NWSnode(node, data)

## these use NextMethod() so need copies.
stopCluster.MPIcluster <- function(cl) {
    NextMethod()
    snow::setMPIcluster(NULL)
}

stopCluster.spawnedMPIcluster <- function(cl) {
    comm <- 1
    NextMethod()
    Rmpi::mpi.comm.disconnect(comm)
}

stopCluster.NWScluster <- function(cl) {
    NextMethod()
    nws::nwsDeleteWs(cl[[1]]$wsServer, nws::nwsWsName(cl[[1]]$ws))
    close(cl[[1]]$wsServer)
}
