#' Add or Remove a Global 'progression' Handler
#'
#' @param action (character string)
#' If `"add"`, a global handler is added.
#' If `"remove"`, it is removed, if it exists.
#' If `"query"`, checks whether a handler is registered or not.
#' The value `"status"` is also accepted; it forwards a "status" control
#' progression to the global handler and returns whatever the handler returns.
#'
#' @return Returns TRUE if a handler is registered, otherwise FALSE.
#' If `action = "query"`, the value is visible, otherwise invisible.
#'
#' @section Requirements:
#' This function requires R (>= 4.0.0) - the version in which global calling
#' handlers were introduced.
#'
#' @example incl/register_global_progression_handler.R
#'
#' @keywords internal
register_global_progression_handler <- function(action = c("add", "remove", "query")) {
  ## Only the first element is matched; "status" is an extra, non-default choice
  action <- match.arg(action[1], choices = c("add", "remove", "query", "status"))

  ## Global calling handlers do not exist before R 4.0.0
  if (getRversion() < "4.0.0") {
    warning("register_global_progression_handler() requires R (>= 4.0.0)")
    return(invisible(FALSE))
  }

  ## Snapshot of everything that is currently registered
  registered <- globalCallingHandlers()

  ## Flag the entries that are our own handler
  is_ours <- vapply(registered, FUN = function(handler) {
    identical(handler, global_progression_handler)
  }, FUN.VALUE = FALSE)
  if (sum(is_ours) > 1L) {
    warning("Detected more than one registered 'global_progression_handler'. Did you register it manually?")
  }
  has_ours <- any(is_ours)

  if (action == "add") {
    ## Register only if not already there
    if (!has_ours) {
      globalCallingHandlers(condition = global_progression_handler)
    }
    return(invisible(TRUE))
  }

  if (action == "remove") {
    ## Tell the handler to shut down before it is dropped
    global_progression_handler(control_progression("shutdown"))
    others <- registered[!is_ours]
    ## There is no way to remove a single handler: clear all of them ...
    globalCallingHandlers(NULL)
    ## ... and then re-register the ones that are not ours
    globalCallingHandlers(others)
    return(invisible(FALSE))
  }

  if (action == "query") {
    return(has_ours)
  }

  ## action == "status"
  global_progression_handler(control_progression("status"))
}


