## FutureRegistry(): an in-memory registry of futures, grouped by name
##
## The registry is a closure over a private list 'db', where db[[where]]
## holds the list of futures currently registered under the name 'where'.
##
## Usage:
##  FutureRegistry(where, action, future = NULL, earlySignal = TRUE, ...)
##
## Arguments:
##  where        A single, non-empty string naming the registry to operate
##               on.  An empty registry is created on first use.
##  action       One of:
##                "add"           - register 'future'; it is an error if
##                                  it is already registered
##                "remove"        - unregister 'future'; it is an error
##                                  if it is not registered
##                "list"          - do not modify the registry
##                "collect-first" - call value() on the first launched
##                                  future found to be resolved and drop
##                                  it from the registry
##                "collect-all"   - as "collect-first", but for every
##                                  resolved future
##                "reset"         - empty the registry
##               If not specified, the first choice ("add") is used.
##  future       The future to add or remove.
##  earlySignal  If TRUE, then, after the action has been carried out,
##               futures whose 'earlySignal' field is TRUE are polled
##               with resolved() and collected if resolved.
##  ...          Not used.
##
## Value:
##  The list of futures registered under 'where' as read at the start of
##  the call, updated by "add" and "remove" only.  Futures dropped from
##  the registry by a collect action, by early signaling, or by "reset"
##  are still part of the returned list; call with action = "list" again
##  to get the current content.
##
## Errors:
##  A FutureError is signaled for a duplicated "add", for a "remove" of a
##  non-registered future, and for an unknown action.
FutureRegistry <- local({
  ## Private state: db[[where]] is the list of futures registered
  ## under 'where'
  db <- list()

  ## Returns the position of 'future' in the list 'futures', as compared
  ## by identical(), or NA_integer_ if it is not found.
  indexOf <- function(futures, future) {
    for (ii in seq_along(futures)) {
      if (identical(future, futures[[ii]])) return(ii)
    }
    NA_integer_
  }

  ## Polls each of 'futures' and, for those that are resolved, calls
  ## value() and makes sure they are dropped from db[[where]].
  ## If 'firstOnly' is TRUE, scanning stops after the first resolved
  ## future.  Returns 'futures' invisibly, unchanged.
  collectValues <- function(where, futures, firstOnly = TRUE) {
    for (ii in seq_along(futures)) {
      future <- futures[[ii]]

      ## Is future even launched?
      if (future$state == "created") next

      ## NOTE: It is when calling resolved() on a future with
      ##       early signaling enabled that conditions may be
      ##       signaled.
      if (resolved(future, run = FALSE)) {
        ## (a) Let the future clean up after itself, iff needed.
        ##     This may result in a call to
        ##     FutureRegistry(..., action = "remove").
        ##     Only FutureError:s are caught (and logged) here.
        tryCatch({
          value(future, stdout = FALSE, signal = FALSE)
        }, FutureError = function(ex) {
          mdebugf("Detected a %s while FutureRegistry collecting results:", class(ex)[1])
          mprint(ex)
        })

        ## (b) Make sure the future is removed from the registry, unless
        ##     already done via the above value() call.  The registry is
        ##     re-read here, because value() may have modified it.
        futuresDB <- db[[where]]
        idx <- indexOf(futuresDB, future = future)
        if (!is.na(idx)) {
          futuresDB[[idx]] <- NULL
          db[[where]] <<- futuresDB
        }

        ## (c) Collect only the first resolved future?
        if (firstOnly) break
      }
    } ## for (ii ...)

    invisible(futures)
  } ## collectValues()


  function(where, action = c("add", "remove", "list", "collect-first", "collect-all", "reset"), future = NULL, earlySignal = TRUE, ...) {
    stop_if_not(length(where) == 1, nzchar(where))

    ## The default of 'action' is the full vector of choices, which cannot
    ## be used in the scalar comparisons below.  Reduce it to the first
    ## choice.  An explicitly specified action is left as is, such that an
    ## unknown one still ends up as a FutureError below.
    if (missing(action)) action <- match.arg(action)

    futures <- db[[where]]

    ## Automatically create?
    if (is.null(futures)) {
      futures <- list()
      db[[where]] <<- futures
    }

    if (action == "add") {
      idx <- indexOf(futures, future = future)
      if (!is.na(idx)) {
        msg <- sprintf(
          "Cannot add to %s registry. %s is already registered.",
          sQuote(where), class(future)[1]
        )
        mdebug("ERROR: ", msg)
        stop(FutureError(msg, future = future))
      }
      futures[[length(futures) + 1L]] <- future
      db[[where]] <<- futures
    } else if (action == "remove") {
      idx <- indexOf(futures, future = future)
      if (is.na(idx)) {
        msg <- sprintf(
          "Cannot remove from %s registry. %s not registered.",
          sQuote(where), class(future)[1]
        )
        mdebug("ERROR: ", msg)
        stop(FutureError(msg, future = future))
      }
      futures[[idx]] <- NULL
      db[[where]] <<- futures
    } else if (action == "collect-first") {
      collectValues(where, futures = futures, firstOnly = TRUE)
    } else if (action == "collect-all") {
      collectValues(where, futures = futures, firstOnly = FALSE)
    } else if (action == "reset") {
      ## Only the registry is emptied; the local 'futures' is kept as is
      db[[where]] <<- list()
    } else if (action == "list") {
      ## Nothing to do; 'futures' is returned below
    } else {
      msg <- sprintf(
        "INTERNAL ERROR: Unknown action to %s registry: %s",
        sQuote(where), action
      )
      mdebug(msg)
      stop(FutureError(msg, future = future))
    }

    ## Early signaling of conditions?
    if (earlySignal && length(futures) > 0L) {
      ## Which futures have early signaling enabled?
      ## Exactly one logical per future is produced, such that the
      ## indices stay aligned with 'futures' also when a future has
      ## no 'earlySignal' field.
      flags <- vapply(futures, FUN = function(f) isTRUE(f$earlySignal),
                      FUN.VALUE = logical(1L), USE.NAMES = FALSE)
      idxs <- which(flags)

      ## Any futures to be scanned for early signaling?
      if (length(idxs) > 0L) {
        ## Collect values, which will trigger signaling during
        ## calls to resolved().
        collectValues(where, futures = futures[idxs], firstOnly = FALSE)
      }
    }

    futures
  }
})
108