#' Resolve one or more futures synchronously
#'
#' This function provides an efficient mechanism for waiting for multiple
#' futures in a container (e.g. list or environment) to be resolved while in
#' the meanwhile retrieving values of already resolved futures.
#'
#' @param x A [Future] to be resolved, or a list, an environment, or a
#' list environment of futures to be resolved.
#'
#' @param idxs (optional) numeric indices or element names specifying the
#' subset of elements to check.  For lists and list environments, a matrix
#' of multi-dimensional indices is also accepted.
#'
#' @param recursive A non-negative number specifying how deep of a recursion
#' should be done. If TRUE, an infinite recursion is used. If FALSE or zero,
#' no recursion is performed.
#'
#' @param result (internal) If TRUE, the results are retrieved, otherwise not.
#'
#' @param stdout (internal) If TRUE, captured standard output is relayed, otherwise not.
#'
#' @param signal (internal) If TRUE, captured \link[base]{conditions} are relayed,
#' otherwise not.
#'
#' @param force (internal) If TRUE, captured standard output and captured
#' \link[base]{conditions} already relayed are relayed again, otherwise not.
#'
#' @param sleep Number of seconds to wait before checking if futures have been
#' resolved since last time.
#'
#' @param value (DEFUNCT) Use argument `result` instead.
#'
#' @param \dots Not used.
#'
#' @return Returns `x` (regardless of subsetting or not).
#' If `signal` is TRUE and one of the futures produces an error, then
#' that error is produced.
#'
#' @details
#' This function resolves synchronously, i.e. it blocks until `x` and
#' any containing futures are resolved.
#'
#' @seealso To resolve a future _variable_, first retrieve its
#' [Future] object using [futureOf()], e.g.
#' `resolve(futureOf(x))`.
#'
#' @export
resolve <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 1.0, value = result, ...) UseMethod("resolve")
## NOTE(review): The generic declares sleep = 1.0 whereas every method below
## declares sleep = 0.1.  With S3 dispatch, an argument that the caller leaves
## missing takes the default of the dispatched method, not of the generic.


## Fallback: anything that is not a Future or a supported container is
## returned as is.
#' @export
resolve.default <- function(x, ...) x


## Resolve a single Future: launch it if it is still lazy, block until it is
## resolved, and optionally collect its result, recurse into the result value,
## and relay captured output and conditions.  Returns 'x'.
## Arguments 'idxs' and 'force' are accepted for signature compatibility but
## are not used by this method.
#' @export
resolve.Future <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 0.1, value = result, ...) {
  ## BACKWARD COMPATIBILITY
  if (value && missing(result)) {
    .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0). Use 'result' instead.", package = .packageName)
  }

  ## recursive = TRUE means "as deep as the option allows"; FALSE becomes 0
  if (is.logical(recursive)) {
    if (recursive) recursive <- getOption("future.resolve.recursive", 99)
  }
  recursive <- as.numeric(recursive)

  ## Nothing to do?
  if (recursive < 0) return(x)

  ## Relaying output or conditions requires the result to be collected
  relay <- (stdout || signal)
  result <- result || relay

  ## Lazy future that is not yet launched?
  if (x$state == 'created') x <- run(x)

  ## Poll for the Future to finish
  while (!resolved(x)) {
    Sys.sleep(sleep)
  }

  msg <- sprintf("A %s was resolved", class(x)[1])

  ## Retrieve results?
  if (result) {
    if (is.null(x$result)) {
      x$result <- result(x)
      msg <- sprintf("%s and its result was collected", msg)
    } else {
      ## FIX: the formatted message used to be discarded here, so the debug
      ## output never reported that the result had already been collected.
      msg <- sprintf("%s and its result was already collected", msg)
    }

    ## Recursively resolve result value?
    if (recursive > 0) {
      value <- x$result$value
      if (!is.atomic(value)) {
        resolve(value, recursive = recursive - 1, result = TRUE, stdout = stdout, signal = signal, sleep = sleep, ...)
        msg <- sprintf("%s (and resolved itself)", msg)
      }
      value <- NULL  ## Not needed anymore
    }
    result <- NULL  ## Not needed anymore

    ## The calls to value() and result() above resolve to the functions of
    ## those names; R skips non-function bindings when looking up a call.
    if (stdout) value(x, stdout = TRUE, signal = FALSE)
    if (signal) {
      ## Always signal immediateCondition:s and as soon as possible.
      ## They will always be signaled if they exist.
      signalImmediateConditions(x)

      ## Signal all other types of condition
      signalConditions(x, exclude = getOption("future.relay.immediate", "immediateCondition"), resignal = TRUE, force = TRUE)
    }
  } else {
    msg <- sprintf("%s (result was not collected)", msg)
  }

  mdebug(msg)

  x
} ## resolve() for Future


