# Parallel test execution
#
# +-----------------------------+     +-------------------------------+
# | Main R process              |     | Subprocess 1                  |
# | +------------------------+  |     | +---------------------------+ |
# | | test_dir_parallel()    |  |     | | test_file()               | |
# | | +-------------------+  |  |     | | +-----------------------+ | |
# | | | Event loop        |< ------+  | | | SubprocessReporter    | | |
# | | +-------------------+  |  |  |  | | | +-------------------+ | | |
# | |    |                   |  |  |  | | | | test_that()       | | | |
# | |    v                   |  |  |  | | | +-------------------+ | | |
# | | +-------------------+  |  |  |  | | |    |                  | | |
# | | | Progress2Reporter |  |  |  |  | | |    v                  | | |
# | | +-------------------+  |  |  |  | | | +-------------------+ | | |
# | +------------------------+  |  |--------| signalCondition() | | | |
# +-----------------------------+  |  | | | +-------------------+ | | |
#                                  |  | | +-----------------------+ | |
#                                  |  | +---------------------------+ |
#                                  |  +-------------------------------+
#                                  |  +-------------------------------+
#                                  |--| Subprocess 2                  |
#                                  |  +-------------------------------+
#                                  |  +-------------------------------+
#                                  +--| Subprocess 3                  |
#                                     +-------------------------------+
#                                     ...
#
# ## Notes
#
# * Subprocesses run `callr::r_session` R sessions. They are re-used:
#   one R session can be used for several test_file() calls.
# * Helper and setup files are loaded in the subprocesses after this.
# * The main process puts all test files in the task queue, and then
#   runs an event loop.

# Run a set of test files in parallel worker processes.
#
# * `test_dir`, `test_package`, `test_paths`, `load_helpers` and
#   `load_package` are forwarded to `queue_setup()`.
# * `reporter` is handed to `test_files_reporter()`; its
#   `capabilities$parallel_updates` field selects the event loop.
# * `env` and `wrap` are not used in this function body.
# * `stop_on_failure` and `stop_on_warning` are forwarded to
#   `test_files_check()`, whose value is returned.
test_files_parallel <- function(
  test_dir,
  test_package,
  test_paths,
  load_helpers = TRUE,
  reporter = default_parallel_reporter(),
  env = NULL,
  stop_on_failure = FALSE,
  stop_on_warning = FALSE,
  wrap = TRUE, # unused, to match test_files signature
  load_package = c("none", "installed", "source")
) {
  # Resolve the choice vector to a single string. Without this, relying on
  # the default hands a length-3 vector to queue_setup(), whose
  # `if (load_package == "none")` is an error in R >= 4.2.
  load_package <- match.arg(load_package)

  reporters <- test_files_reporter(reporter)

  # TODO: support timeouts. 20-30s for each file by default?

  num_workers <- min(default_num_cpus(), length(test_paths))
  inform(paste0(
    "Starting ", num_workers, " test process",
    if (num_workers != 1) "es"
  ))

  # Set up work queue ------------------------------------------
  # The teardown is registered before the queue exists, so that workers are
  # cleaned up even if queue_setup() fails part way; `queue` is looked up
  # lazily when the deferred call runs.
  queue <- NULL
  withr::defer(queue_teardown(queue))

  # Start workers in parallel and add test tasks to queue.
  queue <- queue_setup(
    test_paths = test_paths,
    test_package = test_package,
    test_dir = test_dir,
    load_helpers = load_helpers,
    num_workers = num_workers,
    load_package = load_package
  )

  with_reporter(reporters$multi, {
    parallel_updates <- reporter$capabilities$parallel_updates
    if (parallel_updates) {
      parallel_event_loop_smooth(queue, reporters)
    } else {
      parallel_event_loop_chunky(queue, reporters)
    }
  })

  test_files_check(
    reporters$list$get_results(),
    stop_on_failure = stop_on_failure,
    stop_on_warning = stop_on_warning
  )
}

# Number of worker processes to use, as an integer.
#
# Looked up in this order: the `Ncpus` option, the `TESTTHAT_CPUS`
# environment variable, and finally a fixed default of 2. A value that cannot
# be converted to an integer is an error.
default_num_cpus <- function() {
  # Use common option, if set
  ncpus <- getOption("Ncpus", NULL)
  if (!is.null(ncpus)) {
    ncpus <- suppressWarnings(as.integer(ncpus))
    if (is.na(ncpus)) abort("`getOption(Ncpus)` must be an integer")
    return(ncpus)
  }

  # Otherwise use env var if set
  ncpus <- Sys.getenv("TESTTHAT_CPUS", "")
  if (ncpus != "") {
    ncpus <- suppressWarnings(as.integer(ncpus))
    if (is.na(ncpus)) abort("TESTTHAT_CPUS must be an integer")
    return(ncpus)
  }

  # Otherwise 2
  2L
}

