## Load the 'future' namespace up front; loading it will set some options
## based on environment variables
loadNamespace("future")

## Snapshot the state before the tests touch anything:
## variables, environment variables, and options
ovars <- ls()
oenvs0 <- Sys.getenv()
oenvs <- oenvs0
oopts0 <- options()

## Flags describing the environment the tests run in
covr_testing <- is.element("covr", loadedNamespaces())
on_solaris <- grepl("^solaris", R.version[["os"]])
on_macos <- grepl("^darwin", R.version[["os"]])
on_githubactions <- as.logical(Sys.getenv("GITHUB_ACTIONS", unset = "FALSE"))
14
## Options used throughout the tests; the values they replace are
## returned by options() and kept in 'oopts'
oopts <- options(
  warn = 1L,
  mc.cores = 2L,
  future.debug = TRUE,
  ## The following two are cleared in case they
  ## happen to be set on the test system
  future.availableCores.system = NULL,
  future.availableCores.fallback = NULL
)
25
## NOTE: These should be set automatically whenever the future package is
## loaded and 'R CMD check' runs.  They are set here too, in case R is
## changed in the future and we fail to detect 'R CMD check'.
## Each variable is set in its own call so that every value is converted
## to a string on its own.
Sys.setenv(R_PARALLELLY_MAKENODEPSOCK_CONNECTTIMEOUT = 2 * 60)
Sys.setenv(R_PARALLELLY_MAKENODEPSOCK_TIMEOUT = 2 * 60)
Sys.setenv(R_PARALLELLY_MAKENODEPSOCK_SESSIONINFO_PKGS = TRUE)
## 0.01s (instead of default 0.2s)
Sys.setenv(R_FUTURE_WAIT_INTERVAL = 0.01)
33
## Give PSOCK cluster workers a label (helps troubleshooting).  The label is
## built from the first command-line argument ending in '.R' (or "UNKNOWN"
## if there is none), the node name, the user name, and the process ID.
test_script <- commandArgs()
test_script <- test_script[grepl("[.]R$", test_script)][1]
if (is.na(test_script)) {
  test_script <- "UNKNOWN"
}
worker_label <- sprintf(
  "future/tests/%s:%s:%s:%s",
  test_script,
  Sys.info()[["nodename"]],
  Sys.info()[["user"]],
  Sys.getpid()
)
Sys.setenv(R_PARALLELLY_MAKENODEPSOCK_RSCRIPT_LABEL = worker_label)
39
## Unset the following environment variables for the duration of the tests,
## in case they are set on the test system.  'oenvs2' holds the value
## returned by Sys.unsetenv().
oenvs2 <- Sys.unsetenv(
  c(
    "R_PARALLELLY_AVAILABLECORES_SYSTEM",
    "R_PARALLELLY_AVAILABLECORES_FALLBACK",
    ## SGE
    "NSLOTS",
    "PE_HOSTFILE",
    ## Slurm
    "SLURM_CPUS_PER_TASK",
    ## TORQUE / PBS
    "NCPUS",
    "PBS_NUM_PPN",
    "PBS_NODEFILE",
    "PBS_NP",
    "PBS_NUM_NODES"
  )
)
52
## Keep what future::plan() reports before the tests change the plan
oplan <- future::plan()

## Use sequential (eager) futures by default
future::plan("sequential")

## TRUE if environment variable '_R_CHECK_FULL_' is set to a non-empty value
fullTest <- nzchar(Sys.getenv("_R_CHECK_FULL_"))

## TRUE only on Windows with the 'i386' architecture
isWin32 <- (.Platform[["OS.type"]] == "windows" &&
            .Platform[["r_arch"]] == "i386")
60
## Private future functions: copy each of the following objects from the
## 'future' namespace into the current environment under the same name.
## An error is produced if any of them does not exist in the namespace.
invisible(list2env(
  mget(
    c(
      ".onLoad", ".onAttach",
      "asIEC",
      "ClusterRegistry",
      "constant",
      "FutureRegistry",
      "gassign", "get_future", "geval", "grmall",
      "hpaste",
      "inRCmdCheck", "importParallel",
      "mdebug", "mdebugf",
      "myExternalIP", "myInternalIP",
      "parseCmdArgs",
      "requestCore", "requestNode", "requirePackages",
      "tweakExpression",
      "whichIndex",
      "pid_exists",
      "isFALSE", "isNA",
      "supports_omp_threads"
    ),
    envir = asNamespace("future"),
    inherits = FALSE
  ),
  envir = environment()
))
89
## Local functions for test scripts

