## FutureRegistry(): a set of named registries of futures, kept in a
## private closure variable.
##
## Arguments:
##   where        A single, non-empty string naming the registry.  The
##                registry is created (empty) the first time it is used.
##   action       What to do; compared as a single string against:
##                  "add"           register 'future'; error if already there
##                  "remove"        unregister 'future'; error if not there
##                  "list"          no change
##                  "collect-first" collect the first resolved future
##                  "collect-all"   collect all resolved futures
##                  "reset"         replace the registry with an empty list
##   future       The future to add or remove (used by "add" / "remove").
##   earlySignal  If TRUE, futures whose 'earlySignal' field is TRUE are
##                polled via resolved() before returning.
##   ...          Not used.
##
## Value:
##   The list of futures for 'where' as read at the start of the call,
##   updated by "add" / "remove".  Removals done while collecting, and
##   "reset", are applied to the registry but are not reflected in the
##   returned list.
##
## Errors:
##   A FutureError is signaled when adding an already registered future,
##   when removing a non-registered future, and for an unknown action.
FutureRegistry <- local({
  ## All registries, indexed by registry name
  db <- list()

  ## Position of 'future' in the list 'futures' as decided by identical(),
  ## or NA_integer_ if it is not there.
  indexOf <- function(futures, future) {
    for (ii in seq_along(futures)) {
      if (identical(future, futures[[ii]])) return(ii)
    }
    NA_integer_
  }

  ## Polls 'futures' and, for each one that is resolved, retrieves its
  ## value and makes sure it is dropped from registry 'where'.  With
  ## firstOnly = TRUE the scan stops after the first resolved future.
  ## Returns 'futures' invisibly.
  collectValues <- function(where, futures, firstOnly = TRUE) {
    for (ii in seq_along(futures)) {
      future <- futures[[ii]]

      ## Is future even launched?
      if (future$state == "created") next

      ## NOTE: It is when calling resolved() on a future with
      ##       early signaling enabled that conditions may be
      ##       signaled.
      if (resolved(future, run = FALSE)) {
        ## (a) Let future clean up after itself, iff needed.
        ##     This may result in a call to
        ##     FutureRegistry(..., action = "remove").
        tryCatch({
          value(future, stdout = FALSE, signal = FALSE)
        }, FutureError = function(ex) {
          mdebugf("Detected a %s while FutureRegistry collecting results:", class(ex)[1])
          mprint(ex)
        })

        ## (b) Make sure future is removed from registry, unless
        ##     already done via above value() call.  The registry is
        ##     re-read here because (a) may have modified it.
        futuresDB <- db[[where]]
        idx <- indexOf(futuresDB, future = future)
        if (!is.na(idx)) {
          futuresDB[[idx]] <- NULL
          db[[where]] <<- futuresDB
        }

        ## (c) Collect only the first resolved future?
        if (firstOnly) break
      }
    } ## for (ii ...)

    invisible(futures)
  } ## collectValues()


  function(where, action = c("add", "remove", "list", "collect-first", "collect-all", "reset"), future = NULL, earlySignal = TRUE, ...) {
    stop_if_not(length(where) == 1, nzchar(where))
    futures <- db[[where]]

    ## Automatically create?
    if (is.null(futures)) {
      futures <- list()
      db[[where]] <<- futures
    }

    if (action == "add") {
      idx <- indexOf(futures, future = future)
      if (!is.na(idx)) {
        msg <- sprintf("Cannot add to %s registry. %s is already registered.", sQuote(where), class(future)[1])
        mdebug("ERROR: ", msg)
        stop(FutureError(msg, future = future))
      }
      futures[[length(futures)+1L]] <- future
      db[[where]] <<- futures
    } else if (action == "remove") {
      idx <- indexOf(futures, future = future)
      if (is.na(idx)) {
        msg <- sprintf("Cannot remove from %s registry. %s not registered.", sQuote(where), class(future)[1])
        mdebug("ERROR: ", msg)
        stop(FutureError(msg, future = future))
      }
      futures[[idx]] <- NULL
      db[[where]] <<- futures
    } else if (action == "collect-first") {
      collectValues(where, futures = futures, firstOnly = TRUE)
    } else if (action == "collect-all") {
      collectValues(where, futures = futures, firstOnly = FALSE)
    } else if (action == "reset") {
      db[[where]] <<- list()
    } else if (action == "list") {
      ## Nothing to do; the list is returned below
    } else {
      msg <- sprintf("INTERNAL ERROR: Unknown action to %s registry: %s", sQuote(where), action)
      mdebug(msg)
      stop(FutureError(msg, future = future))
    }

    ## Early signaling of conditions?
    if (earlySignal && length(futures) > 0L) {
      ## Which futures have early signaling enabled?
      ## One logical per future, so that positions stay aligned with
      ## 'futures' also when a future lacks the 'earlySignal' field;
      ## unlist() on the raw field values would silently drop NULLs
      ## and shift the indices.
      signals <- vapply(futures, FUN = function(f) isTRUE(f$earlySignal),
                        FUN.VALUE = logical(1L), USE.NAMES = FALSE)
      idxs <- which(signals)

      ## Any futures to be scanned for early signaling?
      if (length(idxs) > 0) {
        ## Collect values, which will trigger signaling during
        ## calls to resolved().
        collectValues(where, futures = futures[idxs], firstOnly = FALSE)
      }
    }

    futures
  }
})