1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "IPCStreamSource.h"
8 
9 #include "BackgroundParent.h"  // for AssertIsOnBackgroundThread
10 #include "mozilla/UniquePtr.h"
11 #include "mozilla/dom/RemoteWorkerService.h"
12 #include "mozilla/dom/WorkerCommon.h"
13 #include "mozilla/webrender/WebRenderTypes.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsICancelableRunnable.h"
16 #include "nsIRunnable.h"
17 #include "nsISerialEventTarget.h"
18 #include "nsStreamUtils.h"
19 #include "nsThreadUtils.h"
20 #include "nsIThread.h"
21 
22 using mozilla::wr::ByteBuffer;
23 
24 namespace mozilla {
25 namespace ipc {
26 
27 class IPCStreamSource::Callback final : public DiscardableRunnable,
28                                         public nsIInputStreamCallback {
29  public:
Callback(IPCStreamSource * aSource)30   explicit Callback(IPCStreamSource* aSource)
31       : DiscardableRunnable("IPCStreamSource::Callback"),
32         mSource(aSource),
33         mOwningEventTarget(GetCurrentSerialEventTarget()) {
34     MOZ_ASSERT(mSource);
35   }
36 
37   NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream * aStream)38   OnInputStreamReady(nsIAsyncInputStream* aStream) override {
39     // any thread
40     if (mOwningEventTarget->IsOnCurrentThread()) {
41       return Run();
42     }
43 
44     // If this fails, then it means the owning thread is a Worker that has
45     // been shutdown.  Its ok to lose the event in this case because the
46     // IPCStreamChild listens for this event through the WorkerRef.
47     nsresult rv =
48         mOwningEventTarget->Dispatch(this, nsIThread::DISPATCH_NORMAL);
49     if (NS_FAILED(rv)) {
50       NS_WARNING("Failed to dispatch stream readable event to owning thread");
51     }
52 
53     return NS_OK;
54   }
55 
56   NS_IMETHOD
Run()57   Run() override {
58     MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
59     if (mSource) {
60       mSource->OnStreamReady(this);
61     }
62     return NS_OK;
63   }
64 
65   // OnDiscard() gets called when the Worker thread is being shutdown.  We have
66   // nothing to do here because IPCStreamChild handles this case via
67   // the WorkerRef.
68 
ClearSource()69   void ClearSource() {
70     MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
71     MOZ_ASSERT(mSource);
72     mSource = nullptr;
73   }
74 
75  private:
~Callback()76   ~Callback() {
77     // called on any thread
78 
79     // ClearSource() should be called before the Callback is destroyed
80     MOZ_ASSERT(!mSource);
81   }
82 
83   // This is a raw pointer because the source keeps alive the callback and,
84   // before beeing destroyed, it nullifies this pointer (this happens when
85   // ActorDestroyed() is called).
86   IPCStreamSource* mSource;
87 
88   nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
89 
90   NS_DECL_ISUPPORTS_INHERITED
91 };
92 
93 NS_IMPL_ISUPPORTS_INHERITED(IPCStreamSource::Callback, DiscardableRunnable,
94                             nsIInputStreamCallback);
95 
IPCStreamSource(nsIAsyncInputStream * aInputStream)96 IPCStreamSource::IPCStreamSource(nsIAsyncInputStream* aInputStream)
97     : mStream(aInputStream), mState(ePending) {
98   MOZ_ASSERT(aInputStream);
99 }
100 
~IPCStreamSource()101 IPCStreamSource::~IPCStreamSource() {
102   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
103   MOZ_ASSERT(mState == eClosed);
104   MOZ_ASSERT(!mCallback);
105   MOZ_ASSERT(!mWorkerRef);
106 }
107 
Initialize()108 bool IPCStreamSource::Initialize() {
109   bool nonBlocking = false;
110   MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream->IsNonBlocking(&nonBlocking)));
111   // IPCStreamChild reads in the current thread, so it is only supported on
112   // non-blocking, async channels
113   if (!nonBlocking) {
114     return false;
115   }
116 
117   // A source can be used on any thread, but we only support IPCStream on
118   // main thread, Workers, Worker Launcher, and PBackground thread right now.
119   // This is due to the requirement  that the thread be guaranteed to live long
120   // enough to receive messages. We can enforce this guarantee with a
121   // StrongWorkerRef on worker threads, but not other threads. Main-thread,
122   // PBackground, and Worker Launcher threads do not need anything special in
123   // order to be kept alive.
124   if (!NS_IsMainThread()) {
125     if (const auto workerPrivate = dom::GetCurrentThreadWorkerPrivate()) {
126       RefPtr<dom::StrongWorkerRef> workerRef =
127           dom::StrongWorkerRef::CreateForcibly(workerPrivate,
128                                                "IPCStreamSource");
129       if (NS_WARN_IF(!workerRef)) {
130         return false;
131       }
132 
133       mWorkerRef = std::move(workerRef);
134     } else {
135       MOZ_DIAGNOSTIC_ASSERT(
136           IsOnBackgroundThread() ||
137           dom::RemoteWorkerService::Thread()->IsOnCurrentThread());
138     }
139   }
140 
141   return true;
142 }
143 
ActorConstructed()144 void IPCStreamSource::ActorConstructed() {
145   MOZ_ASSERT(mState == ePending);
146   mState = eActorConstructed;
147 }
148 
ActorDestroyed()149 void IPCStreamSource::ActorDestroyed() {
150   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
151 
152   mState = eClosed;
153 
154   if (mCallback) {
155     mCallback->ClearSource();
156     mCallback = nullptr;
157   }
158 
159   mWorkerRef = nullptr;
160 }
161 
Start()162 void IPCStreamSource::Start() {
163   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
164   DoRead();
165 }
166 
StartDestroy()167 void IPCStreamSource::StartDestroy() {
168   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
169   OnEnd(NS_ERROR_ABORT);
170 }
171 
DoRead()172 void IPCStreamSource::DoRead() {
173   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
174   MOZ_ASSERT(mState == eActorConstructed);
175   MOZ_ASSERT(!mCallback);
176 
177   // The input stream (likely a pipe) probably uses a segment size of
178   // 4kb.  If there is data already buffered it would be nice to aggregate
179   // multiple segments into a single IPC call.  Conversely, don't send too
180   // too large of a buffer in a single call to avoid spiking memory.
181   static const uint64_t kMaxBytesPerMessage = 32 * 1024;
182   static_assert(kMaxBytesPerMessage <= static_cast<uint64_t>(UINT32_MAX),
183                 "kMaxBytesPerMessage must cleanly cast to uint32_t");
184 
185   UniquePtr<char[]> buffer(new char[kMaxBytesPerMessage]);
186 
187   while (true) {
188     // It should not be possible to transition to closed state without
189     // this loop terminating via a return.
190     MOZ_ASSERT(mState == eActorConstructed);
191 
192     // See if the stream is closed by checking the return of Available.
193     uint64_t dummy;
194     nsresult rv = mStream->Available(&dummy);
195     if (NS_FAILED(rv)) {
196       OnEnd(rv);
197       return;
198     }
199 
200     uint32_t bytesRead = 0;
201     rv = mStream->Read(buffer.get(), kMaxBytesPerMessage, &bytesRead);
202 
203     if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
204       MOZ_ASSERT(bytesRead == 0);
205       Wait();
206       return;
207     }
208 
209     if (NS_FAILED(rv)) {
210       MOZ_ASSERT(bytesRead == 0);
211       OnEnd(rv);
212       return;
213     }
214 
215     // Zero-byte read indicates end-of-stream.
216     if (bytesRead == 0) {
217       OnEnd(NS_BASE_STREAM_CLOSED);
218       return;
219     }
220 
221     // We read some data from the stream, send it across.
222     SendData(ByteBuffer(bytesRead, reinterpret_cast<uint8_t*>(buffer.get())));
223   }
224 }
225 
Wait()226 void IPCStreamSource::Wait() {
227   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
228   MOZ_ASSERT(mState == eActorConstructed);
229   MOZ_ASSERT(!mCallback);
230 
231   // Set mCallback immediately instead of waiting for success.  Its possible
232   // AsyncWait() will callback synchronously.
233   mCallback = new Callback(this);
234   nsresult rv = mStream->AsyncWait(mCallback, 0, 0, nullptr);
235   if (NS_FAILED(rv)) {
236     OnEnd(rv);
237     return;
238   }
239 }
240 
OnStreamReady(Callback * aCallback)241 void IPCStreamSource::OnStreamReady(Callback* aCallback) {
242   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
243   MOZ_ASSERT(mCallback);
244   MOZ_ASSERT(aCallback == mCallback);
245   mCallback->ClearSource();
246   mCallback = nullptr;
247 
248   // Possibly closed if this callback is (indirectly) called by
249   // IPCStreamSourceParent::RecvRequestClose().
250   if (mState == eClosed) {
251     return;
252   }
253 
254   DoRead();
255 }
256 
OnEnd(nsresult aRv)257 void IPCStreamSource::OnEnd(nsresult aRv) {
258   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
259   MOZ_ASSERT(aRv != NS_BASE_STREAM_WOULD_BLOCK);
260 
261   if (mState == eClosed) {
262     return;
263   }
264 
265   mState = eClosed;
266 
267   mStream->CloseWithStatus(aRv);
268 
269   if (aRv == NS_BASE_STREAM_CLOSED) {
270     aRv = NS_OK;
271   }
272 
273   // This will trigger an ActorDestroy() from the other side
274   Close(aRv);
275 }
276 
277 }  // namespace ipc
278 }  // namespace mozilla
279