#' Add or Remove a Global 'progression' Handler
#'
#' @param action (character string)
#' If `"add"`, a global handler is added, unless it is already registered.
#' If `"remove"`, it is removed, if it exists.
#' If `"query"`, checks whether a handler is registered or not.
#' If `"status"` (accepted, but not among the default choices), the global
#' progression handler is asked for its status.
#'
#' @return Returns TRUE if a handler is registered, otherwise FALSE.
#' If `action = "query"`, the value is visible, otherwise invisible.
#' If `action = "status"`, the value returned by the global progression
#' handler for a "status" control progression is returned visibly.
#' If R (< 4.0.0) is used, a warning is produced and FALSE is returned
#' invisibly regardless of `action`.
#'
#' @section Requirements:
#' This function requires R (>= 4.0.0) - the version in which global calling
#' handlers were introduced.
#'
#' @example incl/register_global_progression_handler.R
#'
#' @keywords internal
register_global_progression_handler <- function(action = c("add", "remove", "query")) {
  ## Note that "status" is accepted in addition to the advertised choices
  action <- match.arg(action[1], choices = c("add", "remove", "query", "status"))

  if (getRversion() < "4.0.0") {
    warning("register_global_progression_handler() requires R (>= 4.0.0)")
    return(invisible(FALSE))
  }

  ## All existing handlers
  handlers <- globalCallingHandlers()

  ## Which of them, if any, are our global progression handler?
  is_registered <- vapply(handlers, FUN = identical, global_progression_handler, FUN.VALUE = FALSE)
  if (sum(is_registered) > 1L) {
    warning("Detected more than one registered 'global_progression_handler'. Did you register it manually?")
  }

  if (action == "add") {
    ## Avoid registering the same handler twice
    if (!any(is_registered)) {
      globalCallingHandlers(condition = global_progression_handler)
    }
    invisible(TRUE)
  } else if (action == "remove") {
    ## Tell the handler to shut down before it is unregistered
    global_progression_handler(control_progression("shutdown"))
    handlers <- handlers[!is_registered]
    ## Remove all
    globalCallingHandlers(NULL)
    ## Add back the ones we didn't drop
    globalCallingHandlers(handlers)
    invisible(FALSE)
  } else if (action == "query") {
    any(is_registered)
  } else if (action == "status") {
    global_progression_handler(control_progression("status"))
  }
}
53
54
#' A Global Calling Handler For 'progression':s
#'
#' @param condition A condition object. [progression] conditions drive the
#' progress reporting, "interrupt" and "error" conditions shut it down, and
#' other conditions may be recorded and muffled while progress is reported.
#'
#' @return Nothing (`NULL`), except when `condition` is a progression of
#' type "status", in which case a list with the handler's internal state
#' is returned.
#'
#' @section Requirements:
#' This function requires R (>= 4.0.0) - the version in which global calling
#' handlers were introduced.
#'
#' @keywords internal
global_progression_handler <- local({
  ## State shared by all closures below
  current_progressor_uuid <- NULL  ## the progressor listened to; NULL if none
  calling_handler <- NULL          ## as created by make_calling_handler()
  delays <- NULL                   ## as returned by use_delays()
  stdout_file <- NULL              ## as returned by delay_stdout()/flush_stdout()
  capture_conditions <- NA         ## NA = idle, FALSE = paused, TRUE = record
  conditions <- list()             ## recorded conditions, to be flushed later
  genv <- globalenv()

  ## (Re)create 'calling_handler' and 'delays' from the current handlers()
  update_calling_handler <- function() {
    handlers <- handlers()
    # Nothing to do?
    if (length(handlers) == 0L) return(NULL)

    handlers <- as_progression_handler(handlers)

    # Nothing to do?
    if (length(handlers) == 0L) return(NULL)

    ## Do we need to buffer?
    delays <<- use_delays(handlers)

    calling_handler <<- make_calling_handler(handlers)
  }

  ## Flush any buffered output and pass an "interrupt" control progression
  ## on to the calling handler, if there is one
  interrupt_calling_handler <- function(progression = control_progression("interrupt"), debug = FALSE) {
    if (is.null(calling_handler)) return()

    ## Don't capture conditions that are produced by progression handlers
    capture_conditions <<- FALSE
    on.exit(capture_conditions <<- TRUE)

    ## Any buffered output to flush?
    if (isTRUE(attr(delays$terminal, "flush"))) {
      if (length(conditions) > 0L || has_buffered_stdout(stdout_file)) {
        calling_handler(control_progression("hide"))
        stdout_file <<- flush_stdout(stdout_file, close = FALSE)
        conditions <<- flush_conditions(conditions)
      }
    }

    calling_handler(progression)
  }

  ## Shut down the progression handlers, flush what has been buffered, and
  ## reset the shared state. Always returns TRUE.
  finish <- function(progression = control_progression("shutdown"), debug = FALSE) {
    finished <- FALSE

    ## Is progress handler active?
    if (!is.null(current_progressor_uuid)) {
      if (debug) message(" - shutdown progression handlers")
      if (!is.null(calling_handler)) {
        stdout_file <<- delay_stdout(delays, stdout_file = stdout_file)
        finished <- calling_handler(progression)
        ## Note that we might not be able to close 'stdout_file' due
        ## to blocking, non-balanced sinks
        stdout_file <<- flush_stdout(stdout_file, close = TRUE, must_work = FALSE)
        conditions <<- flush_conditions(conditions)
        delays <<- NULL
        if (debug) message(" - finished: ", finished)
      } else {
        finished <- TRUE
      }
    } else {
      if (debug) message(" - no active global progression handler")
    }

    ## Note that we might not have been able to close 'stdout_file' in previous
    ## calls to finish() due to blocking, non-balanced sinks. Try again here,
    ## just in case
    if (!is.null(stdout_file)) {
      stdout_file <<- flush_stdout(stdout_file, close = TRUE, must_work = FALSE)
    }

    current_progressor_uuid <<- NULL
    calling_handler <<- NULL
    capture_conditions <<- NA
    ## Regardless of what the calling handler reported, we are done
    finished <- TRUE
    stop_if_not(length(conditions) == 0L, is.null(delays), isTRUE(finished), is.na(capture_conditions))

    finished
  }

  ## Process a 'progression' condition according to its type
  handle_progression <- function(progression, debug = getOption("progressr.global.debug", FALSE)) {
    ## To please R CMD check
    calling_handler <- NULL; rm(list = "calling_handler")

    ## Don't capture conditions that are produced by progression handlers
    last_capture_conditions <- capture_conditions
    capture_conditions <<- FALSE
    on.exit({
      if (is.null(current_progressor_uuid)) {
        ## No longer listening to a progressor
        capture_conditions <<- NA
      } else if (!is.na(capture_conditions)) {
        capture_conditions <<- TRUE
      }
    })

    stop_if_not(inherits(progression, "progression"))

    ## Record the most recent progression in the global environment
    assign(".Last.progression", value = progression, envir = genv, inherits = FALSE)

    if (debug) message(sprintf("*** Caught a %s condition:", sQuote(class(progression)[1])))
    progressor_uuid <- progression[["progressor_uuid"]]
    if (debug) message(" - source: ", progressor_uuid)

    ## Listen to this progressor?
    if (!is.null(current_progressor_uuid) &&
        !identical(progressor_uuid, current_progressor_uuid)) {
      if (debug) message(" - action: ignoring, already listening to another")
      return()
    }


    if (!is.null(calling_handler) && !is.null(delays)) {
      ## Any buffered output to flush?
      if (isTRUE(attr(delays$terminal, "flush"))) {
        if (length(conditions) > 0L || has_buffered_stdout(stdout_file)) {
          calling_handler(control_progression("hide"))
          stdout_file <<- flush_stdout(stdout_file, close = FALSE)
          stop_if_not(inherits(stdout_file, "connection"))
          conditions <<- flush_conditions(conditions)
          calling_handler(control_progression("unhide"))
        }
      }
    }

    type <- progression[["type"]]
    if (debug) message(" - type: ", type)

    if (type == "initiate") {
      if (identical(progressor_uuid, current_progressor_uuid)) {
        stop(sprintf("INTERNAL ERROR: Already listening to this progressor which just sent another %s request", sQuote(type)))
      }
      if (debug) message(" - start listening")
#      finished <- finish(debug = debug)
#      stop_if_not(is.null(stdout_file), length(conditions) == 0L)
      current_progressor_uuid <<- progressor_uuid
      if (debug) message(" - reset progression handlers")
      update_calling_handler()
      if (!is.null(calling_handler)) {
        stdout_file <<- delay_stdout(delays, stdout_file = stdout_file)
        calling_handler(control_progression("reset"))
        if (debug) message(" - initiate progression handlers")
        finished <- calling_handler(progression)
        if (debug) message(" - finished: ", finished)
        if (finished) {
          finished <- finish(debug = debug)
          stop_if_not(length(conditions) == 0L, is.na(capture_conditions), isTRUE(finished))
        }
      }
    } else if (type == "update") {
      if (is.null(current_progressor_uuid)) {
        ## We might receive zero-amount progress updates after the fact that the
        ## progress has been completed
        amount <- progression$amount
        is_scalar_number <- is.numeric(amount) && length(amount) == 1L && !is.na(amount)
        if (!is_scalar_number || amount > 0) {
          ## Format 'amount' such that also a non-numeric, missing, or
          ## non-scalar value results in a warning rather than an error
          amount_str <- if (is_scalar_number) {
            sprintf("%g", amount)
          } else {
            paste(format(amount), collapse = ", ")
          }
          warning(sprintf("[progressr]: Received a progression %s request (amount=%s; msg=%s) but is not listening to this progressor. This can happen when code signals more progress updates than it configured the progressor to do. When the progressor completes all steps, it shuts down resulting in the global progression handler to no longer listen to it", sQuote(type), amount_str, sQuote(conditionMessage(progression))))
        }
        return()
      }

      if (debug) message(" - update progression handlers")
      if (!is.null(calling_handler)) {
        stdout_file <<- delay_stdout(delays, stdout_file = stdout_file)
        finished <- calling_handler(progression)
        if (debug) message(" - finished: ", finished)
        if (finished) {
          calling_handler(control_progression("shutdown"))
          finished <- finish(debug = debug)
          stop_if_not(length(conditions) == 0L, is.na(capture_conditions), isTRUE(finished))
        }
      }
    } else if (type == "finish") {
      finished <- finish(debug = debug)
      stop_if_not(length(conditions) == 0L, is.na(capture_conditions), isTRUE(finished))
    } else if (type == "status") {
      ## Report 'capture_conditions' as it was before this call paused it
      status <- list(
        current_progressor_uuid = current_progressor_uuid,
        calling_handler = calling_handler,
        delays = delays,
        stdout_file = stdout_file,
        capture_conditions = last_capture_conditions,
        conditions = conditions
      )
      if (debug) message(" - done")
      return(status)
    }

    if (debug) message(" - done")
  } ## handle_progression()


  ## The actual global calling handler
  function(condition) {
    debug <- getOption("progressr.global.debug", FALSE)

    ## Shut down progression handling?
    if (inherits(condition, c("interrupt", "error"))) {
      if (inherits(condition, "interrupt") &&
          isTRUE(getOption("progressr.interrupts", TRUE))) {
        suspendInterrupts({
          interrupt_calling_handler(debug = debug)
        })
      }

      suspendInterrupts({
        ## finish() defaults to a "shutdown" control progression
        finished <- finish(debug = debug)
        stop_if_not(length(conditions) == 0L, is.na(capture_conditions), isTRUE(finished))
      })
      return()
    }

    ## A 'progression' update?
    if (inherits(condition, "progression")) {
      suspendInterrupts({
        res <- handle_progression(condition, debug = debug)
      })
      return(res)
    }

    ## Nothing do to?  Conditions are recorded only when 'capture_conditions'
    ## is TRUE, i.e. neither NA nor FALSE
    if (!isTRUE(capture_conditions)) return()

    ## Nothing do to?
    if (is.null(delays) || !inherits(condition, delays$conditions)) return()

    ## Record non-progression condition to be flushed later
    conditions[[length(conditions) + 1L]] <<- condition

    ## Muffle it for now
    if (inherits(condition, "message")) {
      invokeRestart("muffleMessage")
    } else if (inherits(condition, "warning")) {
      invokeRestart("muffleWarning")
    } else if (inherits(condition, "condition")) {
      ## If there is a "muffle" restart for this condition,
      ## then invoke that restart, i.e. "muffle" the condition
      restarts <- computeRestarts(condition)
      for (restart in restarts) {
        name <- restart$name
        if (is.null(name)) next
        if (!grepl("^muffle", name)) next
        invokeRestart(restart)
        break
      }
    }
  }
}) ## global_progression_handler()
314
315
316
## In R (< 4.0.0), define a stand-in for globalCallingHandlers() that
## produces an informative error if it is ever called
if (getRversion() < "4.0.0") {
  globalCallingHandlers <- function(...) {
    stop("register_global_progression_handler() requires R (>= 4.0.0)")
  }
}
322
323
324
## Start buffering standard output
##
## Opens a raw connection and diverts standard output to it via sink().
##
## Returns the connection, with attribute 'sink_index' holding the
## number of active output sinks right after the diversion was set up.
buffer_stdout <- function() {
  stdout_file <- rawConnection(raw(0L), open = "w")

  ## Don't leak the connection if the sink cannot be established
  sink_ok <- FALSE
  on.exit(if (!sink_ok) close(stdout_file), add = TRUE)

  sink(stdout_file, type = "output", split = FALSE)
  sink_ok <- TRUE

  attr(stdout_file, "sink_index") <- sink.number(type = "output")
  stdout_file
} ## buffer_stdout()
331
## Flush buffered standard output
##
## Arguments:
##  stdout_file  A raw connection with attribute 'sink_index', or NULL.
##  close        If TRUE, buffering ends. If FALSE, a new buffer is set up
##               via buffer_stdout() after the flush.
##  must_work    If TRUE, an error is produced when the sink cannot be
##               removed, otherwise 'stdout_file' is returned as-is.
##
## Returns NULL if 'stdout_file' is NULL or was flushed and closed, the
## new buffer if close = FALSE, and 'stdout_file' itself if it could not
## be flushed.
flush_stdout <- function(stdout_file, close = TRUE, must_work = FALSE) {
  if (is.null(stdout_file)) return(NULL)

  ## Can we close the sink we opened?
  ## It could be that a progressor completes while there is a surrounding
  ## sink active, e.g. an active capture.output(), or when signalled within
  ## a sequential future.  Because of this, we might not be able to flush
  ## and close the sink here.
  sink_index <- attr(stdout_file, "sink_index")
  current_index <- sink.number(type = "output")
  if (sink_index != current_index) {
    if (must_work) {
      stop(sprintf("[progressr] Cannot flush stdout because the current sink index (%d) is out of sync with the sink we want to close (%d)", current_index, sink_index))
    }
    return(stdout_file)
  }

  ## Remove our sink, and then relay what was buffered
  sink(split = FALSE, type = "output")
  buffered <- rawToChar(rawConnectionValue(stdout_file))
  ## rawToChar() always returns a single string, so test for content
  if (nzchar(buffered)) cat(buffered, file = stdout())

  ## Argument 'close' is a logical; be explicit about which close() is called
  base::close(stdout_file)
  stdout_file <- NULL
  if (!close) stdout_file <- buffer_stdout()
  stdout_file
} ## flush_stdout()
356
## Checks whether there is any buffered standard output
##
## Returns TRUE if 'stdout_file' is a raw connection that holds at least
## one byte, and FALSE if it is NULL or holds nothing.
has_buffered_stdout <- function(stdout_file) {
  if (is.null(stdout_file)) return(FALSE)
  length(rawConnectionValue(stdout_file)) > 0L
}
360
## Resignal recorded conditions
##
## Each element of 'conditions' is resignaled, in order, using message()
## for messages, warning() for warnings, and signalCondition() for other
## conditions. Elements that are not conditions are skipped.
##
## Returns an empty list, which callers assign back to their buffer.
flush_conditions <- function(conditions) {
  for (cond in conditions) {
    if (inherits(cond, "message")) {
      message(cond)
    } else if (inherits(cond, "warning")) {
      warning(cond)
    } else if (inherits(cond, "condition")) {
      signalCondition(cond)
    }
  }
  list()
} ## flush_conditions()
373
374
375
## Coerce to a list of progression handlers
##
## Arguments:
##  handlers  A function or a list of functions. Functions that do not
##            inherit from 'progression_handler' are called without
##            arguments and must return one that does.
##  drop      If TRUE, handlers whose environment has 'enable' set to
##            something other than TRUE are dropped.
##
## Returns a list of 'progression_handler' functions.
as_progression_handler <- function(handlers, drop = TRUE) {
  ## A single handler function is treated as a list of length one
  if (!is.list(handlers)) handlers <- list(handlers)

  for (kk in seq_along(handlers)) {
    handler <- handlers[[kk]]
    stop_if_not(is.function(handler))
    if (!inherits(handler, "progression_handler")) {
      ## Not yet a progression handler; call it to create one
      handler <- handler()
      stop_if_not(is.function(handler),
                  inherits(handler, "progression_handler"))
      handlers[[kk]] <- handler
    }
  }

  ## Keep only enabled handlers?
  if (drop) {
    enabled <- vapply(handlers, FUN = function(h) {
      ## A handler without an 'enable' variable counts as enabled
      value <- environment(h)$enable
      isTRUE(value) || is.null(value)
    }, FUN.VALUE = TRUE)
    handlers <- handlers[enabled]
  }

  handlers
}
403