## printf(): format the arguments with sprintf() and write the result
## with cat()
printf <- function(...) {
  formatted <- sprintf(...)
  cat(formatted)
}
## mstr(): like str(), but the output is emitted as a single message
## (lines joined by newlines) instead of being printed
mstr <- function(...) {
  out <- capture.output(str(...))
  message(paste(out, collapse = "\n"))
}
## attachLocally(): assign each named element of 'x' as a variable
##
## Arguments:
##   x:     An object with names, e.g. a named list.  Elements are looked
##          up by name, i.e. via x[[name]].
##   envir: The environment to assign to; defaults to the calling frame.
##
## Returns NULL, invisibly.
attachLocally <- function(x, envir = parent.frame()) {
  target <- envir
  lapply(names(x), FUN = function(key) {
    assign(key, value = x[[key]], envir = target)
  })
  invisible(NULL)
}
98
## supportedStrategies(): the strategies to run tests with
##
## Arguments:
##   cores: If 1, "multicore" and "multisession" are left out.  If greater
##          than 1, "sequential", "uniprocess", "eager" and "lazy" are left
##          out.  If NA (default), no filtering on number of cores is done.
##   excl:  Names of strategies to always leave out.
##   ...:   Passed to future:::supportedStrategies().
##
## Returns what future:::supportedStrategies() returns, minus the excluded
## strategies.  The deprecated 'multiprocess' is never included.
supportedStrategies <- function(cores = NA_integer_, excl = "cluster", ...) {
  candidates <- future:::supportedStrategies(...)

  ## Collect everything to leave out, then remove it in one go
  drop <- excl
  if (!is.na(cores)) {
    if (cores == 1L) {
      drop <- c(drop, "multicore", "multisession")
    } else if (cores > 1L) {
      drop <- c(drop, "sequential", "uniprocess", "eager", "lazy")
    }
  }

  ## Don't test deprecated 'multiprocess'
  drop <- c(drop, "multiprocess")

  setdiff(candidates, drop)
}
117
## Number of cores for the tests: what future::availableCores() reports,
## but never more than two
availCores <- min(future::availableCores(), 2L)
119
120
## WORKAROUND: capture.output() gained argument 'split' in R 3.3.0.
## In older versions of R, use a wrapper that accepts 'split' and drops it.
capture.output <- if (getRversion() >= "3.3.0") {
  utils::capture.output
} else {
  function(..., split = FALSE) utils::capture.output(...)
}
127
## recordConditions(): evaluate an expression and collect the conditions
## signaled while doing so
##
## Arguments:
##   expr:  The expression to evaluate.  Its value is discarded.
##   ...:   Accepted, but not used.
##   parse: Accepted, but not used.
##
## Returns a list of the conditions in the order they were signaled, each
## with attribute 'received' set to Sys.time() at the time it was caught.
## The conditions are only recorded, not muffled, so they still reach any
## other handlers.
recordConditions <- function(expr, ..., parse = TRUE) {
  captured <- list()

  ## Calling handler: timestamp the condition and append it to 'captured'
  record <- function(cond) {
    attr(cond, "received") <- Sys.time()
    captured[[length(captured) + 1L]] <<- cond
  }

  withCallingHandlers(expr, condition = record)

  captured
}
136
## recordRelay(): evaluate an expression while recording both its standard
## output and the conditions it signals
##
## Arguments:
##   ...: Passed as is to recordConditions(), whose first argument is the
##        expression to evaluate.
##
## Returns a list with elements:
##   stdout: Character vector with the captured output, one element per
##           line, each with a trailing "\n" appended.  Empty if there was
##           no output.
##   msgs:   Character vector with the message of each recorded condition.
##           Always a character vector; character(0) if no conditions were
##           recorded.
recordRelay <- function(...) {
  ## 'split = TRUE' asks capture.output() to also pass the output through
  ## (where capture.output() supports that argument)
  stdout <- capture.output(conditions <- recordConditions(...), split = TRUE)
  if (length(stdout) > 0) stdout <- paste0(stdout, "\n")

  ## vapply() instead of sapply(): sapply() returns list() when there are
  ## no conditions, which makes the type of 'msgs' depend on the input
  msgs <- vapply(conditions, FUN = conditionMessage, FUN.VALUE = NA_character_)

  list(stdout = stdout, msgs = msgs)
}
143