#' Resolve one or more futures synchronously
#'
#' Waits for a future, or for all futures held in a container (e.g. a list
#' or an environment), to be resolved.  While waiting for the remaining
#' futures, the values of futures that are already resolved can be retrieved.
#'
#' @param x A [Future] to be resolved, or a list, an environment, or a
#' list environment of futures to be resolved.
#'
#' @param idxs (optional) An index vector (e.g. integer positions or element
#' names) specifying the subset of elements to check.
#'
#' @param recursive A non-negative number specifying the maximum depth of
#' recursion.  If TRUE, the depth is taken from the R option
#' `future.resolve.recursive` (default 99), which in practice means
#' unlimited recursion.  If FALSE or zero, no recursion is performed.
#'
#' @param result (internal) If TRUE, the results are retrieved, otherwise not.
#'
#' @param stdout (internal) If TRUE, captured standard output is relayed,
#' otherwise not.
#'
#' @param signal (internal) If TRUE, captured \link[base]{conditions} are
#' relayed, otherwise not.
#'
#' @param force (internal) If TRUE, captured standard output and captured
#' \link[base]{conditions} that have already been relayed are relayed again,
#' otherwise not.
#'
#' @param sleep Number of seconds to wait between two successive checks of
#' whether the futures have been resolved.  The Future, list, environment,
#' and list environment methods declare a default of 0.1 seconds.
#'
#' @param value (DEFUNCT) Use argument `result` instead.
#'
#' @param \dots Not used.
#'
#' @return Returns `x` (regardless of subsetting or not).
#' If `signal` is TRUE and one of the futures produces an error, then
#' that error is produced.
#'
#' @details
#' This function resolves synchronously, i.e. it blocks until `x` and
#' any futures it contains are resolved.
#'
#' @seealso To resolve a future _variable_, first retrieve its
#' [Future] object using [futureOf()], e.g.
#' `resolve(futureOf(x))`.
#'
#' @export
resolve <- function(x, idxs = NULL, recursive = 0, result = FALSE,
                    stdout = FALSE, signal = FALSE, force = FALSE,
                    sleep = 1.0, value = result, ...) {
  ## S3 generic: dispatch on the class of 'x'
  UseMethod("resolve")
}
48
#' @export
resolve.default <- function(x, ...) {
  ## Fallback method: an object without a dedicated resolve() method has
  ## nothing to wait for, so it is handed back unchanged.
  x
}
51
## resolve() for a single Future.
##
## Launches the future if it is still in state 'created', blocks by polling
## resolved() every 'sleep' seconds until the future is resolved, and then
## optionally collects its result and relays captured output / conditions.
##
## Arguments:
##  x         - A Future object.
##  idxs      - Not referenced by this method.
##  recursive - Recursion depth.  TRUE is replaced by the R option
##              'future.resolve.recursive' (default 99); a negative depth
##              means there is nothing to do and 'x' is returned at once.
##  result    - If TRUE, the result is collected.  Forced to TRUE when
##              'stdout' or 'signal' is TRUE.
##  stdout    - If TRUE, value(x, stdout = TRUE, signal = FALSE) is called.
##  signal    - If TRUE, immediate conditions and then all other captured
##              conditions are signaled.
##  force     - Not referenced by this method; signalConditions() is always
##              called with force = TRUE.
##  sleep     - Number of seconds between two polls of resolved().
##  value     - (DEFUNCT) Use 'result' instead.
##  ...       - Passed on to resolve() when recursing into the result value.
##
## Returns 'x'.
#' @export
resolve.Future <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 0.1, value = result, ...) {
  ## BACKWARD COMPATIBILITY
  if (value && missing(result)) {
    .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0). Use 'result' instead.", package = .packageName)
  }

  if (is.logical(recursive)) {
    if (recursive) recursive <- getOption("future.resolve.recursive", 99)
  }
  recursive <- as.numeric(recursive)

  ## Nothing to do?
  if (recursive < 0) return(x)

  ## Relaying output or conditions requires the result to be collected
  relay <- (stdout || signal)
  result <- result || relay

  ## Lazy future that is not yet launched?
  if (x$state == 'created') x <- run(x)

  ## Poll for the Future to finish
  while (!resolved(x)) {
    Sys.sleep(sleep)
  }

  ## Debug message; extended below depending on what was done
  msg <- sprintf("A %s was resolved", class(x)[1])

  ## Retrieve results?
  if (result) {
    if (is.null(x$result)) {
      x$result <- result(x)
      msg <- sprintf("%s and its result was collected", msg)
    } else {
      ## FIX: the formatted message used to be discarded here
      msg <- sprintf("%s and its result was already collected", msg)
    }

    ## Recursively resolve result value?
    if (recursive > 0) {
      value <- x$result$value
      if (!is.atomic(value)) {
        resolve(value, recursive = recursive - 1, result = TRUE, stdout = stdout, signal = signal, sleep = sleep, ...)
        msg <- sprintf("%s (and resolved itself)", msg)
      }
      value <- NULL  ## Not needed anymore
    }
    result <- NULL     ## Not needed anymore

    if (stdout) value(x, stdout = TRUE, signal = FALSE)
    if (signal) {
      ## Always signal immediateCondition:s and as soon as possible.
      ## They will always be signaled if they exist.
      signalImmediateConditions(x)

      ## Signal all other types of condition
      signalConditions(x, exclude = getOption("future.relay.immediate", "immediateCondition"), resignal = TRUE, force = TRUE)
    }
  } else {
    msg <- sprintf("%s (result was not collected)", msg)
  }

  mdebug(msg)

  x
} ## resolve() for Future
117
118
## resolve() for lists.
##
## Repeatedly sweeps over the (optionally subsetted) elements of 'x' until
## all of them are considered resolved.  Atomic elements are resolved by
## definition.  A Future element that is not yet resolved is skipped and
## retried in the next sweep, after sleeping 'sleep' seconds, so that other
## futures can make progress in the meantime.  All other elements are passed
## to resolve() with the recursion depth decreased by one.
##
## Arguments:
##  x         - A list.
##  idxs      - (optional) Subset of elements to resolve, given as numeric
##              positions, element names, a logical vector, or an index
##              matrix (translated by whichIndex()).  A logical vector is
##              converted to the positions of its TRUE elements by which();
##              it is not recycled.
##  recursive - Recursion depth.  TRUE is replaced by the R option
##              'future.resolve.recursive' (default 99); a negative depth
##              means there is nothing to do.
##  result    - If TRUE, value() is called on each resolved future.  Forced
##              to TRUE when 'stdout' or 'signal' is TRUE.
##  stdout, signal, force - Passed to make_signalConditionsASAP(), which
##              creates the function used for relaying.
##  sleep     - Number of seconds to wait between two sweeps.
##  value     - (DEFUNCT) Use 'result' instead.
##  ...       - Passed on to the recursive resolve() calls.
##
## Returns the original, non-subsetted 'x'.
#' @export
resolve.list <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 0.1, value = result, ...) {
  ## BACKWARD COMPATIBILITY
  if (value && missing(result)) {
    .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0). Use 'result' instead.", package = .packageName)
  }

  if (is.logical(recursive)) {
    if (recursive) recursive <- getOption("future.resolve.recursive", 99)
  }
  recursive <- as.numeric(recursive)

  ## Nothing to do?
  if (recursive < 0) return(x)

  nx <- .length(x)

  ## Nothing to do?
  if (nx == 0) return(x)

  ## Relaying output or conditions requires the results to be collected
  relay <- (stdout || signal)
  result <- result || relay

  ## Keep the original object; it is what gets returned
  x0 <- x

  ## Subset?
  if (!is.null(idxs)) {
    ## Nothing to do?
    if (length(idxs) == 0) return(x)

    if (is.matrix(idxs)) {
      ## Multi-dimensional indices
      idxs <- whichIndex(idxs, dim = dim(x), dimnames = dimnames(x))
    } else if (is.logical(idxs)) {
      ## FIX: logical indices used to be deduplicated by unique() and then
      ## treated as element names.  Translate them into positions instead.
      idxs <- which(idxs)

      ## Nothing selected?
      if (length(idxs) == 0) return(x)
    }
    idxs <- unique(idxs)

    if (is.numeric(idxs)) {
      idxs <- as.numeric(idxs)
      if (any(idxs < 1 | idxs > nx)) {
        stopf("Indices out of range [1,%d]: %s", nx, hpaste(idxs))
      }
    } else {
      names <- names(x)
      if (is.null(names)) {
        stop("Named subsetting not possible. Elements are not named.")
      }

      idxs <- as.character(idxs)
      unknown <- idxs[!is.element(idxs, names)]
      if (length(unknown) > 0) {
        stopf("Unknown elements: %s", hpaste(sQuote(unknown)))
      }
    }

    x <- x[idxs]
    nx <- .length(x)
  }

  debug <- getOption("future.debug", FALSE)
  if (debug) {
    mdebug("resolve() on list ...")
    mdebugf(" recursive: %s", recursive)
  }

  ## NOTE: Everything is considered non-resolved by default

  ## Total number of values to resolve
  total <- nx
  remaining <- seq_len(nx)

  ## Relay?
  signalConditionsASAP <- make_signalConditionsASAP(nx, stdout = stdout, signal = signal, force = force, debug = debug)

  if (debug) {
    mdebugf(" length: %d", nx)
    mdebugf(" elements: %s", hpaste(sQuote(names(x))))
  }

  ## Resolve all elements
  while (length(remaining) > 0) {
    for (ii in remaining) {
      obj <- x[[ii]]

      if (is.atomic(obj)) {
        if (relay) signalConditionsASAP(obj, resignal = FALSE, pos = ii)
      } else {
        ## If an unresolved future, move on to the next object
        ## so that future can be resolved in the asynchronously
        if (inherits(obj, "Future")) {
          ## Lazy future that is not yet launched?
          if (obj$state == 'created') obj <- run(obj)
          if (!resolved(obj)) next
          if (debug) mdebugf("Future #%d", ii)
          if (result) value(obj, stdout = FALSE, signal = FALSE)
        }

        ## The value returned by signalConditionsASAP() is used as a flag
        ## deciding whether the recursive call below should relay as well
        relay_ok <- relay && signalConditionsASAP(obj, resignal = FALSE, pos = ii)

        ## In all other cases, try to resolve
        resolve(obj,
                recursive = recursive - 1,
                result = result,
                stdout = stdout && relay_ok,
                signal = signal && relay_ok,
                sleep = sleep, ...)
      }

      ## Assume resolved at this point
      remaining <- setdiff(remaining, ii)
      if (debug) mdebugf(" length: %d (resolved future %s)", length(remaining), ii)
      stop_if_not(!anyNA(remaining))
    } # for (ii ...)

    ## Wait a bit before checking again
    if (length(remaining) > 0) Sys.sleep(sleep)
  } # while (...)

  if (relay || force) {
    if (debug) mdebug("Relaying remaining futures")
    signalConditionsASAP(resignal = FALSE, pos = 0L)
  }

  if (debug) mdebug("resolve() on list ... DONE")

  x0
} ## resolve() for list
245
246
## resolve() for environments.
##
## Converts 'x' with futures() and then repeatedly sweeps over the selected
## elements until all of them are considered resolved.  Atomic elements are
## resolved by definition.  A Future element that is not yet resolved is
## skipped and retried in the next sweep, after sleeping 'sleep' seconds.
## All other elements are passed to resolve() with the recursion depth
## decreased by one.
##
## Arguments:
##  x         - An environment.
##  idxs      - (optional) Names of the elements to resolve.  If NULL, all
##              elements (including those with names starting with a
##              period) are resolved.
##  recursive - Recursion depth.  TRUE is replaced by the R option
##              'future.resolve.recursive' (default 99); a negative depth
##              means there is nothing to do.
##  result    - If TRUE, value() is called on each resolved future.  Forced
##              to TRUE when 'stdout' or 'signal' is TRUE.
##  stdout, signal, force - Passed to make_signalConditionsASAP(), which
##              creates the function used for relaying.
##  sleep     - Number of seconds to wait between two sweeps.
##  value     - (DEFUNCT) Use 'result' instead.
##  ...       - Passed on to the recursive resolve() calls.
##
## Returns the original environment 'x'.
#' @export
resolve.environment <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 0.1, value = result, ...) {
  ## BACKWARD COMPATIBILITY
  if (value && missing(result)) {
    .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0). Use 'result' instead.", package = .packageName)
  }

  if (is.logical(recursive)) {
    if (recursive) recursive <- getOption("future.resolve.recursive", 99)
  }
  recursive <- as.numeric(recursive)

  ## Nothing to do?
  if (recursive < 0) return(x)

  nx <- .length(x)

  ## Nothing to do?
  if (nx == 0) return(x)

  ## Subset?
  has_idxs <- !is.null(idxs)
  if (!has_idxs) {
    ## names(x) is only supported in R (>= 3.2.0)
    idxs <- ls(envir = x, all.names = TRUE)
  } else {
    ## Nothing to do?
    if (length(idxs) == 0) return(x)

    ## names(x) is only supported in R (>= 3.2.0)
    names <- ls(envir = x, all.names = TRUE)

    ## Sanity check (because nx == 0 returns early above)
    stop_if_not(length(names) > 0)

    idxs <- unique(idxs)

    idxs <- as.character(idxs)
    unknown <- idxs[!is.element(idxs, names)]
    if (length(unknown) > 0) {
      stopf("Unknown elements: %s", hpaste(sQuote(unknown)))
    }
  }


  ## Nothing to do?
  nx <- length(idxs)
  if (nx == 0) return(x)

  ## Relaying output or conditions requires the results to be collected
  relay <- (stdout || signal)
  result <- result || relay

  debug <- getOption("future.debug", FALSE)
  if (debug) {
    mdebug("resolve() on environment ...")
    mdebugf(" recursive: %s", recursive)
  }

  ## Coerce future promises into Future objects
  x0 <- x
  x <- futures(x)
  nx <- .length(x)
  names <- ls(envir = x, all.names = TRUE)
  stop_if_not(length(names) == nx)

  ## FIX: an explicitly requested subset used to be validated and then
  ## ignored, so that all elements were resolved.  Restrict the elements to
  ## resolve to the requested ones.  Without 'idxs', all elements are
  ## resolved exactly as before.
  if (has_idxs) {
    names <- idxs[is.element(idxs, names)]
    nx <- length(names)

    ## Nothing to do?
    if (nx == 0) {
      if (debug) mdebug("resolve() on environment ... DONE")
      return(x0)
    }
  }

  ## Everything is considered non-resolved by default
  remaining <- seq_len(nx)

  ## Relay?
  signalConditionsASAP <- make_signalConditionsASAP(nx, stdout = stdout, signal = signal, force = force, debug = debug)

  if (debug) mdebugf(" elements: [%d] %s", nx, hpaste(sQuote(idxs)))

  ## Resolve all elements
  while (length(remaining) > 0) {
    for (ii in remaining) {
      name <- names[ii]
      obj <- x[[name]]

      if (is.atomic(obj)) {
        if (relay) signalConditionsASAP(obj, resignal = FALSE, pos = ii)
      } else {
        ## If an unresolved future, move on to the next object
        ## so that future can be resolved in the asynchronously
        if (inherits(obj, "Future")) {
          ## Lazy future that is not yet launched?
          if (obj$state == 'created') obj <- run(obj)
          if (!resolved(obj)) next
          if (debug) mdebugf("Future #%d", ii)
          if (result) value(obj, stdout = FALSE, signal = FALSE)
        }

        ## The value returned by signalConditionsASAP() is used as a flag
        ## deciding whether the recursive call below should relay as well
        relay_ok <- relay && signalConditionsASAP(obj, resignal = FALSE, pos = ii)

        ## In all other cases, try to resolve
        resolve(obj,
                recursive = recursive - 1,
                result = result,
                stdout = stdout && relay_ok,
                signal = signal && relay_ok,
                sleep = sleep, ...)
      }

      ## Assume resolved at this point
      remaining <- setdiff(remaining, ii)
      if (debug) mdebugf(" length: %d (resolved future %s)", length(remaining), ii)
      stop_if_not(!anyNA(remaining))
    } # for (ii ...)

    ## Wait a bit before checking again
    if (length(remaining) > 0) Sys.sleep(sleep)
  } # while (...)

  if (relay || force) {
    if (debug) mdebug("Relaying remaining futures")
    signalConditionsASAP(resignal = FALSE, pos = 0L)
  }

  if (debug) mdebug("resolve() on environment ... DONE")

  x0
} ## resolve() for environment
368
369
## resolve() for list environments.
##
## Converts 'x' with futures() and then repeatedly sweeps over the selected
## elements until all of them are considered resolved.  Atomic elements are
## resolved by definition.  A Future element that is not yet resolved is
## skipped and retried in the next sweep, after sleeping 'sleep' seconds.
## All other elements are passed to resolve() with the recursion depth
## decreased by one.
##
## Arguments:
##  x         - A list environment.
##  idxs      - (optional) Subset of elements to resolve, given as numeric
##              positions, element names, a logical vector, or an index
##              matrix (translated by whichIndex()).  A logical vector is
##              converted to the positions of its TRUE elements by which();
##              it is not recycled.  If NULL, all elements are resolved.
##  recursive - Recursion depth.  TRUE is replaced by the R option
##              'future.resolve.recursive' (default 99); a negative depth
##              means there is nothing to do.
##  result    - If TRUE, value() is called on each resolved future.  Forced
##              to TRUE when 'stdout' or 'signal' is TRUE.
##  stdout, signal, force - Passed to make_signalConditionsASAP(), which
##              creates the function used for relaying.
##  sleep     - Number of seconds to wait between two sweeps.
##  value     - (DEFUNCT) Use 'result' instead.
##  ...       - Passed on to the recursive resolve() calls.
##
## Returns the original list environment 'x'.
#' @export
resolve.listenv <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 0.1, value = result, ...) {
  ## BACKWARD COMPATIBILITY
  if (value && missing(result)) {
    .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0). Use 'result' instead.", package = .packageName)
  }

  if (is.logical(recursive)) {
    if (recursive) recursive <- getOption("future.resolve.recursive", 99)
  }
  recursive <- as.numeric(recursive)

  ## Nothing to do?
  if (recursive < 0) return(x)

  ## NOTE: Contrary to other implementations that use .length(x), we here
  ## do need to use generic length() that dispatches on class.
  nx <- length(x)

  ## Nothing to do?
  if (nx == 0) return(x)

  ## Subset?
  has_idxs <- !is.null(idxs)
  if (!has_idxs) {
    idxs <- seq_along(x)
  } else {
    ## Nothing to do?
    if (length(idxs) == 0) return(x)

    if (is.matrix(idxs)) {
      ## Multi-dimensional indices
      idxs <- whichIndex(idxs, dim = dim(x), dimnames = dimnames(x))
    } else if (is.logical(idxs)) {
      ## Logical indices are translated into positions; without this they
      ## would be deduplicated by unique() and treated as element names
      idxs <- which(idxs)
    }
    idxs <- unique(idxs)

    if (is.numeric(idxs)) {
      if (any(idxs < 1 | idxs > nx)) {
        stopf("Indices out of range [1,%d]: %s", nx, hpaste(idxs))
      }
    } else {
      names <- names(x)

      ## Sanity check (because nx == 0 returns early above)
      stop_if_not(length(names) > 0)

      idxs <- as.character(idxs)
      unknown <- idxs[!is.element(idxs, names)]
      if (length(unknown) > 0) {
        stopf("Unknown elements: %s", hpaste(sQuote(unknown)))
      }

      ## Translate the (validated) names into positions, so that the
      ## subset can be applied by position below
      idxs <- match(idxs, names)
    }
  }

  ## Nothing to do?
  nx <- length(idxs)
  if (nx == 0) return(x)

  ## Relaying output or conditions requires the results to be collected
  relay <- (stdout || signal)
  result <- result || relay

  debug <- getOption("future.debug", FALSE)
  if (debug) {
    mdebug("resolve() on list environment ...")
    mdebugf(" recursive: %s", recursive)
  }

  ## Coerce future promises into Future objects
  x0 <- x
  x <- futures(x)

  ## FIX: an explicitly requested subset used to be validated and then
  ## ignored, so that all elements were resolved.  'pos' holds the positions
  ## in 'x' of the elements to resolve; without 'idxs', these are all
  ## elements exactly as before.
  ## NOTE(review): this assumes futures() preserves the element positions
  ## of its input - confirm.
  pos <- if (has_idxs) idxs else seq_len(length(x))
  nx <- length(pos)

  ## Everything is considered non-resolved by default
  remaining <- seq_len(nx)

  ## Relay?
  signalConditionsASAP <- make_signalConditionsASAP(nx, stdout = stdout, signal = signal, force = force, debug = debug)

  if (debug) {
    mdebugf(" length: %d", nx)
    mdebugf(" elements: %s", hpaste(sQuote(names(x)[pos])))
  }

  ## Resolve all elements
  while (length(remaining) > 0) {
    for (ii in remaining) {
      obj <- x[[pos[ii]]]

      if (is.atomic(obj)) {
        if (relay) signalConditionsASAP(obj, resignal = FALSE, pos = ii)
      } else {
        ## If an unresolved future, move on to the next object
        ## so that future can be resolved in the asynchronously
        if (inherits(obj, "Future")) {
          ## Lazy future that is not yet launched?
          if (obj$state == 'created') obj <- run(obj)
          if (!resolved(obj)) next
          if (debug) mdebugf("Future #%d", ii)
          if (result) value(obj, stdout = FALSE, signal = FALSE)
        }

        ## The value returned by signalConditionsASAP() is used as a flag
        ## deciding whether the recursive call below should relay as well
        relay_ok <- relay && signalConditionsASAP(obj, resignal = FALSE, pos = ii)

        ## In all other cases, try to resolve
        resolve(obj,
                recursive = recursive - 1,
                result = result,
                stdout = stdout && relay_ok,
                signal = signal && relay_ok,
                sleep = sleep, ...)
      }

      ## Assume resolved at this point
      remaining <- setdiff(remaining, ii)
      if (debug) mdebugf(" length: %d (resolved future %s)", length(remaining), ii)
      stop_if_not(!anyNA(remaining))
    } # for (ii ...)

    ## Wait a bit before checking again
    if (length(remaining) > 0) Sys.sleep(sleep)
  } # while (...)

  if (relay || force) {
    if (debug) mdebug("Relaying remaining futures")
    signalConditionsASAP(resignal = FALSE, pos = 0L)
  }

  if (debug) mdebug("resolve() on list environment ... DONE")

  x0
} ## resolve() for list environment
500