## Resolve the elements of a list, optionally only the subset given by 'idxs'.
## Elements are polled round-robin: an unresolved future is skipped and
## revisited on the next pass, with a pause of 'sleep' seconds between passes.
## Returns the original, non-subsetted list.
#' @export
resolve.list <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 0.1, value = result, ...) {
  ## BACKWARD COMPATIBILITY
  if (value && missing(result)) {
    .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0). Use 'result' instead.", package = .packageName)
  }

  ## recursive = TRUE means "as deep as the option allows"; FALSE becomes 0
  if (is.logical(recursive)) {
    if (recursive) recursive <- getOption("future.resolve.recursive", 99)
  }
  recursive <- as.numeric(recursive)

  ## Nothing to do?
  if (recursive < 0) return(x)

  nx <- .length(x)

  ## Nothing to do?
  if (nx == 0) return(x)

  ## Relaying output or conditions requires the result to be collected
  relay <- (stdout || signal)
  result <- result || relay

  ## Keep the full input; it is what gets returned even when subsetting
  x0 <- x

  ## Subset?
  if (!is.null(idxs)) {
    ## Nothing to do?
    if (length(idxs) == 0) return(x)

    ## Multi-dimensional indices?
    if (is.matrix(idxs)) {
      idxs <- whichIndex(idxs, dim = dim(x), dimnames = dimnames(x))
    }
    idxs <- unique(idxs)

    if (is.numeric(idxs)) {
      idxs <- as.numeric(idxs)
      if (any(idxs < 1 | idxs > nx)) {
        stopf("Indices out of range [1,%d]: %s", nx, hpaste(idxs))
      }
    } else {
      ## Anything non-numeric is treated as element names
      names <- names(x)
      if (is.null(names)) {
        stop("Named subsetting not possible. Elements are not named.")
      }

      idxs <- as.character(idxs)
      unknown <- idxs[!is.element(idxs, names)]
      if (length(unknown) > 0) {
        stopf("Unknown elements: %s", hpaste(sQuote(unknown)))
      }
    }

    x <- x[idxs]
    nx <- .length(x)
  }

  debug <- getOption("future.debug", FALSE)
  if (debug) {
    mdebug("resolve() on list ...")
    mdebugf(" recursive: %s", recursive)
  }

  ## NOTE: Everything is considered non-resolved by default

  ## Total number of values to resolve
  total <- nx
  remaining <- seq_len(nx)

  ## Relay?
  signalConditionsASAP <- make_signalConditionsASAP(nx, stdout = stdout, signal = signal, force = force, debug = debug)

  if (debug) {
    mdebugf(" length: %d", nx)
    mdebugf(" elements: %s", hpaste(sQuote(names(x))))
  }

  ## Resolve all elements
  while (length(remaining) > 0) {
    for (ii in remaining) {
      obj <- x[[ii]]

      if (is.atomic(obj)) {
        if (relay) signalConditionsASAP(obj, resignal = FALSE, pos = ii)
      } else {
        ## If an unresolved future, move on to the next object
        ## so that future can be resolved in the asynchronously
        if (inherits(obj, "Future")) {
          ## Lazy future that is not yet launched?
          if (obj$state == 'created') obj <- run(obj)
          if (!resolved(obj)) next
          if (debug) mdebugf("Future #%d", ii)
          if (result) value(obj, stdout = FALSE, signal = FALSE)
        }

        relay_ok <- relay && signalConditionsASAP(obj, resignal = FALSE, pos = ii)

        ## In all other cases, try to resolve
        resolve(obj,
                recursive = recursive - 1,
                result = result,
                stdout = stdout && relay_ok,
                signal = signal && relay_ok,
                sleep = sleep, ...)
      }

      ## Assume resolved at this point
      remaining <- setdiff(remaining, ii)
      if (debug) mdebugf(" length: %d (resolved future %s)", length(remaining), ii)
      stop_if_not(!anyNA(remaining))
    } # for (ii ...)

    ## Wait a bit before checking again
    if (length(remaining) > 0) Sys.sleep(sleep)
  } # while (...)

  if (relay || force) {
    if (debug) mdebug("Relaying remaining futures")
    signalConditionsASAP(resignal = FALSE, pos = 0L)
  }

  if (debug) mdebug("resolve() on list ... DONE")

  x0
} ## resolve() for list


