1processId <- local({ 2 # pid is not sufficient to uniquely identify a process, because 3 # distributed futures span machines which could introduce pid 4 # collisions. 5 cached <- NULL 6 function() { 7 if (is.null(cached)) { 8 cached <<- rlang::hash(list( 9 Sys.info(), 10 Sys.time() 11 )) 12 } 13 # Sys.getpid() cannot be cached because forked children will 14 # then have the same processId as their parents. 15 paste(cached, Sys.getpid()) 16 } 17}) 18 19#' @include graph.R 20Context <- R6Class( 21 'Context', 22 portable = FALSE, 23 class = FALSE, 24 public = list( 25 id = character(0), 26 .reactId = character(0), 27 .reactType = "other", 28 .label = character(0), # For debug purposes 29 .invalidated = FALSE, 30 .invalidateCallbacks = list(), 31 .flushCallbacks = list(), 32 .domain = NULL, 33 .pid = NULL, 34 .weak = NULL, 35 36 initialize = function( 37 domain, label='', type='other', prevId='', 38 reactId = rLog$noReactId, 39 id = .getReactiveEnvironment()$nextId(), # For dummy context 40 weak = FALSE 41 ) { 42 id <<- id 43 .label <<- label 44 .domain <<- domain 45 .pid <<- processId() 46 .reactId <<- reactId 47 .reactType <<- type 48 .weak <<- weak 49 rLog$createContext(id, label, type, prevId, domain) 50 }, 51 run = function(func) { 52 "Run the provided function under this context." 53 54 promises::with_promise_domain(reactivePromiseDomain(), { 55 withReactiveDomain(.domain, { 56 env <- .getReactiveEnvironment() 57 rLog$enter(.reactId, id, .reactType, .domain) 58 on.exit(rLog$exit(.reactId, id, .reactType, .domain), add = TRUE) 59 env$runWith(self, func) 60 }) 61 }) 62 }, 63 invalidate = function() { 64 "Invalidate this context. It will immediately call the callbacks 65 that have been registered with onInvalidate()." 66 67 if (!identical(.pid, processId())) { 68 rlang::abort("Reactive context was created in one process and invalidated from another.") 69 } 70 71 if (.invalidated) 72 return() 73 .invalidated <<- TRUE 74 75 rLog$invalidateStart(.reactId, id, .reactType, .domain) 76 on.exit(rLog$invalidateEnd(.reactId, id, .reactType, .domain), add = TRUE) 77 78 lapply(.invalidateCallbacks, function(func) { 79 func() 80 }) 81 .invalidateCallbacks <<- list() 82 NULL 83 }, 84 onInvalidate = function(func) { 85 "Register a function to be called when this context is invalidated. 86 If this context is already invalidated, the function is called 87 immediately." 88 89 if (!identical(.pid, processId())) { 90 rlang::abort("Reactive context was created in one process and accessed from another.") 91 } 92 93 if (.invalidated) 94 func() 95 else 96 .invalidateCallbacks <<- c(.invalidateCallbacks, func) 97 NULL 98 }, 99 addPendingFlush = function(priority) { 100 "Tell the reactive environment that this context should be flushed the 101 next time flushReact() called." 102 .getReactiveEnvironment()$addPendingFlush(self, priority) 103 }, 104 onFlush = function(func) { 105 "Register a function to be called when this context is flushed." 106 .flushCallbacks <<- c(.flushCallbacks, func) 107 }, 108 executeFlushCallbacks = function() { 109 "For internal use only." 110 111 lapply(.flushCallbacks, function(flushCallback) { 112 flushCallback() 113 }) 114 }, 115 isWeak = function() { 116 .weak 117 } 118 ) 119) 120 121ReactiveEnvironment <- R6Class( 122 'ReactiveEnvironment', 123 portable = FALSE, 124 class = FALSE, 125 public = list( 126 .currentContext = NULL, 127 .nextId = 0L, 128 .pendingFlush = 'PriorityQueue', 129 .inFlush = FALSE, 130 131 initialize = function() { 132 .pendingFlush <<- PriorityQueue$new() 133 }, 134 nextId = function() { 135 .nextId <<- .nextId + 1L 136 return(as.character(.nextId)) 137 }, 138 currentContext = function() { 139 if (is.null(.currentContext)) { 140 if (isTRUE(getOption('shiny.suppressMissingContextError'))) { 141 return(getDummyContext()) 142 } else { 143 rlang::abort(c( 144 'Operation not allowed without an active reactive context.', 145 paste0( 146 'You tried to do something that can only be done from inside a ', 147 'reactive consumer.' 148 ) 149 )) 150 } 151 } 152 return(.currentContext) 153 }, 154 runWith = function(ctx, contextFunc) { 155 old.ctx <- .currentContext 156 .currentContext <<- ctx 157 on.exit(.currentContext <<- old.ctx) 158 contextFunc() 159 }, 160 addPendingFlush = function(ctx, priority) { 161 .pendingFlush$enqueue(ctx, priority) 162 }, 163 hasPendingFlush = function() { 164 return(!.pendingFlush$isEmpty()) 165 }, 166 # Returns TRUE if anything was actually called 167 flush = function() { 168 # If nothing to flush, exit early 169 if (!hasPendingFlush()) return(invisible(FALSE)) 170 # If already in a flush, don't start another one 171 if (.inFlush) return(invisible(FALSE)) 172 .inFlush <<- TRUE 173 on.exit({ 174 .inFlush <<- FALSE 175 rLog$idle(domain = NULL) 176 }) 177 178 while (hasPendingFlush()) { 179 ctx <- .pendingFlush$dequeue() 180 ctx$executeFlushCallbacks() 181 } 182 183 invisible(TRUE) 184 } 185 ) 186) 187 188.getReactiveEnvironment <- local({ 189 reactiveEnvironment <- NULL 190 function() { 191 if (is.null(reactiveEnvironment)) 192 reactiveEnvironment <<- ReactiveEnvironment$new() 193 return(reactiveEnvironment) 194 } 195}) 196 197# Causes any pending invalidations to run. Returns TRUE if any invalidations 198# were pending (i.e. if work was actually done). 199flushReact <- function() { 200 return(.getReactiveEnvironment()$flush()) 201} 202 203# Retrieves the current reactive context, or errors if there is no reactive 204# context active at the moment. 205getCurrentContext <- function() { 206 .getReactiveEnvironment()$currentContext() 207} 208hasCurrentContext <- function() { 209 !is.null(.getReactiveEnvironment()$.currentContext) || 210 isTRUE(getOption("shiny.suppressMissingContextError")) 211} 212 213getDummyContext <- function() { 214 Context$new( 215 getDefaultReactiveDomain(), '[none]', type = 'isolate', 216 id = "Dummy", reactId = rLog$dummyReactId 217 ) 218} 219 220wrapForContext <- function(func, ctx) { 221 force(func) 222 force(ctx) 223 224 function(...) { 225 ctx$run(function() { 226 captureStackTraces( 227 func(...) 228 ) 229 }) 230 } 231} 232 233reactivePromiseDomain <- function() { 234 promises::new_promise_domain( 235 wrapOnFulfilled = function(onFulfilled) { 236 force(onFulfilled) 237 ctx <- getCurrentContext() 238 wrapForContext(onFulfilled, ctx) 239 }, 240 wrapOnRejected = function(onRejected) { 241 force(onRejected) 242 ctx <- getCurrentContext() 243 wrapForContext(onRejected, ctx) 244 } 245 ) 246} 247