1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 * vim: set sw=2 ts=8 et tw=80 :
3 */
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7
8 #ifndef mozilla_net_ChannelEventQueue_h
9 #define mozilla_net_ChannelEventQueue_h
10
11 #include "nsTArray.h"
12 #include "nsIEventTarget.h"
13 #include "nsThreadUtils.h"
14 #include "nsXULAppAPI.h"
15 #include "mozilla/DebugOnly.h"
16 #include "mozilla/Mutex.h"
17 #include "mozilla/RecursiveMutex.h"
18 #include "mozilla/UniquePtr.h"
19 #include "mozilla/Unused.h"
20
21 class nsISupports;
22
23 namespace mozilla {
24 namespace net {
25
26 class ChannelEvent {
27 public:
28 MOZ_COUNTED_DEFAULT_CTOR(ChannelEvent)
29 MOZ_COUNTED_DTOR_VIRTUAL(ChannelEvent) virtual void Run() = 0;
30 virtual already_AddRefed<nsIEventTarget> GetEventTarget() = 0;
31 };
32
33 // Note that MainThreadChannelEvent should not be used in child process since
34 // GetEventTarget() directly returns an unlabeled event target.
35 class MainThreadChannelEvent : public ChannelEvent {
36 public:
MOZ_COUNTED_DEFAULT_CTOR(MainThreadChannelEvent)37 MOZ_COUNTED_DEFAULT_CTOR(MainThreadChannelEvent)
38 MOZ_COUNTED_DTOR_OVERRIDE(MainThreadChannelEvent)
39
40 already_AddRefed<nsIEventTarget> GetEventTarget() override {
41 MOZ_ASSERT(XRE_IsParentProcess());
42
43 return do_AddRef(GetMainThreadEventTarget());
44 }
45 };
46
47 class ChannelFunctionEvent : public ChannelEvent {
48 public:
ChannelFunctionEvent(std::function<already_AddRefed<nsIEventTarget> ()> && aGetEventTarget,std::function<void ()> && aCallback)49 ChannelFunctionEvent(
50 std::function<already_AddRefed<nsIEventTarget>()>&& aGetEventTarget,
51 std::function<void()>&& aCallback)
52 : mGetEventTarget(std::move(aGetEventTarget)),
53 mCallback(std::move(aCallback)) {}
54
Run()55 void Run() override { mCallback(); }
GetEventTarget()56 already_AddRefed<nsIEventTarget> GetEventTarget() override {
57 return mGetEventTarget();
58 }
59
60 private:
61 const std::function<already_AddRefed<nsIEventTarget>()> mGetEventTarget;
62 const std::function<void()> mCallback;
63 };
64
// UnsafePtr is a work-around for our static analyzer, which requires all
// ref-counted objects to be captured in lambdas via a RefPtr.
// The ChannelEventQueue makes it safe to capture "this" by pointer only.
// This is required as a work-around to prevent cycles until bug 1596295
// is resolved.
//
// UnsafePtr is a thin, non-owning wrapper around a raw pointer: it neither
// adds nor releases a reference, and the wrapped pointer cannot be reseated.
template <typename T>
class UnsafePtr {
 public:
  explicit UnsafePtr(T* aPtr) : mPtr(aPtr) {}

  // Dereferences the wrapped pointer. Asserts non-null, matching operator->.
  T& operator*() const {
    MOZ_ASSERT(mPtr, "dereferencing a null pointer");
    return *mPtr;
  }

  // Member access through the wrapped pointer. Asserts non-null.
  T* operator->() const {
    MOZ_ASSERT(mPtr, "dereferencing a null pointer");
    return mPtr;
  }

  // Implicit conversion to the raw pointer; only available on lvalues.
  operator T*() const& { return mPtr; }

  // True when the wrapped pointer is non-null.
  explicit operator bool() const { return mPtr != nullptr; }

 private:
  T* const mPtr;
};
86
87 class NeckoTargetChannelFunctionEvent : public ChannelFunctionEvent {
88 public:
89 template <typename T>
NeckoTargetChannelFunctionEvent(T * aChild,std::function<void ()> && aCallback)90 NeckoTargetChannelFunctionEvent(T* aChild, std::function<void()>&& aCallback)
91 : ChannelFunctionEvent(
92 [child = UnsafePtr<T>(aChild)]() {
93 MOZ_ASSERT(child);
94 return child->GetNeckoTarget();
95 },
96 std::move(aCallback)) {}
97 };
98
99 // Workaround for Necko re-entrancy dangers. We buffer IPDL messages in a
100 // queue if still dispatching previous one(s) to listeners/observers.
101 // Otherwise synchronous XMLHttpRequests and/or other code that spins the
102 // event loop (ex: IPDL rpc) could cause listener->OnDataAvailable (for
103 // instance) to be dispatched and called before mListener->OnStartRequest has
104 // completed.
105
106 class ChannelEventQueue final {
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(ChannelEventQueue)107 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(ChannelEventQueue)
108
109 public:
110 explicit ChannelEventQueue(nsISupports* owner)
111 : mSuspendCount(0),
112 mSuspended(false),
113 mForcedCount(0),
114 mFlushing(false),
115 mHasCheckedForXMLHttpRequest(false),
116 mForXMLHttpRequest(false),
117 mOwner(owner),
118 mMutex("ChannelEventQueue::mMutex"),
119 mRunningMutex("ChannelEventQueue::mRunningMutex") {}
120
121 // Puts IPDL-generated channel event into queue, to be run later
122 // automatically when EndForcedQueueing and/or Resume is called.
123 //
124 // @param aCallback - the ChannelEvent
125 // @param aAssertionWhenNotQueued - this optional param will be used in an
126 // assertion when the event is executed directly.
127 inline void RunOrEnqueue(ChannelEvent* aCallback,
128 bool aAssertionWhenNotQueued = false);
129
130 // Append ChannelEvent in front of the event queue.
131 inline void PrependEvent(UniquePtr<ChannelEvent>&& aEvent);
132 inline void PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents);
133
134 // After StartForcedQueueing is called, RunOrEnqueue() will start enqueuing
135 // events that will be run/flushed when EndForcedQueueing is called.
136 // - Note: queueing may still be required after EndForcedQueueing() (if the
137 // queue is suspended, etc): always call RunOrEnqueue() to avoid race
138 // conditions.
139 inline void StartForcedQueueing();
140 inline void EndForcedQueueing();
141
142 // Suspend/resume event queue. RunOrEnqueue() will start enqueuing
143 // events and they will be run/flushed when resume is called. These should be
144 // called when the channel owning the event queue is suspended/resumed.
145 void Suspend();
146 // Resume flushes the queue asynchronously, i.e. items in queue will be
147 // dispatched in a new event on the current thread.
148 void Resume();
149
NotifyReleasingOwner()150 void NotifyReleasingOwner() {
151 MutexAutoLock lock(mMutex);
152 mOwner = nullptr;
153 }
154
155 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
IsEmpty()156 bool IsEmpty() const { return mEventQueue.IsEmpty(); }
157 #endif
158
159 private:
160 // Private destructor, to discourage deletion outside of Release():
161 ~ChannelEventQueue() = default;
162
163 void SuspendInternal();
164 void ResumeInternal();
165
166 bool MaybeSuspendIfEventsAreSuppressed();
167
168 inline void MaybeFlushQueue();
169 void FlushQueue();
170 inline void CompleteResume();
171
172 ChannelEvent* TakeEvent();
173
174 nsTArray<UniquePtr<ChannelEvent>> mEventQueue;
175
176 uint32_t mSuspendCount;
177 bool mSuspended;
178 uint32_t mForcedCount; // Support ForcedQueueing on multiple thread.
179 bool mFlushing;
180
181 // Whether the queue is associated with an XHR. This is lazily instantiated
182 // the first time it is needed.
183 bool mHasCheckedForXMLHttpRequest;
184 bool mForXMLHttpRequest;
185
186 // Keep ptr to avoid refcount cycle: only grab ref during flushing.
187 nsISupports* mOwner;
188
189 // For atomic mEventQueue operation and state update
190 Mutex mMutex;
191
192 // To guarantee event execution order among threads
193 RecursiveMutex mRunningMutex;
194
195 friend class AutoEventEnqueuer;
196 };
197
RunOrEnqueue(ChannelEvent * aCallback,bool aAssertionWhenNotQueued)198 inline void ChannelEventQueue::RunOrEnqueue(ChannelEvent* aCallback,
199 bool aAssertionWhenNotQueued) {
200 MOZ_ASSERT(aCallback);
201
202 // Events execution could be a destruction of the channel (and our own
203 // destructor) unless we make sure its refcount doesn't drop to 0 while this
204 // method is running.
205 nsCOMPtr<nsISupports> kungFuDeathGrip(mOwner);
206 Unused << kungFuDeathGrip; // Not used in this function
207
208 // To avoid leaks.
209 UniquePtr<ChannelEvent> event(aCallback);
210
211 // To guarantee that the running event and all the events generated within
212 // it will be finished before events on other threads.
213 RecursiveMutexAutoLock lock(mRunningMutex);
214
215 {
216 MutexAutoLock lock(mMutex);
217
218 bool enqueue = !!mForcedCount || mSuspended || mFlushing ||
219 !mEventQueue.IsEmpty() ||
220 MaybeSuspendIfEventsAreSuppressed();
221
222 if (enqueue) {
223 mEventQueue.AppendElement(std::move(event));
224 return;
225 }
226
227 nsCOMPtr<nsIEventTarget> target = event->GetEventTarget();
228 MOZ_ASSERT(target);
229
230 bool isCurrentThread = false;
231 DebugOnly<nsresult> rv = target->IsOnCurrentThread(&isCurrentThread);
232 MOZ_ASSERT(NS_SUCCEEDED(rv));
233
234 if (!isCurrentThread) {
235 // Leverage Suspend/Resume mechanism to trigger flush procedure without
236 // creating a new one.
237 SuspendInternal();
238 mEventQueue.AppendElement(std::move(event));
239 ResumeInternal();
240 return;
241 }
242 }
243
244 MOZ_RELEASE_ASSERT(!aAssertionWhenNotQueued);
245 event->Run();
246 }
247
StartForcedQueueing()248 inline void ChannelEventQueue::StartForcedQueueing() {
249 MutexAutoLock lock(mMutex);
250 ++mForcedCount;
251 }
252
EndForcedQueueing()253 inline void ChannelEventQueue::EndForcedQueueing() {
254 bool tryFlush = false;
255 {
256 MutexAutoLock lock(mMutex);
257 MOZ_ASSERT(mForcedCount > 0);
258 if (!--mForcedCount) {
259 tryFlush = true;
260 }
261 }
262
263 if (tryFlush) {
264 MaybeFlushQueue();
265 }
266 }
267
PrependEvent(UniquePtr<ChannelEvent> && aEvent)268 inline void ChannelEventQueue::PrependEvent(UniquePtr<ChannelEvent>&& aEvent) {
269 MutexAutoLock lock(mMutex);
270
271 // Prepending event while no queue flush foreseen might cause the following
272 // channel events not run. This assertion here guarantee there must be a
273 // queue flush, either triggered by Resume or EndForcedQueueing, to execute
274 // the added event.
275 MOZ_ASSERT(mSuspended || !!mForcedCount);
276
277 mEventQueue.InsertElementAt(0, std::move(aEvent));
278 }
279
PrependEvents(nsTArray<UniquePtr<ChannelEvent>> & aEvents)280 inline void ChannelEventQueue::PrependEvents(
281 nsTArray<UniquePtr<ChannelEvent>>& aEvents) {
282 MutexAutoLock lock(mMutex);
283
284 // Prepending event while no queue flush foreseen might cause the following
285 // channel events not run. This assertion here guarantee there must be a
286 // queue flush, either triggered by Resume or EndForcedQueueing, to execute
287 // the added events.
288 MOZ_ASSERT(mSuspended || !!mForcedCount);
289
290 mEventQueue.InsertElementsAt(0, aEvents.Length());
291
292 for (uint32_t i = 0; i < aEvents.Length(); i++) {
293 mEventQueue[i] = std::move(aEvents[i]);
294 }
295 }
296
CompleteResume()297 inline void ChannelEventQueue::CompleteResume() {
298 bool tryFlush = false;
299 {
300 MutexAutoLock lock(mMutex);
301
302 // channel may have been suspended again since Resume fired event to call
303 // this.
304 if (!mSuspendCount) {
305 // we need to remain logically suspended (for purposes of queuing incoming
306 // messages) until this point, else new incoming messages could run before
307 // queued ones.
308 mSuspended = false;
309 tryFlush = true;
310 }
311 }
312
313 if (tryFlush) {
314 MaybeFlushQueue();
315 }
316 }
317
MaybeFlushQueue()318 inline void ChannelEventQueue::MaybeFlushQueue() {
319 // Don't flush if forced queuing on, we're already being flushed, or
320 // suspended, or there's nothing to flush
321 bool flushQueue = false;
322
323 {
324 MutexAutoLock lock(mMutex);
325 flushQueue = !mForcedCount && !mFlushing && !mSuspended &&
326 !mEventQueue.IsEmpty() && !MaybeSuspendIfEventsAreSuppressed();
327
328 // Only one thread is allowed to run FlushQueue at a time.
329 if (flushQueue) {
330 mFlushing = true;
331 }
332 }
333
334 if (flushQueue) {
335 FlushQueue();
336 }
337 }
338
339 // Ensures that RunOrEnqueue() will be collecting events during its lifetime
340 // (letting caller know incoming IPDL msgs should be queued). Flushes the queue
341 // when it goes out of scope.
342 class MOZ_STACK_CLASS AutoEventEnqueuer {
343 public:
AutoEventEnqueuer(ChannelEventQueue * queue)344 explicit AutoEventEnqueuer(ChannelEventQueue* queue)
345 : mEventQueue(queue), mOwner(queue->mOwner) {
346 mEventQueue->StartForcedQueueing();
347 }
~AutoEventEnqueuer()348 ~AutoEventEnqueuer() { mEventQueue->EndForcedQueueing(); }
349
350 private:
351 RefPtr<ChannelEventQueue> mEventQueue;
352 // Ensure channel object lives longer than ChannelEventQueue.
353 nsCOMPtr<nsISupports> mOwner;
354 };
355
356 } // namespace net
357 } // namespace mozilla
358
359 #endif
360