1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include "IPCBlobInputStreamChild.h"
8 #include "IPCBlobInputStreamThread.h"
9
10 #include "mozilla/ipc/IPCStreamUtils.h"
11 #include "mozilla/dom/WorkerHolder.h"
12 #include "mozilla/dom/WorkerPrivate.h"
13 #include "mozilla/dom/WorkerRunnable.h"
14
15 namespace mozilla {
16 namespace dom {
17
18 namespace {
19
20 // This runnable is used in case the last stream is forgotten on the 'wrong'
21 // thread.
22 class ShutdownRunnable final : public CancelableRunnable {
23 public:
ShutdownRunnable(IPCBlobInputStreamChild * aActor)24 explicit ShutdownRunnable(IPCBlobInputStreamChild* aActor)
25 : CancelableRunnable("dom::ShutdownRunnable"), mActor(aActor) {}
26
27 NS_IMETHOD
Run()28 Run() override {
29 mActor->Shutdown();
30 return NS_OK;
31 }
32
33 private:
34 RefPtr<IPCBlobInputStreamChild> mActor;
35 };
36
37 // This runnable is used in case StreamNeeded() has been called on a non-owning
38 // thread.
39 class StreamNeededRunnable final : public CancelableRunnable {
40 public:
StreamNeededRunnable(IPCBlobInputStreamChild * aActor)41 explicit StreamNeededRunnable(IPCBlobInputStreamChild* aActor)
42 : CancelableRunnable("dom::StreamNeededRunnable"), mActor(aActor) {}
43
44 NS_IMETHOD
Run()45 Run() override {
46 MOZ_ASSERT(mActor->State() != IPCBlobInputStreamChild::eActiveMigrating &&
47 mActor->State() != IPCBlobInputStreamChild::eInactiveMigrating);
48 if (mActor->State() == IPCBlobInputStreamChild::eActive) {
49 mActor->SendStreamNeeded();
50 }
51 return NS_OK;
52 }
53
54 private:
55 RefPtr<IPCBlobInputStreamChild> mActor;
56 };
57
58 // When the stream has been received from the parent, we inform the
59 // IPCBlobInputStream.
60 class StreamReadyRunnable final : public CancelableRunnable {
61 public:
StreamReadyRunnable(IPCBlobInputStream * aDestinationStream,already_AddRefed<nsIInputStream> aCreatedStream)62 StreamReadyRunnable(IPCBlobInputStream* aDestinationStream,
63 already_AddRefed<nsIInputStream> aCreatedStream)
64 : CancelableRunnable("dom::StreamReadyRunnable"),
65 mDestinationStream(aDestinationStream),
66 mCreatedStream(Move(aCreatedStream)) {
67 MOZ_ASSERT(mDestinationStream);
68 // mCreatedStream can be null.
69 }
70
71 NS_IMETHOD
Run()72 Run() override {
73 mDestinationStream->StreamReady(mCreatedStream.forget());
74 return NS_OK;
75 }
76
77 private:
78 RefPtr<IPCBlobInputStream> mDestinationStream;
79 nsCOMPtr<nsIInputStream> mCreatedStream;
80 };
81
82 class IPCBlobInputStreamWorkerHolder final : public WorkerHolder {
83 public:
IPCBlobInputStreamWorkerHolder()84 IPCBlobInputStreamWorkerHolder()
85 : WorkerHolder("IPCBlobInputStreamWorkerHolder") {}
86
Notify(WorkerStatus aStatus)87 bool Notify(WorkerStatus aStatus) override {
88 // We must keep the worker alive until the migration is completed.
89 return true;
90 }
91 };
92
93 class ReleaseWorkerHolderRunnable final : public CancelableRunnable {
94 public:
ReleaseWorkerHolderRunnable(UniquePtr<WorkerHolder> && aWorkerHolder)95 explicit ReleaseWorkerHolderRunnable(UniquePtr<WorkerHolder>&& aWorkerHolder)
96 : CancelableRunnable("dom::ReleaseWorkerHolderRunnable"),
97 mWorkerHolder(Move(aWorkerHolder)) {}
98
99 NS_IMETHOD
Run()100 Run() override {
101 mWorkerHolder = nullptr;
102 return NS_OK;
103 }
104
Cancel()105 nsresult Cancel() override { return Run(); }
106
107 private:
108 UniquePtr<WorkerHolder> mWorkerHolder;
109 };
110
111 } // namespace
112
IPCBlobInputStreamChild(const nsID & aID,uint64_t aSize)113 IPCBlobInputStreamChild::IPCBlobInputStreamChild(const nsID& aID,
114 uint64_t aSize)
115 : mMutex("IPCBlobInputStreamChild::mMutex"),
116 mID(aID),
117 mSize(aSize),
118 mState(eActive),
119 mOwningEventTarget(GetCurrentThreadSerialEventTarget()) {
120 // If we are running in a worker, we need to send a Close() to the parent side
121 // before the thread is released.
122 if (!NS_IsMainThread()) {
123 WorkerPrivate* workerPrivate = GetCurrentThreadWorkerPrivate();
124 if (workerPrivate) {
125 UniquePtr<WorkerHolder> workerHolder(
126 new IPCBlobInputStreamWorkerHolder());
127 if (workerHolder->HoldWorker(workerPrivate, Canceling)) {
128 mWorkerHolder.swap(workerHolder);
129 }
130 }
131 }
132 }
133
~IPCBlobInputStreamChild()134 IPCBlobInputStreamChild::~IPCBlobInputStreamChild() {}
135
Shutdown()136 void IPCBlobInputStreamChild::Shutdown() {
137 MutexAutoLock lock(mMutex);
138
139 RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;
140
141 mWorkerHolder = nullptr;
142 mPendingOperations.Clear();
143
144 if (mState == eActive) {
145 SendClose();
146 mState = eInactive;
147 }
148 }
149
ActorDestroy(IProtocol::ActorDestroyReason aReason)150 void IPCBlobInputStreamChild::ActorDestroy(
151 IProtocol::ActorDestroyReason aReason) {
152 bool migrating = false;
153
154 {
155 MutexAutoLock lock(mMutex);
156 migrating = mState == eActiveMigrating;
157 mState = migrating ? eInactiveMigrating : eInactive;
158 }
159
160 if (migrating) {
161 // We were waiting for this! Now we can migrate the actor in the correct
162 // thread.
163 RefPtr<IPCBlobInputStreamThread> thread =
164 IPCBlobInputStreamThread::GetOrCreate();
165 MOZ_ASSERT(thread, "We cannot continue without DOMFile thread.");
166
167 ResetManager();
168 thread->MigrateActor(this);
169 return;
170 }
171
172 // Let's cleanup the workerHolder and the pending operation queue.
173 Shutdown();
174 }
175
State()176 IPCBlobInputStreamChild::ActorState IPCBlobInputStreamChild::State() {
177 MutexAutoLock lock(mMutex);
178 return mState;
179 }
180
CreateStream()181 already_AddRefed<IPCBlobInputStream> IPCBlobInputStreamChild::CreateStream() {
182 bool shouldMigrate = false;
183
184 RefPtr<IPCBlobInputStream> stream = new IPCBlobInputStream(this);
185
186 {
187 MutexAutoLock lock(mMutex);
188
189 if (mState == eInactive) {
190 return nullptr;
191 }
192
193 // The stream is active but maybe it is not running in the DOM-File thread.
194 // We should migrate it there.
195 if (mState == eActive &&
196 !IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget)) {
197 MOZ_ASSERT(mStreams.IsEmpty());
198 shouldMigrate = true;
199 mState = eActiveMigrating;
200 }
201
202 mStreams.AppendElement(stream);
203 }
204
205 // Send__delete__ will call ActorDestroy(). mMutex cannot be locked at this
206 // time.
207 if (shouldMigrate) {
208 Send__delete__(this);
209 }
210
211 return stream.forget();
212 }
213
ForgetStream(IPCBlobInputStream * aStream)214 void IPCBlobInputStreamChild::ForgetStream(IPCBlobInputStream* aStream) {
215 MOZ_ASSERT(aStream);
216
217 RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;
218
219 {
220 MutexAutoLock lock(mMutex);
221 mStreams.RemoveElement(aStream);
222
223 if (!mStreams.IsEmpty() || mState != eActive) {
224 return;
225 }
226 }
227
228 if (mOwningEventTarget->IsOnCurrentThread()) {
229 Shutdown();
230 return;
231 }
232
233 RefPtr<ShutdownRunnable> runnable = new ShutdownRunnable(this);
234 mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
235 }
236
StreamNeeded(IPCBlobInputStream * aStream,nsIEventTarget * aEventTarget)237 void IPCBlobInputStreamChild::StreamNeeded(IPCBlobInputStream* aStream,
238 nsIEventTarget* aEventTarget) {
239 MutexAutoLock lock(mMutex);
240
241 if (mState == eInactive) {
242 return;
243 }
244
245 MOZ_ASSERT(mStreams.Contains(aStream));
246
247 PendingOperation* opt = mPendingOperations.AppendElement();
248 opt->mStream = aStream;
249 opt->mEventTarget = aEventTarget;
250
251 if (mState == eActiveMigrating || mState == eInactiveMigrating) {
252 // This operation will be continued when the migration is completed.
253 return;
254 }
255
256 MOZ_ASSERT(mState == eActive);
257
258 if (mOwningEventTarget->IsOnCurrentThread()) {
259 SendStreamNeeded();
260 return;
261 }
262
263 RefPtr<StreamNeededRunnable> runnable = new StreamNeededRunnable(this);
264 mOwningEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
265 }
266
RecvStreamReady(const OptionalIPCStream & aStream)267 mozilla::ipc::IPCResult IPCBlobInputStreamChild::RecvStreamReady(
268 const OptionalIPCStream& aStream) {
269 nsCOMPtr<nsIInputStream> stream = mozilla::ipc::DeserializeIPCStream(aStream);
270
271 RefPtr<IPCBlobInputStream> pendingStream;
272 nsCOMPtr<nsIEventTarget> eventTarget;
273
274 {
275 MutexAutoLock lock(mMutex);
276 MOZ_ASSERT(!mPendingOperations.IsEmpty());
277 MOZ_ASSERT(mState == eActive);
278
279 pendingStream = mPendingOperations[0].mStream;
280 eventTarget = mPendingOperations[0].mEventTarget;
281
282 mPendingOperations.RemoveElementAt(0);
283 }
284
285 RefPtr<StreamReadyRunnable> runnable =
286 new StreamReadyRunnable(pendingStream, stream.forget());
287
288 // If IPCBlobInputStream::AsyncWait() has been executed without passing an
289 // event target, we run the callback synchronous because any thread could be
290 // result to be the wrong one. See more in nsIAsyncInputStream::asyncWait
291 // documentation.
292 if (eventTarget) {
293 eventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
294 } else {
295 runnable->Run();
296 }
297
298 return IPC_OK();
299 }
300
Migrated()301 void IPCBlobInputStreamChild::Migrated() {
302 MutexAutoLock lock(mMutex);
303 MOZ_ASSERT(mState == eInactiveMigrating);
304
305 if (mWorkerHolder) {
306 RefPtr<ReleaseWorkerHolderRunnable> runnable =
307 new ReleaseWorkerHolderRunnable(Move(mWorkerHolder));
308 mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
309 }
310
311 mOwningEventTarget = GetCurrentThreadSerialEventTarget();
312 MOZ_ASSERT(IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget));
313
314 // Maybe we have no reasons to keep this actor alive.
315 if (mStreams.IsEmpty()) {
316 mState = eInactive;
317 SendClose();
318 return;
319 }
320
321 mState = eActive;
322
323 // Let's processing the pending operations. We need a stream for each pending
324 // operation.
325 for (uint32_t i = 0; i < mPendingOperations.Length(); ++i) {
326 SendStreamNeeded();
327 }
328 }
329
330 } // namespace dom
331 } // namespace mozilla
332