1processId <- local({
2  # pid is not sufficient to uniquely identify a process, because
3  # distributed futures span machines which could introduce pid
4  # collisions.
5  cached <- NULL
6  function() {
7    if (is.null(cached)) {
8      cached <<- rlang::hash(list(
9        Sys.info(),
10        Sys.time()
11      ))
12    }
13    # Sys.getpid() cannot be cached because forked children will
14    # then have the same processId as their parents.
15    paste(cached, Sys.getpid())
16  }
17})
18
19#' @include graph.R
20Context <- R6Class(
21  'Context',
22  portable = FALSE,
23  class = FALSE,
24  public = list(
25    id = character(0),
26    .reactId = character(0),
27    .reactType = "other",
28    .label = character(0),      # For debug purposes
29    .invalidated = FALSE,
30    .invalidateCallbacks = list(),
31    .flushCallbacks = list(),
32    .domain = NULL,
33    .pid = NULL,
34    .weak = NULL,
35
36    initialize = function(
37      domain, label='', type='other', prevId='',
38      reactId = rLog$noReactId,
39      id = .getReactiveEnvironment()$nextId(), # For dummy context
40      weak = FALSE
41    ) {
42      id <<- id
43      .label <<- label
44      .domain <<- domain
45      .pid <<- processId()
46      .reactId <<- reactId
47      .reactType <<- type
48      .weak <<- weak
49      rLog$createContext(id, label, type, prevId, domain)
50    },
51    run = function(func) {
52      "Run the provided function under this context."
53
54      promises::with_promise_domain(reactivePromiseDomain(), {
55        withReactiveDomain(.domain, {
56          env <- .getReactiveEnvironment()
57          rLog$enter(.reactId, id, .reactType, .domain)
58          on.exit(rLog$exit(.reactId, id, .reactType, .domain), add = TRUE)
59          env$runWith(self, func)
60        })
61      })
62    },
63    invalidate = function() {
64      "Invalidate this context. It will immediately call the callbacks
65        that have been registered with onInvalidate()."
66
67      if (!identical(.pid, processId())) {
68        rlang::abort("Reactive context was created in one process and invalidated from another.")
69      }
70
71      if (.invalidated)
72        return()
73      .invalidated <<- TRUE
74
75      rLog$invalidateStart(.reactId, id, .reactType, .domain)
76      on.exit(rLog$invalidateEnd(.reactId, id, .reactType, .domain), add = TRUE)
77
78      lapply(.invalidateCallbacks, function(func) {
79        func()
80      })
81      .invalidateCallbacks <<- list()
82      NULL
83    },
84    onInvalidate = function(func) {
85      "Register a function to be called when this context is invalidated.
86        If this context is already invalidated, the function is called
87        immediately."
88
89      if (!identical(.pid, processId())) {
90        rlang::abort("Reactive context was created in one process and accessed from another.")
91      }
92
93      if (.invalidated)
94        func()
95      else
96        .invalidateCallbacks <<- c(.invalidateCallbacks, func)
97      NULL
98    },
99    addPendingFlush = function(priority) {
100      "Tell the reactive environment that this context should be flushed the
101        next time flushReact() called."
102      .getReactiveEnvironment()$addPendingFlush(self, priority)
103    },
104    onFlush = function(func) {
105      "Register a function to be called when this context is flushed."
106      .flushCallbacks <<- c(.flushCallbacks, func)
107    },
108    executeFlushCallbacks = function() {
109      "For internal use only."
110
111      lapply(.flushCallbacks, function(flushCallback) {
112        flushCallback()
113      })
114    },
115    isWeak = function() {
116      .weak
117    }
118  )
119)
120
121ReactiveEnvironment <- R6Class(
122  'ReactiveEnvironment',
123  portable = FALSE,
124  class = FALSE,
125  public = list(
126    .currentContext = NULL,
127    .nextId = 0L,
128    .pendingFlush = 'PriorityQueue',
129    .inFlush = FALSE,
130
131    initialize = function() {
132      .pendingFlush <<- PriorityQueue$new()
133    },
134    nextId = function() {
135      .nextId <<- .nextId + 1L
136      return(as.character(.nextId))
137    },
138    currentContext = function() {
139      if (is.null(.currentContext)) {
140        if (isTRUE(getOption('shiny.suppressMissingContextError'))) {
141          return(getDummyContext())
142        } else {
143          rlang::abort(c(
144            'Operation not allowed without an active reactive context.',
145            paste0(
146              'You tried to do something that can only be done from inside a ',
147              'reactive consumer.'
148            )
149          ))
150        }
151      }
152      return(.currentContext)
153    },
154    runWith = function(ctx, contextFunc) {
155      old.ctx <- .currentContext
156      .currentContext <<- ctx
157      on.exit(.currentContext <<- old.ctx)
158      contextFunc()
159    },
160    addPendingFlush = function(ctx, priority) {
161      .pendingFlush$enqueue(ctx, priority)
162    },
163    hasPendingFlush = function() {
164      return(!.pendingFlush$isEmpty())
165    },
166    # Returns TRUE if anything was actually called
167    flush = function() {
168      # If nothing to flush, exit early
169      if (!hasPendingFlush()) return(invisible(FALSE))
170      # If already in a flush, don't start another one
171      if (.inFlush) return(invisible(FALSE))
172      .inFlush <<- TRUE
173      on.exit({
174        .inFlush <<- FALSE
175        rLog$idle(domain = NULL)
176      })
177
178      while (hasPendingFlush()) {
179        ctx <- .pendingFlush$dequeue()
180        ctx$executeFlushCallbacks()
181      }
182
183      invisible(TRUE)
184    }
185  )
186)
187
188.getReactiveEnvironment <- local({
189  reactiveEnvironment <- NULL
190  function() {
191    if (is.null(reactiveEnvironment))
192      reactiveEnvironment <<- ReactiveEnvironment$new()
193    return(reactiveEnvironment)
194  }
195})
196
197# Causes any pending invalidations to run. Returns TRUE if any invalidations
198# were pending (i.e. if work was actually done).
199flushReact <- function() {
200  return(.getReactiveEnvironment()$flush())
201}
202
203# Retrieves the current reactive context, or errors if there is no reactive
204# context active at the moment.
205getCurrentContext <- function() {
206  .getReactiveEnvironment()$currentContext()
207}
208hasCurrentContext <- function() {
209  !is.null(.getReactiveEnvironment()$.currentContext) ||
210    isTRUE(getOption("shiny.suppressMissingContextError"))
211}
212
213getDummyContext <- function() {
214  Context$new(
215    getDefaultReactiveDomain(), '[none]', type = 'isolate',
216    id = "Dummy", reactId = rLog$dummyReactId
217  )
218}
219
220wrapForContext <- function(func, ctx) {
221  force(func)
222  force(ctx)
223
224  function(...) {
225    ctx$run(function() {
226      captureStackTraces(
227        func(...)
228      )
229    })
230  }
231}
232
233reactivePromiseDomain <- function() {
234  promises::new_promise_domain(
235    wrapOnFulfilled = function(onFulfilled) {
236      force(onFulfilled)
237      ctx <- getCurrentContext()
238      wrapForContext(onFulfilled, ctx)
239    },
240    wrapOnRejected = function(onRejected) {
241      force(onRejected)
242      ctx <- getCurrentContext()
243      wrapForContext(onRejected, ctx)
244    }
245  )
246}
247