#' A Global Calling Handler For 'progression':s
#'
#' @param condition A condition, typically a [progression] condition.
#'
#' @return Nothing.
#'
#' @section Requirements:
#' This function requires R (>= 4.0.0) - the version in which global calling
#' handlers were introduced.
#'
#' @keywords internal
global_progression_handler <- local({
  ## State shared by the closures below:
  ##  - current_progressor_uuid: UUID of the progressor we are listening to,
  ##    or NULL when not listening to any
  ##  - calling_handler: function built by make_calling_handler(), or NULL
  ##  - delays: value of use_delays(handlers); its 'terminal' and 'conditions'
  ##    elements are consulted below. NULL when nothing is delayed
  ##  - stdout_file: connection buffering standard output, or NULL
  ##  - capture_conditions: NA when idle, FALSE while progression handlers
  ##    are running, TRUE while other conditions should be recorded
  ##  - conditions: recorded conditions waiting to be re-signaled
  ##  - genv: environment where '.Last.progression' is stored
  current_progressor_uuid <- NULL
  calling_handler <- NULL
  delays <- NULL
  stdout_file <- NULL
  capture_conditions <- NA
  conditions <- list()
  genv <- globalenv()

  ## (Re)build 'calling_handler' and 'delays' from the currently set handlers().
  ## Leaves both untouched if there are no (enabled) handlers.
  update_calling_handler <- function() {
    handlers <- handlers()
    # Nothing to do?
    if (length(handlers) == 0L) return(NULL)

    handlers <- as_progression_handler(handlers)

    # Nothing to do?
    if (length(handlers) == 0L) return(NULL)

    ## Do we need to buffer?
    delays <<- use_delays(handlers)

    calling_handler <<- make_calling_handler(handlers)
  }

  ## Forward an "interrupt" control progression to the active calling handler,
  ## after flushing any buffered output and recorded conditions.
  ## No-op if there is no active calling handler.
  interrupt_calling_handler <- function(progression = control_progression("interrupt"), debug = FALSE) {
    if (is.null(calling_handler)) return()

    ## Don't capture conditions that are produced by progression handlers
    capture_conditions <<- FALSE
    on.exit(capture_conditions <<- TRUE)

    ## Any buffered output to flush?
    if (isTRUE(attr(delays$terminal, "flush"))) {
      if (length(conditions) > 0L || has_buffered_stdout(stdout_file)) {
        calling_handler(control_progression("hide"))
        stdout_file <<- flush_stdout(stdout_file, close = FALSE)
        conditions <<- flush_conditions(conditions)
      }
    }

    calling_handler(progression)
  }

  ## Shut down the active progression handlers (if any), flush buffered output
  ## and recorded conditions, and reset the shared state to "idle".
  ## Returns TRUE.
  finish <- function(progression = control_progression("shutdown"), debug = FALSE) {
    finished <- FALSE

    ## Is progress handler active?
    if (!is.null(current_progressor_uuid)) {
      if (debug) message(" - shutdown progression handlers")
      if (!is.null(calling_handler)) {
        stdout_file <<- delay_stdout(delays, stdout_file = stdout_file)
        finished <- calling_handler(progression)
        ## Note that we might not be able to close 'stdout_file' due
        ## to blocking, non-balanced sinks
        stdout_file <<- flush_stdout(stdout_file, close = TRUE, must_work = FALSE)
        conditions <<- flush_conditions(conditions)
        delays <<- NULL
        if (debug) message(" - finished: ", finished)
      } else {
        finished <- TRUE
      }
    } else {
      if (debug) message(" - no active global progression handler")
    }

    ## Note that we might not have been able to close 'stdout_file' in previous
    ## calls to finish() due to blocking, non-balanced sinks. Try again here,
    ## just in case
    if (!is.null(stdout_file)) {
      stdout_file <<- flush_stdout(stdout_file, close = TRUE, must_work = FALSE)
    }

    current_progressor_uuid <<- NULL
    calling_handler <<- NULL
    capture_conditions <<- NA
    ## NOTE(review): 'finished' is forced to TRUE here, which discards the value
    ## returned by calling_handler() above and makes the isTRUE(finished)
    ## assertion below always hold - confirm this is intended
    finished <- TRUE
    stop_if_not(length(conditions) == 0L, is.null(delays), isTRUE(finished), is.na(capture_conditions))

    finished
  }

  ## Process one 'progression' condition according to its 'type':
  ##  - "initiate": start listening to the signaling progressor
  ##  - "update":   relay the update; shut down if the handlers report finished
  ##  - "finish":   shut down
  ##  - "status":   return a list with the current internal state
  ## Progressions from any progressor other than the one being listened to
  ## are ignored.
  handle_progression <- function(progression, debug = getOption("progressr.global.debug", FALSE)) {
    ## To please R CMD check
    calling_handler <- NULL; rm(list = "calling_handler")

    ## Don't capture conditions that are produced by progression handlers
    last_capture_conditions <- capture_conditions
    capture_conditions <<- FALSE
    on.exit({
      if (is.null(current_progressor_uuid)) {
        capture_conditions <<- NA
      } else if (!is.na(capture_conditions)) {
        capture_conditions <<- TRUE
      }
    })

    stop_if_not(inherits(progression, "progression"))

    assign(".Last.progression", value = progression, envir = genv, inherits = FALSE)

    if (debug) message(sprintf("*** Caught a %s condition:", sQuote(class(progression)[1])))
    progressor_uuid <- progression[["progressor_uuid"]]
    if (debug) message(" - source: ", progressor_uuid)

    ## Listen to this progressor?
    if (!is.null(current_progressor_uuid) &&
        !identical(progressor_uuid, current_progressor_uuid)) {
      if (debug) message(" - action: ignoring, already listening to another")
      return()
    }


    if (!is.null(calling_handler) && !is.null(delays)) {
      ## Any buffered output to flush?
      if (isTRUE(attr(delays$terminal, "flush"))) {
        if (length(conditions) > 0L || has_buffered_stdout(stdout_file)) {
          calling_handler(control_progression("hide"))
          stdout_file <<- flush_stdout(stdout_file, close = FALSE)
          stop_if_not(inherits(stdout_file, "connection"))
          conditions <<- flush_conditions(conditions)
          calling_handler(control_progression("unhide"))
        }
      }
    }

    type <- progression[["type"]]
    if (debug) message(" - type: ", type)

    if (type == "initiate") {
      if (identical(progressor_uuid, current_progressor_uuid)) {
        stop(sprintf("INTERNAL ERROR: Already listening to this progressor which just sent another %s request", sQuote(type)))
      }
      if (debug) message(" - start listening")
#      finished <- finish(debug = debug)
#      stop_if_not(is.null(stdout_file), length(conditions) == 0L)
      current_progressor_uuid <<- progressor_uuid
      if (debug) message(" - reset progression handlers")
      update_calling_handler()
      if (!is.null(calling_handler)) {
        stdout_file <<- delay_stdout(delays, stdout_file = stdout_file)
        calling_handler(control_progression("reset"))
        if (debug) message(" - initiate progression handlers")
        finished <- calling_handler(progression)
        if (debug) message(" - finished: ", finished)
        if (finished) {
          finished <- finish(debug = debug)
          stop_if_not(length(conditions) == 0L, is.na(capture_conditions), isTRUE(finished))
        }
      }
    } else if (type == "update") {
      if (is.null(current_progressor_uuid)) {
        ## We might receive zero-amount progress updates after the fact that the
        ## progress has been completed
        amount <- progression$amount
        if (!is.numeric(amount) || amount > 0) {
          ## The guard above lets non-numeric amounts through, but '%g' can
          ## only format numbers; format the amount up front so that the
          ## warning itself cannot fail. Numeric amounts render as before.
          if (is.numeric(amount)) {
            amount_str <- sprintf("%g", amount)
          } else {
            amount_str <- paste(deparse(amount), collapse = " ")
          }
          warning(sprintf("[progressr]: Received a progression %s request (amount=%s; msg=%s) but is not listening to this progressor. This can happen when code signals more progress updates than it configured the progressor to do. When the progressor completes all steps, it shuts down resulting in the global progression handler to no longer listen to it", sQuote(type), amount_str, sQuote(conditionMessage(progression))))
        }
        return()
      }

      if (debug) message(" - update progression handlers")
      if (!is.null(calling_handler)) {
        stdout_file <<- delay_stdout(delays, stdout_file = stdout_file)
        finished <- calling_handler(progression)
        if (debug) message(" - finished: ", finished)
        if (finished) {
          calling_handler(control_progression("shutdown"))
          finished <- finish(debug = debug)
          stop_if_not(length(conditions) == 0L, is.na(capture_conditions), isTRUE(finished))
        }
      }
    } else if (type == "finish") {
      finished <- finish(debug = debug)
      stop_if_not(length(conditions) == 0L, is.na(capture_conditions), isTRUE(finished))
    } else if (type == "status") {
      ## Report 'capture_conditions' as it was before this call overrode it
      status <- list(
        current_progressor_uuid = current_progressor_uuid,
        calling_handler = calling_handler,
        delays = delays,
        stdout_file = stdout_file,
        capture_conditions = last_capture_conditions,
        conditions = conditions
      )
      if (debug) message(" - done")
      return(status)
    }

    if (debug) message(" - done")
  } ## handle_progression()


  ## The actual handler: this function is the value of local({ ... })
  function(condition) {
    debug <- getOption("progressr.global.debug", FALSE)

    ## Shut down progression handling?
    if (inherits(condition, c("interrupt", "error"))) {
      if (inherits(condition, "interrupt") &&
          isTRUE(getOption("progressr.interrupts", TRUE))) {
        suspendInterrupts({
          interrupt_calling_handler(debug = debug)
        })
      }

      suspendInterrupts({
        ## NOTE(review): 'progression' is not passed on; finish() uses its own
        ## default, which is the same control_progression("shutdown") call
        progression <- control_progression("shutdown")
        finished <- finish(debug = debug)
        stop_if_not(length(conditions) == 0L, is.na(capture_conditions), isTRUE(finished))
      })
      return()
    }

    ## A 'progression' update?
    if (inherits(condition, "progression")) {
      suspendInterrupts({
        res <- handle_progression(condition, debug = debug)
      })
      return(res)
    }

    ## Nothing do to?
    if (is.na(capture_conditions) || !isTRUE(capture_conditions)) return()

    ## Nothing do to?
    if (is.null(delays) || !inherits(condition, delays$conditions)) return()

    ## Record non-progression condition to be flushed later
    conditions[[length(conditions) + 1L]] <<- condition

    ## Muffle it for now
    if (inherits(condition, "message")) {
      invokeRestart("muffleMessage")
    } else if (inherits(condition, "warning")) {
      invokeRestart("muffleWarning")
    } else if (inherits(condition, "condition")) {
      ## If there is a "muffle" restart for this condition,
      ## then invoke that restart, i.e. "muffle" the condition
      restarts <- computeRestarts(condition)
      for (restart in restarts) {
        name <- restart$name
        if (is.null(name)) next
        if (!grepl("^muffle", name)) next
        invokeRestart(restart)
        break
      }
    }
  }
}) ## global_progression_handler()