## Resolve the elements of an environment.  Elements are polled round-robin in
## the same way as for lists.  Returns the original environment.
#' @export
resolve.environment <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 0.1, value = result, ...) {
  ## BACKWARD COMPATIBILITY
  if (value && missing(result)) {
    .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0). Use 'result' instead.", package = .packageName)
  }

  ## recursive = TRUE means "as deep as the option allows"; FALSE becomes 0
  if (is.logical(recursive)) {
    if (recursive) recursive <- getOption("future.resolve.recursive", 99)
  }
  recursive <- as.numeric(recursive)

  ## Nothing to do?
  if (recursive < 0) return(x)

  nx <- .length(x)

  ## Nothing to do?
  if (nx == 0) return(x)

  ## Subset?
  if (is.null(idxs)) {
    ## names(x) is only supported in R (>= 3.2.0)
    idxs <- ls(envir = x, all.names = TRUE)
  } else {
    ## Nothing to do?
    if (length(idxs) == 0) return(x)

    ## names(x) is only supported in R (>= 3.2.0)
    names <- ls(envir = x, all.names = TRUE)

    ## Sanity check (because nx == 0 returns early above)
    stop_if_not(length(names) > 0)

    idxs <- unique(idxs)

    idxs <- as.character(idxs)
    unknown <- idxs[!is.element(idxs, names)]
    if (length(unknown) > 0) {
      stopf("Unknown elements: %s", hpaste(sQuote(unknown)))
    }
  }


  ## Nothing to do?
  nx <- length(idxs)
  if (nx == 0) return(x)

  ## Relaying output or conditions requires the result to be collected
  relay <- (stdout || signal)
  result <- result || relay

  debug <- getOption("future.debug", FALSE)
  if (debug) {
    mdebug("resolve() on environment ...")
    mdebugf(" recursive: %s", recursive)
  }

  ## Coerce future promises into Future objects
  x0 <- x
  x <- futures(x)
  nx <- .length(x)
  names <- ls(envir = x, all.names = TRUE)
  stop_if_not(length(names) == nx)
  ## NOTE(review): From here on 'idxs' is only used in the debug message
  ## below; the loop iterates over every name of futures(x), so a
  ## user-supplied subset is validated above but not applied.  Confirm
  ## whether that is intended.

  ## Everything is considered non-resolved by default
  remaining <- seq_len(nx)

  ## Relay?
  signalConditionsASAP <- make_signalConditionsASAP(nx, stdout = stdout, signal = signal, force = force, debug = debug)

  if (debug) mdebugf(" elements: [%d] %s", nx, hpaste(sQuote(idxs)))

  ## Resolve all elements
  while (length(remaining) > 0) {
    for (ii in remaining) {
      name <- names[ii]
      obj <- x[[name]]

      if (is.atomic(obj)) {
        if (relay) signalConditionsASAP(obj, resignal = FALSE, pos = ii)
      } else {
        ## If an unresolved future, move on to the next object
        ## so that future can be resolved in the asynchronously
        if (inherits(obj, "Future")) {
          ## Lazy future that is not yet launched?
          if (obj$state == 'created') obj <- run(obj)
          if (!resolved(obj)) next
          if (debug) mdebugf("Future #%d", ii)
          if (result) value(obj, stdout = FALSE, signal = FALSE)
        }

        relay_ok <- relay && signalConditionsASAP(obj, resignal = FALSE, pos = ii)

        ## In all other cases, try to resolve
        resolve(obj,
                recursive = recursive - 1,
                result = result,
                stdout = stdout && relay_ok,
                signal = signal && relay_ok,
                sleep = sleep, ...)
      }

      ## Assume resolved at this point
      remaining <- setdiff(remaining, ii)
      if (debug) mdebugf(" length: %d (resolved future %s)", length(remaining), ii)
      stop_if_not(!anyNA(remaining))
    } # for (ii ...)

    ## Wait a bit before checking again
    if (length(remaining) > 0) Sys.sleep(sleep)
  } # while (...)

  if (relay || force) {
    if (debug) mdebug("Relaying remaining futures")
    signalConditionsASAP(resignal = FALSE, pos = 0L)
  }

  if (debug) mdebug("resolve() on environment ... DONE")

  x0
} ## resolve() for environment


