1#  File src/library/parallel/R/snow.R
2#  Part of the R package, https://www.R-project.org
3#
4#  Copyright (C) 1995-2020 The R Core Team
5#
6#  This program is free software; you can redistribute it and/or modify
7#  it under the terms of the GNU General Public License as published by
8#  the Free Software Foundation; either version 2 of the License, or
9#  (at your option) any later version.
10#
11#  This program is distributed in the hope that it will be useful,
12#  but WITHOUT ANY WARRANTY; without even the implied warranty of
13#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14#  GNU General Public License for more details.
15#
16#  A copy of the GNU General Public License is available at
17#  https://www.R-project.org/Licenses/
18
19## Derived from snow 0.3-6 by Luke Tierney
20
## Package-private registry environment.  Its "default" entry holds the
## currently registered default cluster, or NULL when none is registered.
.reg <- new.env()
.reg$default <- NULL
23
## Resolve the cluster to work on: 'cl' when it is supplied, otherwise the
## cluster stored in the registry.  Signals an error when neither exists,
## and runs checkCluster() on the result before returning it.
defaultCluster <- function(cl = NULL)
{
    if (is.null(cl)) {
        cl <- get("default", envir = .reg)
        if (is.null(cl))
            stop("no cluster 'cl' supplied and none is registered")
    }
    checkCluster(cl)
    cl
}
31
## Register 'cl' as the default cluster; passing NULL clears the
## registration.  A non-NULL 'cl' is first passed to checkCluster().
## The stored value is returned invisibly (it is the value of assign()).
setDefaultCluster <- function(cl = NULL)
{
    clearing <- is.null(cl)
    if (!clearing) checkCluster(cl)
    assign("default", cl, envir = .reg)
}
37
## Return whatever is currently stored under "default" in the registry
## (NULL when no default cluster is registered).
getDefaultCluster <- function() {
    get("default", envir = .reg)
}
41
42#
43# Checking and subsetting
44#
45
## Signal an error unless 'cl' inherits from class "cluster".
## Returns NULL invisibly when the check passes.
checkCluster <- function(cl)
{
    valid <- inherits(cl, "cluster")
    if (!valid) stop("not a valid cluster")
}
48
## Subsetting method for clusters: take the subset with the next method,
## then put the class of the original cluster back on the result.
`[.cluster` <- function(cl, ...) {
    picked <- NextMethod()
    `class<-`(picked, class(cl))
}
54
55
56#
57# Higher-Level Node Functions
58#
59
## S3 generic for closing a node.
closeNode <- function(node) {
    UseMethod("closeNode")
}
## Default method: nothing to do; returns NULL.
closeNode.default <- function(node) NULL
62
## S3 generics for node communication; these have SOCK methods.
## sendData():    send 'data' to 'node'.
## recvData():    receive from 'node'.
## recvOneData(): receive from a cluster 'cl'.
sendData <- function(node, data) {
    UseMethod("sendData")
}
recvData <- function(node) {
    UseMethod("recvData")
}
recvOneData <- function(cl) {
    UseMethod("recvOneData")
}
67
## Send a message to node 'con' via sendData().  The message is a list
## with components 'type', 'data' (taken from 'value') and 'tag'.
postNode <- function(con, type, value = NULL, tag = NULL)
{
    sendData(con,
             list(type = type, data = value, tag = tag))
}
70
## Shut down one node: post a "DONE" message to it, then close it.
## The value is whatever closeNode() returns.
stopNode <- function(n)
{
    postNode(n, type = "DONE")
    closeNode(n)
}
75
76
77
78#
79#  Cluster Creation and Destruction
80#
81
## Holds the environment of default cluster options.  It is NULL until
## initDefaultClusterOptions() has been called.
defaultClusterOptions <- NULL

## TODO: check that a cluster option is valid.