## Fallback for R (< 4.0.0), where globalCallingHandlers() does not exist:
## define a stand-in that fails with an informative error
if (getRversion() < "4.0.0") {
  globalCallingHandlers <- function(...) {
    stop("register_global_progression_handler() requires R (>= 4.0.0)")
  }
}



## Start diverting standard output to an in-memory raw connection.
## Returns the connection, with attribute 'sink_index' recording the sink
## depth right after the diversion was added.
buffer_stdout <- function() {
  stdout_file <- rawConnection(raw(0L), open = "w")
  sink(stdout_file, type = "output", split = FALSE)
  attr(stdout_file, "sink_index") <- sink.number(type = "output")
  stdout_file
} ## buffer_stdout()

## Stop the diversion set up by buffer_stdout(), output whatever was buffered,
## and close the connection.
##
## stdout_file: connection returned by buffer_stdout(), or NULL (no-op)
## close:       if FALSE, a fresh buffer is started and returned
## must_work:   if TRUE, it is an error when the sink cannot be removed
##
## Returns NULL, the new connection (close = FALSE), or 'stdout_file' itself
## when the sink could not be removed.
flush_stdout <- function(stdout_file, close = TRUE, must_work = FALSE) {
  if (is.null(stdout_file)) return(NULL)

  ## Can we close the sink we opened?
  ## It could be that a progressor completes while there is a surrounding
  ## sink active, e.g. an active capture.output(), or when signalled within
  ## a sequential future. Because of this, we might not be able to flush
  ## and close the sink here.
  sink_index <- attr(stdout_file, "sink_index")
  if (sink_index != sink.number("output")) {
    if (must_work) {
      stop(sprintf("[progressr] Cannot flush stdout because the current sink index (%d) is out of sync with the sink we want to close (%d)", sink.number("output"), sink_index))
    }
    return(stdout_file)
  }

  ## Remove our diversion; it is the top-most one as verified above
  sink(split = FALSE, type = "output")
  buffered <- rawToChar(rawConnectionValue(stdout_file))
  ## rawToChar() always gives a single string, so test for content
  ## rather than for length
  if (nzchar(buffered)) cat(buffered, file = stdout())
  close(stdout_file)
  stdout_file <- NULL
  if (!close) stdout_file <- buffer_stdout()
  stdout_file
} ## flush_stdout()