# Event loop for reporters that can handle interleaved updates.
#
# Every event received from a worker is forwarded to `reporters$multi`
# right away. The queue is polled for at most the time left until the next
# UI refresh (every 0.1 seconds), and `update()` is called whenever a poll
# did not forward any event.
parallel_event_loop_smooth <- function(queue, reporters) {
  update_interval <- 0.1
  next_update <- proc.time()[[3]] + update_interval

  while (!queue$is_idle()) {
    # How much time do we have to poll before the next UI update?
    now <- proc.time()[[3]]
    poll_time <- max(next_update - now, 0)
    next_update <- now + update_interval

    msgs <- queue$poll(poll_time)

    updated <- FALSE
    for (x in msgs) {
      if (x$code != PROCESS_MSG) {
        next
      }

      m <- x$message
      if (!inherits(m, "testthat_message")) {
        # Not one of our own events: re-emit it in the main process
        message(m)
        next
      }

      if (m$cmd != "DONE") {
        # Tell the reporter which file this event belongs to, then invoke
        # the reporter method named by the event.
        reporters$multi$start_file(m$filename)
        do.call(reporters$multi[[m$cmd]], m$args)
        updated <- TRUE
      }
    }

    # We need to spin, even if there were no events
    if (!updated) reporters$multi$update()
  }
}

# Event loop for reporters that cannot handle interleaved updates.
#
# Events are buffered per file and only replayed, in order, once the
# "DONE" event of that file has arrived.
parallel_event_loop_chunky <- function(queue, reporters) {
  files <- list()
  while (!queue$is_idle()) {
    msgs <- queue$poll(Inf)
    for (x in msgs) {
      if (x$code != PROCESS_MSG) {
        next
      }

      m <- x$message
      if (!inherits(m, "testthat_message")) {
        # Not one of our own events: re-emit it in the main process
        message(m)
        next
      }

      # Record all events until we get end of file, then we replay them all
      # with the local reporters. This prevents out of order reporting.
      if (m$cmd != "DONE") {
        files[[m$filename]] <- append(files[[m$filename]], list(m))
      } else {
        replay_events(reporters$multi, files[[m$filename]])
        reporters$multi$end_context_if_started()
        files[[m$filename]] <- NULL
      }
    }
  }
}

# Call, in order, the `reporter` method named by each event's `cmd`, passing
# the event's `args` as the arguments.
replay_events <- function(reporter, events) {
  for (event in events) {
    do.call(reporter[[event$cmd]], event$args)
  }
}

# Create the task queue: start `num_workers` workers with a load hook that
# loads the tested package and sets up the test state, then push one task
# per test file. Returns the queue.
queue_setup <- function(test_paths,
                        test_package,
                        test_dir,
                        num_workers,
                        load_helpers,
                        load_package) {

  # TODO: observe `load_package`, but the "none" default is not
  # OK for the subprocess, because it'll not have the tested package
  if (load_package == "none") load_package <- "source"

  # TODO: similarly, load_helpers = FALSE, coming from devtools,
  # is not appropriate in the subprocess
  load_helpers <- TRUE

  test_package <- test_package %||% Sys.getenv("TESTTHAT_PKG")

  # First we load the package "manually", in case it is testthat itself
  load_hook <- expr({
    switch(!!load_package,
      installed = library(!!test_package, character.only = TRUE),
      source = pkgload::load_all(!!test_dir, helpers = FALSE, quiet = TRUE)
    )
    asNamespace("testthat")$queue_process_setup(
      test_package = !!test_package,
      test_dir = !!test_dir,
      load_helpers = !!load_helpers,
      load_package = "none"
    )
  })
  queue <- task_q$new(concurrency = num_workers, load_hook = load_hook)

  fun <- transport_fun(function(path) asNamespace("testthat")$queue_task(path))
  for (path in test_paths) {
    queue$push(fun, list(path))
  }

  queue
}

# Runs in a worker, from the load hook built in queue_setup(): creates the
# test environment and the test state, and stores the environment where
# queue_task() can find it.
queue_process_setup <- function(test_package, test_dir, load_helpers, load_package) {
  env <- asNamespace("testthat")$test_files_setup_env(
    test_package,
    test_dir,
    load_package
  )
  asNamespace("testthat")$test_files_setup_state(
    test_dir = test_dir,
    test_package = test_package,
    load_helpers = load_helpers,
    env = env,
    .env = .GlobalEnv
  )

  # Save test environment in global env where it can easily be retrieved
  .GlobalEnv$.test_env <- env
}