## Build the default cluster options and store them, as an environment
## created by addClusterOptions(), in 'defaultClusterOptions'.
##
## libname: used to form the 'scriptdir' option as
##          file.path(libname, "parallel").
##
## The port is read from the R_PARALLEL_PORT environment variable.  When
## that is "random", or does not convert to an integer, a port in
## 11000..11999 is derived from one random draw combined with the current
## time; .Random.seed in the global environment is then put back to its
## previous state (or removed if it did not exist before the draw).
initDefaultClusterOptions <- function(libname)
{
    bindir <- R.home("bin")
    env_port <- Sys.getenv("R_PARALLEL_PORT")
    port <- if (identical(env_port, "random")) NA else as.integer(env_port)
    if (is.na(port)) {
        saved_seed <- .GlobalEnv$.Random.seed
        draw <- sample.int(.Machine$integer.max - 1L, 1L) /
            .Machine$integer.max
        port <- 11000 + 1000 * ((draw + unclass(Sys.time()) / 300) %% 1)
        if (is.null(saved_seed)) {
            ## there was no seed initially, so drop the one just created
            rm(".Random.seed", envir = .GlobalEnv, inherits = FALSE)
        } else {
            ## reset to the seed in place before the draw
            assign(".Random.seed", saved_seed, envir = .GlobalEnv,
                   inherits = FALSE)
        }
    }
    sysinfo <- Sys.info()
    defaults <- list(port = as.integer(port),
                     setup_timeout = 60 * 2,      # 2 minutes
                     timeout = 60 * 60 * 24 * 30, # 30 days
                     setup_strategy = "parallel",
                     master = sysinfo[["nodename"]],
                     homogeneous = TRUE,
                     type = "PSOCK",
                     outfile = "/dev/null",
                     rscript = file.path(bindir, "Rscript"),
                     rscript_args = character(),
                     user = sysinfo[["user"]],
                     rshcmd = "ssh",
                     manual = FALSE,
                     methods = TRUE,
                     renice = NA_integer_,
                     ## the remaining options are unused in parallel
                     rhome = R.home(),
                     rlibs = Sys.getenv("R_LIBS"),
                     scriptdir = file.path(libname, "parallel"),
                     rprog = file.path(bindir, "R"),
                     snowlib = .libPaths()[1],
                     useRscript = TRUE, # for use by snow clusters
                     useXDR = TRUE)
    defaultClusterOptions <<- addClusterOptions(emptyenv(), defaults)
}
126
## Layer the named values in 'new' on top of the option environment
## 'options'.  A non-NULL 'new' yields a fresh environment whose parent is
## 'options' and which holds one binding per element of 'new'; a NULL
## 'new' returns 'options' unchanged.
addClusterOptions <- function(options, new)
{
    if (is.null(new)) return(options)
    layer <- new.env(parent = options)
    keys <- names(new)
    for (k in seq_along(new))
        assign(keys[k], new[[k]], envir = layer)
    layer
}
136
## Look up option 'name' in the option environment 'options' (by default
## the package-wide defaults).  The lookup uses get(), so enclosing
## environments of 'options' are searched as well.
getClusterOption <- function(name, options = defaultClusterOptions) {
    get(name, envir = options)
}
139
## Overwrite (or add) entries of the default cluster options.  Each named
## argument in '...' is assigned under its name into the
## 'defaultClusterOptions' environment.  Returns NULL invisibly.
setDefaultClusterOptions <- function(...) {
    updates <- list(...)
    keys <- names(updates)
    for (k in seq_along(updates))
        assign(keys[k], updates[[k]], envir = defaultClusterOptions)
}
146
147
## Create a cluster of the given 'type' from 'spec'; further arguments are
## forwarded to the constructor that is selected.  'type' defaults to the
## "type" cluster option.  An unrecognised type is an error.
makeCluster <- function(spec, type = getClusterOption("type"), ...)
{
    switch(type,
           ## constructors called unqualified
           "PSOCK" = makePSOCKcluster(names = spec, ...),
           "FORK"  = makeForkCluster(nnodes = spec, ...),
           ## constructors taken from package snow
           "SOCK"  = snow::makeSOCKcluster(names = spec, ...),
           "MPI"   = snow::makeMPIcluster(count = spec, ...),
           "NWS"   = snow::makeNWScluster(names = spec, ...),
           stop("unknown cluster type"))
}
159
160
## S3 generic that stops a cluster.
## cl: the cluster to stop; when NULL, defaultCluster() supplies the
##     registered default cluster (and signals an error if there is none).
stopCluster <- function(cl = NULL)
{
    cl <- defaultCluster(cl)
    ## If the cluster being stopped is the registered default, clear the
    ## registration so the registry no longer refers to it.
    if(identical(cl, get("default", envir = .reg)))
        assign("default", NULL, envir = .reg)
    ## NOTE(review): 'cl' is reassigned above, before dispatch; confirm how
    ## UseMethod() treats that reassignment before restructuring this body.
    UseMethod("stopCluster")
}
168
## Default method: call stopNode() on every node of the cluster, in order.
## Returns NULL invisibly (the value of the loop).
stopCluster.default <- function(cl)
{
    for (node in cl) stopNode(node)
}
170
171
172#
173# Cluster Functions
174#
175
## Post an "EXEC" message to node 'con' asking it to call 'fun' with the
## argument list 'args'.  'return' and 'tag' are passed along inside the
## message.  When timing is active, the send interval is recorded against
## con$rank.  Always returns NULL.
sendCall <- function (con, fun, args, return = TRUE, tag = NULL)
{
    if (!.snowTimingData$running()) {
        postNode(con, "EXEC",
                 list(fun = fun, args = args, return = return, tag = tag))
    } else {
        began <- proc.time()[3L]
        postNode(con, "EXEC",
                 list(fun = fun, args = args, return = return, tag = tag))
        .snowTimingData$enterSend(con$rank, began, proc.time()[3L])
    }
    NULL
}
187
## Receive a reply from node 'con' and return its 'value' component.
## When timing is active, the receive interval and the third element of
## the reply's 'time' component are recorded against con$rank.
recvResult <- function(con)
{
    if (!.snowTimingData$running())
        return(recvData(con)$value)
    began <- proc.time()[3L]
    reply <- recvData(con)
    ended <- proc.time()[3L]
    .snowTimingData$enterRecv(con$rank, began, ended, reply$time[3L])
    reply$value
}
199
## Scan the results in 'val' for elements inheriting from "try-error".
## If there are none, 'val' is returned unchanged.  Otherwise an error is
## signalled whose message reports how many elements failed and includes
## the first failing element.
checkForRemoteErrors <- function(val)
{
    ## Count with integers: a double counter is rendered by stop() in
    ## scientific notation for round counts (100000 becomes "1e+05").
    count <- 0L
    firstmsg <- NULL
    for (v in val) {
        if (inherits(v, "try-error")) {
            count <- count + 1L
            if (count == 1L) firstmsg <- v
        }
    }
    ## These will not translate
    if (count == 1L)
        stop("one node produced an error: ", firstmsg, domain = NA)
    else if (count > 1L)
        stop(count, " nodes produced errors; first error: ", firstmsg,
             domain = NA)
    val
}
217
## Receive one reply from cluster 'cl' via recvOneData() and repackage it
## as list(value, node, tag).  When timing is active, the receive interval
## and the third element of the reply's 'time' are recorded against the
## replying node.
recvOneResult <- function (cl) {
    timed <- .snowTimingData$running()
    if (timed) began <- proc.time()[3]
    reply <- recvOneData(cl)
    if (timed) {
        ended <- proc.time()[3]
        .snowTimingData$enterRecv(reply$node, began, ended,
                                  reply$value$time[3])
    }
    list(value = reply$value$value, node = reply$node, tag = reply$value$tag)
}
228
## Work out the receive tag for cluster 'cl'.  If the nodes' RECVTAG
## entries all agree, that common value is returned; as soon as one node
## differs from the tag seen so far, 'anytag' is returned instead.
## An empty cluster gives NULL.
findRecvOneTag <- function(cl, anytag) {
    tag <- NULL
    for (nd in cl) {
        current <- nd$RECVTAG
        if (is.null(tag)) {
            tag <- current
            next
        }
        if (tag != current) return(anytag)
    }
    tag
}
241
242### ========== snow support ===========
243
## Placeholder for now: timing is reported as not running, and the two
## recording hooks accept any arguments and do nothing.
.snowTimingData <- list(
    running   = function() FALSE,
    enterSend = function(...) NULL,
    enterRecv = function(...) NULL
)
249
250
## Forwarding S3 methods for the NWS and MPI node/cluster classes: each
## passes its arguments straight to the function of the same name in
## package snow.
closeNode.NWSnode <- function(node) {
    snow::closeNode.NWSnode(node)
}