## TRUE if 'stdout_file' is a buffer that holds at least one byte of output
has_buffered_stdout <- function(stdout_file) {
  !is.null(stdout_file) && (length(rawConnectionValue(stdout_file)) > 0L)
}

## Re-signal recorded conditions in the order they were recorded.
## Returns an empty list, to be assigned back to the record.
flush_conditions <- function(conditions) {
  for (cond in conditions) {
    if (inherits(cond, "message")) {
      message(cond)
    } else if (inherits(cond, "warning")) {
      warning(cond)
    } else if (inherits(cond, "condition")) {
      signalCondition(cond)
    }
  }
  list()
} ## flush_conditions()



## Coerce 'handlers' to a list of 'progression_handler' functions.
##
## handlers: a function or a list of functions. A function that is not of
##           class 'progression_handler' is called without arguments and must
##           return one.
## drop:     if TRUE, handlers whose environment has 'enable' set to something
##           other than TRUE are dropped (an unset 'enable' counts as enabled)
as_progression_handler <- function(handlers, drop = TRUE) {
  ## FIXME(?)
  if (!is.list(handlers)) handlers <- list(handlers)

  for (kk in seq_along(handlers)) {
    handler <- handlers[[kk]]
    stop_if_not(is.function(handler))
    if (!inherits(handler, "progression_handler")) {
      handler <- handler()
      stop_if_not(is.function(handler),
                  inherits(handler, "progression_handler"))
      handlers[[kk]] <- handler
    }
  }

  ## Keep only enabled handlers?
  if (drop) {
    enabled <- vapply(handlers, FUN = function(h) {
      env <- environment(h)
      value <- env$enable
      isTRUE(value) || is.null(value)
    }, FUN.VALUE = TRUE)
    handlers <- handlers[enabled]
  }

  handlers
}