# Runs in a worker: tests a single file in the environment saved by
# queue_process_setup(), reporting through a SubprocessReporter.
# Always returns NULL; results travel back as signalled conditions.
queue_task <- function(path) {
  env <- .GlobalEnv$.test_env

  withr::local_envvar("TESTTHAT_IS_PARALLEL" = "true")
  reporters <- test_files_reporter(SubprocessReporter$new())
  with_reporter(reporters$multi, test_one_file(path, env = env))
  NULL
}

# Clean up subprocesses: we ask every worker to run its deferred teardown
# code and quit, but we only give them a few seconds (3, or 30 under covr)
# before killing the whole process tree using ps's env var marker method.
# A NULL `queue` (nothing was started) is a no-op.
queue_teardown <- function(queue) {
  if (is.null(queue)) {
    return()
  }

  tasks <- queue$list_tasks()
  num <- nrow(tasks)

  # Executed in the worker: run deferred handlers, then exit.
  clean_fn <- function() {
    withr::deferred_run(.GlobalEnv)
    quit(save = "no", status = 1L, runLast = TRUE)
  }

  topoll <- list()
  for (i in seq_len(num)) {
    if (!is.null(tasks$worker[[i]])) {
      # The worker might have crashed or exited, so this might fail. If it
      # does, we just do not wait for that worker. It deliberately stays in
      # `tasks`, so the loop at the end still closes and kills it.
      # NOTE(review): `call()` presumably also fails for a worker that is
      # still busy; such a worker must be killed below -- confirm with callr.
      tryCatch(
        {
          tasks$worker[[i]]$call(clean_fn)
          topoll <- c(topoll, tasks$worker[[i]]$get_poll_connection())
        },
        error = function(e) NULL
      )
    }
  }

  # Give covr time to write out the coverage files
  grace <- if (in_covr()) 30L else 3L
  limit <- Sys.time() + grace
  while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
    # processx::poll() is given the timeout in milliseconds
    timeout <- as.double(timeout, units = "secs") * 1000
    pr <- processx::poll(topoll, as.integer(timeout))
    topoll <- topoll[pr != "ready"]
  }

  for (i in seq_len(num)) {
    if (!is.null(tasks$worker[[i]])) {
      # Closing the input connection is best effort only
      tryCatch(
        close(tasks$worker[[i]]$get_input_connection()),
        error = function(e) NULL
      )
      if (ps::ps_is_supported()) {
        tasks$worker[[i]]$kill_tree()
      } else {
        tasks$worker[[i]]$kill()
      }
    }
  }
}

# Reporter that just forwards events in the subprocess back to the main process
#
# Ideally, these messages would be throttled, i.e. if the test code
# emits many expectation conditions fast, SubprocessReporter should
# collect several of them and only emit a condition a couple of times
# a second. End-of-test and end-of-file events would be transmitted
# immediately.
# Every reporter callback below is turned into a `testthat_message`
# condition and signalled; the reporter itself keeps no results.
SubprocessReporter <- R6::R6Class(
  "SubprocessReporter",
  inherit = Reporter,

  public = list(
    # Remember the file name, so that all later events are tagged with it
    start_file = function(filename) {
      private$filename <- filename
      private$event("start_file", filename)
    },

    start_test = function(context, test) {
      private$event("start_test", context, test)
    },

    start_context = function(context) {
      private$event("start_context", context)
    },

    add_result = function(context, test, result) {
      # Successful expectations are sent without their bulky components,
      # to reduce the data transfer cost
      if (inherits(result, "expectation_success")) {
        for (field in c("srcref", "trace")) {
          result[[field]] <- NULL
        }
      }
      private$event("add_result", context, test, result)
    },

    end_test = function(context, test) {
      private$event("end_test", context, test)
    },

    end_context = function(context) {
      private$event("end_context", context)
    },

    end_file = function() {
      private$event("end_file")
    },

    # "DONE" is the marker the event loops use to detect the end of a file
    end_reporter = function() {
      private$event("DONE")
    }
  ),

  private = list(
    # File name recorded by the most recent start_file() call
    filename = NULL,

    # Wrap a command name and its arguments into a condition object and
    # signal it.
    event = function(cmd, ...) {
      signalCondition(structure(
        list(
          code = PROCESS_MSG,
          cmd = cmd,
          filename = private$filename,
          time = proc.time()[[3]],
          args = list(...)
        ),
        class = c("testthat_message", "callr_message", "condition")
      ))
    }
  )
)