recvData.MPInode <- function(node) {
    snow::recvData.MPInode(node)
}
recvData.NWSnode <- function(node) {
    snow::recvData.NWSnode(node)
}

recvOneData.MPIcluster <- function(cl) {
    snow::recvOneData.MPIcluster(cl)
}
recvOneData.NWScluster <- function(cl) {
    snow::recvOneData.NWScluster(cl)
}

sendData.MPInode <- function(node, data) {
    snow::sendData.MPInode(node, data)
}
sendData.NWSnode <- function(node, data) {
    snow::sendData.NWSnode(node, data)
}
261
## These stopCluster methods use NextMethod(), so they need copies here.

## MPI clusters: run the next stopCluster method, then call
## snow::setMPIcluster(NULL).
stopCluster.MPIcluster <- function(cl)
{
    NextMethod()
    snow::setMPIcluster(NULL)
}
267
## Spawned MPI clusters: run the next stopCluster method, then call
## Rmpi::mpi.comm.disconnect() with comm number 1.
stopCluster.spawnedMPIcluster <- function(cl)
{
    NextMethod()
    Rmpi::mpi.comm.disconnect(1)
}
273
## NWS clusters: run the next stopCluster method, then use the first
## node's 'wsServer' and 'ws' entries to delete the workspace via
## nws::nwsDeleteWs(), and finally close that 'wsServer'.
stopCluster.NWScluster <- function(cl)
{
    NextMethod()
    first <- cl[[1]]
    nws::nwsDeleteWs(first$wsServer, nws::nwsWsName(first$ws))
    close(first$wsServer)
}
279
280