1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "IPCBlobInputStreamChild.h"
8 #include "IPCBlobInputStreamThread.h"
9 
10 #include "mozilla/ipc/IPCStreamUtils.h"
11 #include "mozilla/dom/WorkerHolder.h"
12 #include "mozilla/dom/WorkerPrivate.h"
13 #include "mozilla/dom/WorkerRunnable.h"
14 
15 namespace mozilla {
16 namespace dom {
17 
18 namespace {
19 
20 // This runnable is used in case the last stream is forgotten on the 'wrong'
21 // thread.
22 class ShutdownRunnable final : public CancelableRunnable {
23  public:
ShutdownRunnable(IPCBlobInputStreamChild * aActor)24   explicit ShutdownRunnable(IPCBlobInputStreamChild* aActor)
25       : CancelableRunnable("dom::ShutdownRunnable"), mActor(aActor) {}
26 
27   NS_IMETHOD
Run()28   Run() override {
29     mActor->Shutdown();
30     return NS_OK;
31   }
32 
33  private:
34   RefPtr<IPCBlobInputStreamChild> mActor;
35 };
36 
37 // This runnable is used in case StreamNeeded() has been called on a non-owning
38 // thread.
39 class StreamNeededRunnable final : public CancelableRunnable {
40  public:
StreamNeededRunnable(IPCBlobInputStreamChild * aActor)41   explicit StreamNeededRunnable(IPCBlobInputStreamChild* aActor)
42       : CancelableRunnable("dom::StreamNeededRunnable"), mActor(aActor) {}
43 
44   NS_IMETHOD
Run()45   Run() override {
46     MOZ_ASSERT(mActor->State() != IPCBlobInputStreamChild::eActiveMigrating &&
47                mActor->State() != IPCBlobInputStreamChild::eInactiveMigrating);
48     if (mActor->State() == IPCBlobInputStreamChild::eActive) {
49       mActor->SendStreamNeeded();
50     }
51     return NS_OK;
52   }
53 
54  private:
55   RefPtr<IPCBlobInputStreamChild> mActor;
56 };
57 
58 // When the stream has been received from the parent, we inform the
59 // IPCBlobInputStream.
60 class StreamReadyRunnable final : public CancelableRunnable {
61  public:
StreamReadyRunnable(IPCBlobInputStream * aDestinationStream,already_AddRefed<nsIInputStream> aCreatedStream)62   StreamReadyRunnable(IPCBlobInputStream* aDestinationStream,
63                       already_AddRefed<nsIInputStream> aCreatedStream)
64       : CancelableRunnable("dom::StreamReadyRunnable"),
65         mDestinationStream(aDestinationStream),
66         mCreatedStream(Move(aCreatedStream)) {
67     MOZ_ASSERT(mDestinationStream);
68     // mCreatedStream can be null.
69   }
70 
71   NS_IMETHOD
Run()72   Run() override {
73     mDestinationStream->StreamReady(mCreatedStream.forget());
74     return NS_OK;
75   }
76 
77  private:
78   RefPtr<IPCBlobInputStream> mDestinationStream;
79   nsCOMPtr<nsIInputStream> mCreatedStream;
80 };
81 
82 class IPCBlobInputStreamWorkerHolder final : public WorkerHolder {
83  public:
IPCBlobInputStreamWorkerHolder()84   IPCBlobInputStreamWorkerHolder()
85       : WorkerHolder("IPCBlobInputStreamWorkerHolder") {}
86 
Notify(WorkerStatus aStatus)87   bool Notify(WorkerStatus aStatus) override {
88     // We must keep the worker alive until the migration is completed.
89     return true;
90   }
91 };
92 
93 class ReleaseWorkerHolderRunnable final : public CancelableRunnable {
94  public:
ReleaseWorkerHolderRunnable(UniquePtr<WorkerHolder> && aWorkerHolder)95   explicit ReleaseWorkerHolderRunnable(UniquePtr<WorkerHolder>&& aWorkerHolder)
96       : CancelableRunnable("dom::ReleaseWorkerHolderRunnable"),
97         mWorkerHolder(Move(aWorkerHolder)) {}
98 
99   NS_IMETHOD
Run()100   Run() override {
101     mWorkerHolder = nullptr;
102     return NS_OK;
103   }
104 
Cancel()105   nsresult Cancel() override { return Run(); }
106 
107  private:
108   UniquePtr<WorkerHolder> mWorkerHolder;
109 };
110 
111 }  // namespace
112 
IPCBlobInputStreamChild(const nsID & aID,uint64_t aSize)113 IPCBlobInputStreamChild::IPCBlobInputStreamChild(const nsID& aID,
114                                                  uint64_t aSize)
115     : mMutex("IPCBlobInputStreamChild::mMutex"),
116       mID(aID),
117       mSize(aSize),
118       mState(eActive),
119       mOwningEventTarget(GetCurrentThreadSerialEventTarget()) {
120   // If we are running in a worker, we need to send a Close() to the parent side
121   // before the thread is released.
122   if (!NS_IsMainThread()) {
123     WorkerPrivate* workerPrivate = GetCurrentThreadWorkerPrivate();
124     if (workerPrivate) {
125       UniquePtr<WorkerHolder> workerHolder(
126           new IPCBlobInputStreamWorkerHolder());
127       if (workerHolder->HoldWorker(workerPrivate, Canceling)) {
128         mWorkerHolder.swap(workerHolder);
129       }
130     }
131   }
132 }
133 
~IPCBlobInputStreamChild()134 IPCBlobInputStreamChild::~IPCBlobInputStreamChild() {}
135 
Shutdown()136 void IPCBlobInputStreamChild::Shutdown() {
137   MutexAutoLock lock(mMutex);
138 
139   RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;
140 
141   mWorkerHolder = nullptr;
142   mPendingOperations.Clear();
143 
144   if (mState == eActive) {
145     SendClose();
146     mState = eInactive;
147   }
148 }
149 
ActorDestroy(IProtocol::ActorDestroyReason aReason)150 void IPCBlobInputStreamChild::ActorDestroy(
151     IProtocol::ActorDestroyReason aReason) {
152   bool migrating = false;
153 
154   {
155     MutexAutoLock lock(mMutex);
156     migrating = mState == eActiveMigrating;
157     mState = migrating ? eInactiveMigrating : eInactive;
158   }
159 
160   if (migrating) {
161     // We were waiting for this! Now we can migrate the actor in the correct
162     // thread.
163     RefPtr<IPCBlobInputStreamThread> thread =
164         IPCBlobInputStreamThread::GetOrCreate();
165     MOZ_ASSERT(thread, "We cannot continue without DOMFile thread.");
166 
167     ResetManager();
168     thread->MigrateActor(this);
169     return;
170   }
171 
172   // Let's cleanup the workerHolder and the pending operation queue.
173   Shutdown();
174 }
175 
State()176 IPCBlobInputStreamChild::ActorState IPCBlobInputStreamChild::State() {
177   MutexAutoLock lock(mMutex);
178   return mState;
179 }
180 
CreateStream()181 already_AddRefed<IPCBlobInputStream> IPCBlobInputStreamChild::CreateStream() {
182   bool shouldMigrate = false;
183 
184   RefPtr<IPCBlobInputStream> stream = new IPCBlobInputStream(this);
185 
186   {
187     MutexAutoLock lock(mMutex);
188 
189     if (mState == eInactive) {
190       return nullptr;
191     }
192 
193     // The stream is active but maybe it is not running in the DOM-File thread.
194     // We should migrate it there.
195     if (mState == eActive &&
196         !IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget)) {
197       MOZ_ASSERT(mStreams.IsEmpty());
198       shouldMigrate = true;
199       mState = eActiveMigrating;
200     }
201 
202     mStreams.AppendElement(stream);
203   }
204 
205   // Send__delete__ will call ActorDestroy(). mMutex cannot be locked at this
206   // time.
207   if (shouldMigrate) {
208     Send__delete__(this);
209   }
210 
211   return stream.forget();
212 }
213 
ForgetStream(IPCBlobInputStream * aStream)214 void IPCBlobInputStreamChild::ForgetStream(IPCBlobInputStream* aStream) {
215   MOZ_ASSERT(aStream);
216 
217   RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;
218 
219   {
220     MutexAutoLock lock(mMutex);
221     mStreams.RemoveElement(aStream);
222 
223     if (!mStreams.IsEmpty() || mState != eActive) {
224       return;
225     }
226   }
227 
228   if (mOwningEventTarget->IsOnCurrentThread()) {
229     Shutdown();
230     return;
231   }
232 
233   RefPtr<ShutdownRunnable> runnable = new ShutdownRunnable(this);
234   mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
235 }
236 
StreamNeeded(IPCBlobInputStream * aStream,nsIEventTarget * aEventTarget)237 void IPCBlobInputStreamChild::StreamNeeded(IPCBlobInputStream* aStream,
238                                            nsIEventTarget* aEventTarget) {
239   MutexAutoLock lock(mMutex);
240 
241   if (mState == eInactive) {
242     return;
243   }
244 
245   MOZ_ASSERT(mStreams.Contains(aStream));
246 
247   PendingOperation* opt = mPendingOperations.AppendElement();
248   opt->mStream = aStream;
249   opt->mEventTarget = aEventTarget;
250 
251   if (mState == eActiveMigrating || mState == eInactiveMigrating) {
252     // This operation will be continued when the migration is completed.
253     return;
254   }
255 
256   MOZ_ASSERT(mState == eActive);
257 
258   if (mOwningEventTarget->IsOnCurrentThread()) {
259     SendStreamNeeded();
260     return;
261   }
262 
263   RefPtr<StreamNeededRunnable> runnable = new StreamNeededRunnable(this);
264   mOwningEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
265 }
266 
RecvStreamReady(const OptionalIPCStream & aStream)267 mozilla::ipc::IPCResult IPCBlobInputStreamChild::RecvStreamReady(
268     const OptionalIPCStream& aStream) {
269   nsCOMPtr<nsIInputStream> stream = mozilla::ipc::DeserializeIPCStream(aStream);
270 
271   RefPtr<IPCBlobInputStream> pendingStream;
272   nsCOMPtr<nsIEventTarget> eventTarget;
273 
274   {
275     MutexAutoLock lock(mMutex);
276     MOZ_ASSERT(!mPendingOperations.IsEmpty());
277     MOZ_ASSERT(mState == eActive);
278 
279     pendingStream = mPendingOperations[0].mStream;
280     eventTarget = mPendingOperations[0].mEventTarget;
281 
282     mPendingOperations.RemoveElementAt(0);
283   }
284 
285   RefPtr<StreamReadyRunnable> runnable =
286       new StreamReadyRunnable(pendingStream, stream.forget());
287 
288   // If IPCBlobInputStream::AsyncWait() has been executed without passing an
289   // event target, we run the callback synchronous because any thread could be
290   // result to be the wrong one. See more in nsIAsyncInputStream::asyncWait
291   // documentation.
292   if (eventTarget) {
293     eventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
294   } else {
295     runnable->Run();
296   }
297 
298   return IPC_OK();
299 }
300 
Migrated()301 void IPCBlobInputStreamChild::Migrated() {
302   MutexAutoLock lock(mMutex);
303   MOZ_ASSERT(mState == eInactiveMigrating);
304 
305   if (mWorkerHolder) {
306     RefPtr<ReleaseWorkerHolderRunnable> runnable =
307         new ReleaseWorkerHolderRunnable(Move(mWorkerHolder));
308     mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
309   }
310 
311   mOwningEventTarget = GetCurrentThreadSerialEventTarget();
312   MOZ_ASSERT(IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget));
313 
314   // Maybe we have no reasons to keep this actor alive.
315   if (mStreams.IsEmpty()) {
316     mState = eInactive;
317     SendClose();
318     return;
319   }
320 
321   mState = eActive;
322 
323   // Let's processing the pending operations. We need a stream for each pending
324   // operation.
325   for (uint32_t i = 0; i < mPendingOperations.Length(); ++i) {
326     SendStreamNeeded();
327   }
328 }
329 
330 }  // namespace dom
331 }  // namespace mozilla
332