## Resolve the elements of a list environment.  Elements are polled
## round-robin in the same way as for lists.  Returns the original
## list environment.
#' @export
resolve.listenv <- function(x, idxs = NULL, recursive = 0, result = FALSE, stdout = FALSE, signal = FALSE, force = FALSE, sleep = 0.1, value = result, ...) {
  ## BACKWARD COMPATIBILITY
  if (value && missing(result)) {
    .Defunct(msg = "Argument 'value' of resolve() is defunct. It was deprecated in future (>= 1.15.0). Use 'result' instead.", package = .packageName)
  }

  ## recursive = TRUE means "as deep as the option allows"; FALSE becomes 0
  if (is.logical(recursive)) {
    if (recursive) recursive <- getOption("future.resolve.recursive", 99)
  }
  recursive <- as.numeric(recursive)

  ## Nothing to do?
  if (recursive < 0) return(x)

  ## NOTE: Contrary to other implementations that use .length(x), we here
  ## do need to use generic length() that dispatches on class.
  nx <- length(x)

  ## Nothing to do?
  if (nx == 0) return(x)

  ## Subset?
  if (is.null(idxs)) {
    idxs <- seq_along(x)
  } else {
    ## Nothing to do?
    if (length(idxs) == 0) return(x)

    ## Multi-dimensional indices?
    if (is.matrix(idxs)) {
      idxs <- whichIndex(idxs, dim = dim(x), dimnames = dimnames(x))
    }
    idxs <- unique(idxs)

    if (is.numeric(idxs)) {
      if (any(idxs < 1 | idxs > nx)) {
        stopf("Indices out of range [1,%d]: %s", nx, hpaste(idxs))
      }
    } else {
      ## Anything non-numeric is treated as element names
      names <- names(x)

      ## Sanity check (because nx == 0 returns early above)
      stop_if_not(length(names) > 0)

      idxs <- as.character(idxs)
      unknown <- idxs[!is.element(idxs, names)]
      if (length(unknown) > 0) {
        stopf("Unknown elements: %s", hpaste(sQuote(unknown)))
      }
    }
  }

  ## Nothing to do?
  nx <- length(idxs)
  if (nx == 0) return(x)

  ## Relaying output or conditions requires the result to be collected
  relay <- (stdout || signal)
  result <- result || relay

  debug <- getOption("future.debug", FALSE)
  if (debug) {
    mdebug("resolve() on list environment ...")
    mdebugf(" recursive: %s", recursive)
  }

  ## Coerce future promises into Future objects
  x0 <- x
  x <- futures(x)
  nx <- length(x)
  ## NOTE(review): 'idxs' is not referenced after this point; the loop
  ## iterates over every element of futures(x), so a user-supplied subset is
  ## validated above but not applied.  Confirm whether that is intended.

  ## Everything is considered non-resolved by default
  remaining <- seq_len(nx)

  ## Relay?
  signalConditionsASAP <- make_signalConditionsASAP(nx, stdout = stdout, signal = signal, force = force, debug = debug)

  if (debug) {
    mdebugf(" length: %d", nx)
    mdebugf(" elements: %s", hpaste(sQuote(names(x))))
  }

  ## Resolve all elements
  while (length(remaining) > 0) {
    for (ii in remaining) {
      obj <- x[[ii]]

      if (is.atomic(obj)) {
        if (relay) signalConditionsASAP(obj, resignal = FALSE, pos = ii)
      } else {
        ## If an unresolved future, move on to the next object
        ## so that future can be resolved in the asynchronously
        if (inherits(obj, "Future")) {
          ## Lazy future that is not yet launched?
          if (obj$state == 'created') obj <- run(obj)
          if (!resolved(obj)) next
          if (debug) mdebugf("Future #%d", ii)
          if (result) value(obj, stdout = FALSE, signal = FALSE)
        }

        relay_ok <- relay && signalConditionsASAP(obj, resignal = FALSE, pos = ii)

        ## In all other cases, try to resolve
        resolve(obj,
                recursive = recursive - 1,
                result = result,
                stdout = stdout && relay_ok,
                signal = signal && relay_ok,
                sleep = sleep, ...)
      }

      ## Assume resolved at this point
      remaining <- setdiff(remaining, ii)
      if (debug) mdebugf(" length: %d (resolved future %s)", length(remaining), ii)
      stop_if_not(!anyNA(remaining))
    } # for (ii ...)

    ## Wait a bit before checking again
    if (length(remaining) > 0) Sys.sleep(sleep)
  } # while (...)

  if (relay || force) {
    if (debug) mdebug("Relaying remaining futures")
    signalConditionsASAP(resignal = FALSE, pos = 0L)
  }

  if (debug) mdebug("resolve() on list environment ... DONE")

  x0
} ## resolve() for list environment