1
2# +-----------------------------+     +-------------------------------+
3# | Main R process              |     | Subprocess 1                  |
4# | +------------------------+  |     | +---------------------------+ |
5# | | test_dir_parallel()    |  |     | | test_file()               | |
6# | | +-------------------+  |  |     | | +-----------------------+ | |
7# | | | Event loop        |< ------+  | | | SubprocessReporter    | | |
8# | | +-------------------+  |  |  |  | | | +-------------------+ | | |
9# | |    |                   |  |  |  | | | | test_that()       | | | |
10# | |    v                   |  |  |  | | | +-------------------+ | | |
11# | | +-------------------+  |  |  |  | | |    |                  | | |
12# | | | Progress2Reporter |  |  |  |  | | |    v                  | | |
13# | | +-------------------+  |  |  |  | | | +-------------------+ | | |
14# | +------------------------+  |  |--------| signalCondition() | | | |
15# +-----------------------------+  |  | | | +-------------------+ | | |
16#                                  |  | | +-----------------------+ | |
17#                                  |  | +---------------------------+ |
18#                                  |  +-------------------------------+
19#                                  |  +-------------------------------+
20#                                  |--| Subprocess 2                  |
21#                                  |  +-------------------------------+
22#                                  |  +-------------------------------+
23#                                  +--| Subprocess 3                  |
24#                                     +-------------------------------+
25#                                       ...
26#
27# ## Notes
28#
29# * Subprocesses run `callr::r_session` R sessions. They are re-used,
30#   one R session can be used for several test_file() calls.
31# * Helper and setup files are loaded in the subprocesses after this.
32# * The main process puts all test files in the task queue, and then
33#   runs an event loop.
34
35test_files_parallel <- function(
36                       test_dir,
37                       test_package,
38                       test_paths,
39                       load_helpers = TRUE,
40                       reporter = default_parallel_reporter(),
41                       env = NULL,
42                       stop_on_failure = FALSE,
43                       stop_on_warning = FALSE,
44                       wrap = TRUE,  # unused, to match test_files signature
45                       load_package = c("none", "installed", "source")
46                       ) {
47
48
49  reporters <- test_files_reporter(reporter)
50
51  # TODO: support timeouts. 20-30s for each file by default?
52
53  num_workers <- min(default_num_cpus(), length(test_paths))
54  inform(paste0(
55    "Starting ", num_workers, " test process",
56    if (num_workers != 1) "es"
57  ))
58
59  # Set up work queue ------------------------------------------
60  queue <- NULL
61  withr::defer(queue_teardown(queue))
62
63  # Start workers in parallel and add test tasks to queue.
64  queue <- queue_setup(
65    test_paths = test_paths,
66    test_package = test_package,
67    test_dir = test_dir,
68    load_helpers = load_helpers,
69    num_workers = num_workers,
70    load_package = load_package
71  )
72
73  with_reporter(reporters$multi, {
74    parallel_updates <- reporter$capabilities$parallel_updates
75    if (parallel_updates) {
76      parallel_event_loop_smooth(queue, reporters)
77    } else {
78      parallel_event_loop_chunky(queue, reporters)
79    }
80  })
81
82  test_files_check(reporters$list$get_results(),
83    stop_on_failure = stop_on_failure,
84    stop_on_warning = stop_on_warning
85  )
86}
87
88default_num_cpus <- function() {
89  # Use common option, if set
90  ncpus <- getOption("Ncpus", NULL)
91  if (!is.null(ncpus)) {
92    ncpus <- suppressWarnings(as.integer(ncpus))
93    if (is.na(ncpus)) abort("`getOption(Ncpus)` must be an integer")
94    return(ncpus)
95  }
96
97  # Otherwise use env var if set
98  ncpus <- Sys.getenv("TESTTHAT_CPUS", "")
99  if (ncpus != "") {
100    ncpus <- suppressWarnings(as.integer(ncpus))
101    if (is.na(ncpus)) abort("TESTTHAT_CPUS must be an integer")
102    return(ncpus)
103  }
104
105  # Otherwise 2
106  2L
107}
108
109parallel_event_loop_smooth <- function(queue, reporters) {
110  update_interval <- 0.1
111  next_update <- proc.time()[[3]] + update_interval
112
113  while (!queue$is_idle()) {
114    # How much time do we have to poll before the next UI update?
115    now <- proc.time()[[3]]
116    poll_time <- max(next_update - now, 0)
117    next_update <- now + update_interval
118
119    msgs <- queue$poll(poll_time)
120
121    updated <- FALSE
122    for (x in msgs) {
123      if (x$code != PROCESS_MSG) {
124        next
125      }
126
127      m <- x$message
128      if (!inherits(m, "testthat_message")) {
129        message(m)
130        next
131      }
132
133      if (m$cmd != "DONE") {
134        reporters$multi$start_file(m$filename)
135        do.call(reporters$multi[[m$cmd]], m$args)
136        updated <- TRUE
137      }
138    }
139
140    # We need to spin, even if there were no events
141    if (!updated) reporters$multi$update()
142  }
143}
144
145parallel_event_loop_chunky <- function(queue, reporters) {
146  files <- list()
147  while (!queue$is_idle()) {
148    msgs <- queue$poll(Inf)
149    for (x in msgs) {
150      if (x$code != PROCESS_MSG) {
151        next
152      }
153
154      m <- x$message
155      if (!inherits(m, "testthat_message")) {
156        message(m)
157        next
158      }
159
160      # Record all events until we get end of file, then we replay them all
161      # with the local reporters. This prevents out of order reporting.
162      if (m$cmd != "DONE") {
163        files[[m$filename]] <- append(files[[m$filename]], list(m))
164      } else {
165        replay_events(reporters$multi, files[[m$filename]])
166        reporters$multi$end_context_if_started()
167        files[[m$filename]] <- NULL
168      }
169    }
170  }
171}
172
173replay_events <- function(reporter, events) {
174  for (event in events) {
175    do.call(reporter[[event$cmd]], event$args)
176  }
177}
178
179queue_setup <- function(test_paths,
180                        test_package,
181                        test_dir,
182                        num_workers,
183                        load_helpers,
184                        load_package) {
185
186  # TODO: observe `load_package`, but the "none" default is not
187  # OK for the subprocess, because it'll not have the tested package
188  if (load_package == "none") load_package <- "source"
189
190  # TODO: similarly, load_helpers = FALSE, coming from devtools,
191  # is not appropriate in the subprocess
192  load_helpers <- TRUE
193
194  test_package <- test_package %||% Sys.getenv("TESTTHAT_PKG")
195
196  # First we load the package "manually", in case it is testthat itself
197  load_hook <- expr({
198    switch(!!load_package,
199      installed = library(!!test_package, character.only = TRUE),
200      source = pkgload::load_all(!!test_dir, helpers = FALSE, quiet = TRUE)
201    )
202    asNamespace("testthat")$queue_process_setup(
203      test_package = !!test_package,
204      test_dir = !!test_dir,
205      load_helpers = !!load_helpers,
206      load_package = "none"
207    )
208  })
209  queue <- task_q$new(concurrency = num_workers, load_hook = load_hook)
210
211  fun <- transport_fun(function(path) asNamespace("testthat")$queue_task(path))
212  for (path in test_paths) {
213    queue$push(fun, list(path))
214  }
215
216  queue
217}
218
219queue_process_setup <- function(test_package, test_dir, load_helpers, load_package) {
220  env <- asNamespace("testthat")$test_files_setup_env(
221    test_package,
222    test_dir,
223    load_package
224  )
225  asNamespace("testthat")$test_files_setup_state(
226    test_dir = test_dir,
227    test_package = test_package,
228    load_helpers = load_helpers,
229    env = env,
230    .env = .GlobalEnv
231  )
232
233  # Save test environment in global env where it can easily be retrieved
234  .GlobalEnv$.test_env <- env
235}
236
237queue_task <- function(path) {
238  env <- .GlobalEnv$.test_env
239
240  withr::local_envvar("TESTTHAT_IS_PARALLEL" = "true")
241  reporters <- test_files_reporter(SubprocessReporter$new())
242  with_reporter(reporters$multi, test_one_file(path, env = env))
243  NULL
244}
245
246# Clean up subprocesses: we call teardown methods, but we only give them a
247# second, before killing the whole process tree using ps's env var marker
248# method.
249queue_teardown <- function(queue) {
250  if (is.null(queue)) {
251    return()
252  }
253
254  tasks <- queue$list_tasks()
255  num <- nrow(tasks)
256
257  clean_fn <- function() {
258    withr::deferred_run(.GlobalEnv)
259    quit(save = "no", status = 1L, runLast = TRUE)
260  }
261
262  topoll <- list()
263  for (i in seq_len(num)) {
264    if (!is.null(tasks$worker[[i]])) {
265      # The worker might have crashed or exited, so this might fail.
266      # If it does then we'll just ignore that worker
267      tryCatch({
268        tasks$worker[[i]]$call(clean_fn)
269        topoll <- c(topoll, tasks$worker[[i]]$get_poll_connection())
270      }, error = function(e) tasks$worker[i] <- list(NULL))
271    }
272  }
273
274  # Give covr time to write out the coverage files
275  if (in_covr()) grace <- 30L else grace <- 3L
276  limit <- Sys.time() + grace
277  while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
278    timeout <- as.double(timeout, units = "secs") * 1000
279    pr <- processx::poll(topoll, as.integer(timeout))
280    topoll <- topoll[pr != "ready"]
281  }
282
283  for (i in seq_len(num)) {
284    if (!is.null(tasks$worker[[i]])) {
285      tryCatch(
286        close(tasks$worker[[i]]$get_input_connection()),
287        error = function(e) NULL
288      )
289      if (ps::ps_is_supported()) {
290        tasks$worker[[i]]$kill_tree()
291      } else {
292        tasks$worker[[i]]$kill()
293      }
294    }
295  }
296}
297
298# Reporter that just forwards events in the subprocess back to the main process
299#
300# Ideally, these messages would be throttled, i.e. if the test code
301# emits many expectation conditions fast, SubprocessReporter should
302# collect several of them and only emit a condition a couple of times
303# a second. End-of-test and end-of-file events would be transmitted
304# immediately.
305SubprocessReporter <- R6::R6Class("SubprocessReporter",
306  inherit = Reporter,
307  public = list(
308    start_file = function(filename) {
309      private$filename <- filename
310      private$event("start_file", filename)
311    },
312    start_test = function(context, test) {
313      private$event("start_test", context, test)
314    },
315    start_context = function(context) {
316      private$event("start_context", context)
317    },
318    add_result = function(context, test, result) {
319      if (inherits(result, "expectation_success")) {
320        # Strip bulky components to reduce data transfer cost
321        result[["srcref"]] <- NULL
322        result[["trace"]] <- NULL
323      }
324      private$event("add_result", context, test, result)
325    },
326    end_test = function(context, test) {
327      private$event("end_test", context, test)
328    },
329    end_context = function(context) {
330      private$event("end_context", context)
331    },
332    end_file = function() {
333      private$event("end_file")
334    },
335    end_reporter = function() {
336      private$event("DONE")
337    }
338  ),
339
340  private = list(
341    filename = NULL,
342    event = function(cmd, ...) {
343      msg <- list(
344        code = PROCESS_MSG,
345        cmd = cmd,
346        filename = private$filename,
347        time = proc.time()[[3]],
348        args = list(...)
349      )
350      class(msg) <- c("testthat_message", "callr_message", "condition")
351      signalCondition(msg)
352    }
353  )
354)
355