1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include "IPCStreamSource.h"
8
9 #include "BackgroundParent.h" // for AssertIsOnBackgroundThread
10 #include "mozilla/UniquePtr.h"
11 #include "mozilla/dom/RemoteWorkerService.h"
12 #include "mozilla/dom/WorkerCommon.h"
13 #include "mozilla/webrender/WebRenderTypes.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsICancelableRunnable.h"
16 #include "nsIRunnable.h"
17 #include "nsISerialEventTarget.h"
18 #include "nsStreamUtils.h"
19 #include "nsThreadUtils.h"
20 #include "nsIThread.h"
21
22 using mozilla::wr::ByteBuffer;
23
24 namespace mozilla {
25 namespace ipc {
26
27 class IPCStreamSource::Callback final : public DiscardableRunnable,
28 public nsIInputStreamCallback {
29 public:
Callback(IPCStreamSource * aSource)30 explicit Callback(IPCStreamSource* aSource)
31 : DiscardableRunnable("IPCStreamSource::Callback"),
32 mSource(aSource),
33 mOwningEventTarget(GetCurrentSerialEventTarget()) {
34 MOZ_ASSERT(mSource);
35 }
36
37 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream * aStream)38 OnInputStreamReady(nsIAsyncInputStream* aStream) override {
39 // any thread
40 if (mOwningEventTarget->IsOnCurrentThread()) {
41 return Run();
42 }
43
44 // If this fails, then it means the owning thread is a Worker that has
45 // been shutdown. Its ok to lose the event in this case because the
46 // IPCStreamChild listens for this event through the WorkerRef.
47 nsresult rv =
48 mOwningEventTarget->Dispatch(this, nsIThread::DISPATCH_NORMAL);
49 if (NS_FAILED(rv)) {
50 NS_WARNING("Failed to dispatch stream readable event to owning thread");
51 }
52
53 return NS_OK;
54 }
55
56 NS_IMETHOD
Run()57 Run() override {
58 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
59 if (mSource) {
60 mSource->OnStreamReady(this);
61 }
62 return NS_OK;
63 }
64
65 // OnDiscard() gets called when the Worker thread is being shutdown. We have
66 // nothing to do here because IPCStreamChild handles this case via
67 // the WorkerRef.
68
ClearSource()69 void ClearSource() {
70 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
71 MOZ_ASSERT(mSource);
72 mSource = nullptr;
73 }
74
75 private:
~Callback()76 ~Callback() {
77 // called on any thread
78
79 // ClearSource() should be called before the Callback is destroyed
80 MOZ_ASSERT(!mSource);
81 }
82
83 // This is a raw pointer because the source keeps alive the callback and,
84 // before beeing destroyed, it nullifies this pointer (this happens when
85 // ActorDestroyed() is called).
86 IPCStreamSource* mSource;
87
88 nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
89
90 NS_DECL_ISUPPORTS_INHERITED
91 };
92
93 NS_IMPL_ISUPPORTS_INHERITED(IPCStreamSource::Callback, DiscardableRunnable,
94 nsIInputStreamCallback);
95
IPCStreamSource(nsIAsyncInputStream * aInputStream)96 IPCStreamSource::IPCStreamSource(nsIAsyncInputStream* aInputStream)
97 : mStream(aInputStream), mState(ePending) {
98 MOZ_ASSERT(aInputStream);
99 }
100
~IPCStreamSource()101 IPCStreamSource::~IPCStreamSource() {
102 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
103 MOZ_ASSERT(mState == eClosed);
104 MOZ_ASSERT(!mCallback);
105 MOZ_ASSERT(!mWorkerRef);
106 }
107
Initialize()108 bool IPCStreamSource::Initialize() {
109 bool nonBlocking = false;
110 MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream->IsNonBlocking(&nonBlocking)));
111 // IPCStreamChild reads in the current thread, so it is only supported on
112 // non-blocking, async channels
113 if (!nonBlocking) {
114 return false;
115 }
116
117 // A source can be used on any thread, but we only support IPCStream on
118 // main thread, Workers, Worker Launcher, and PBackground thread right now.
119 // This is due to the requirement that the thread be guaranteed to live long
120 // enough to receive messages. We can enforce this guarantee with a
121 // StrongWorkerRef on worker threads, but not other threads. Main-thread,
122 // PBackground, and Worker Launcher threads do not need anything special in
123 // order to be kept alive.
124 if (!NS_IsMainThread()) {
125 if (const auto workerPrivate = dom::GetCurrentThreadWorkerPrivate()) {
126 RefPtr<dom::StrongWorkerRef> workerRef =
127 dom::StrongWorkerRef::CreateForcibly(workerPrivate,
128 "IPCStreamSource");
129 if (NS_WARN_IF(!workerRef)) {
130 return false;
131 }
132
133 mWorkerRef = std::move(workerRef);
134 } else {
135 MOZ_DIAGNOSTIC_ASSERT(
136 IsOnBackgroundThread() ||
137 dom::RemoteWorkerService::Thread()->IsOnCurrentThread());
138 }
139 }
140
141 return true;
142 }
143
ActorConstructed()144 void IPCStreamSource::ActorConstructed() {
145 MOZ_ASSERT(mState == ePending);
146 mState = eActorConstructed;
147 }
148
ActorDestroyed()149 void IPCStreamSource::ActorDestroyed() {
150 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
151
152 mState = eClosed;
153
154 if (mCallback) {
155 mCallback->ClearSource();
156 mCallback = nullptr;
157 }
158
159 mWorkerRef = nullptr;
160 }
161
Start()162 void IPCStreamSource::Start() {
163 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
164 DoRead();
165 }
166
StartDestroy()167 void IPCStreamSource::StartDestroy() {
168 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
169 OnEnd(NS_ERROR_ABORT);
170 }
171
DoRead()172 void IPCStreamSource::DoRead() {
173 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
174 MOZ_ASSERT(mState == eActorConstructed);
175 MOZ_ASSERT(!mCallback);
176
177 // The input stream (likely a pipe) probably uses a segment size of
178 // 4kb. If there is data already buffered it would be nice to aggregate
179 // multiple segments into a single IPC call. Conversely, don't send too
180 // too large of a buffer in a single call to avoid spiking memory.
181 static const uint64_t kMaxBytesPerMessage = 32 * 1024;
182 static_assert(kMaxBytesPerMessage <= static_cast<uint64_t>(UINT32_MAX),
183 "kMaxBytesPerMessage must cleanly cast to uint32_t");
184
185 UniquePtr<char[]> buffer(new char[kMaxBytesPerMessage]);
186
187 while (true) {
188 // It should not be possible to transition to closed state without
189 // this loop terminating via a return.
190 MOZ_ASSERT(mState == eActorConstructed);
191
192 // See if the stream is closed by checking the return of Available.
193 uint64_t dummy;
194 nsresult rv = mStream->Available(&dummy);
195 if (NS_FAILED(rv)) {
196 OnEnd(rv);
197 return;
198 }
199
200 uint32_t bytesRead = 0;
201 rv = mStream->Read(buffer.get(), kMaxBytesPerMessage, &bytesRead);
202
203 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
204 MOZ_ASSERT(bytesRead == 0);
205 Wait();
206 return;
207 }
208
209 if (NS_FAILED(rv)) {
210 MOZ_ASSERT(bytesRead == 0);
211 OnEnd(rv);
212 return;
213 }
214
215 // Zero-byte read indicates end-of-stream.
216 if (bytesRead == 0) {
217 OnEnd(NS_BASE_STREAM_CLOSED);
218 return;
219 }
220
221 // We read some data from the stream, send it across.
222 SendData(ByteBuffer(bytesRead, reinterpret_cast<uint8_t*>(buffer.get())));
223 }
224 }
225
Wait()226 void IPCStreamSource::Wait() {
227 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
228 MOZ_ASSERT(mState == eActorConstructed);
229 MOZ_ASSERT(!mCallback);
230
231 // Set mCallback immediately instead of waiting for success. Its possible
232 // AsyncWait() will callback synchronously.
233 mCallback = new Callback(this);
234 nsresult rv = mStream->AsyncWait(mCallback, 0, 0, nullptr);
235 if (NS_FAILED(rv)) {
236 OnEnd(rv);
237 return;
238 }
239 }
240
OnStreamReady(Callback * aCallback)241 void IPCStreamSource::OnStreamReady(Callback* aCallback) {
242 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
243 MOZ_ASSERT(mCallback);
244 MOZ_ASSERT(aCallback == mCallback);
245 mCallback->ClearSource();
246 mCallback = nullptr;
247
248 // Possibly closed if this callback is (indirectly) called by
249 // IPCStreamSourceParent::RecvRequestClose().
250 if (mState == eClosed) {
251 return;
252 }
253
254 DoRead();
255 }
256
OnEnd(nsresult aRv)257 void IPCStreamSource::OnEnd(nsresult aRv) {
258 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
259 MOZ_ASSERT(aRv != NS_BASE_STREAM_WOULD_BLOCK);
260
261 if (mState == eClosed) {
262 return;
263 }
264
265 mState = eClosed;
266
267 mStream->CloseWithStatus(aRv);
268
269 if (aRv == NS_BASE_STREAM_CLOSED) {
270 aRv = NS_OK;
271 }
272
273 // This will trigger an ActorDestroy() from the other side
274 Close(aRv);
275 }
276
277 } // namespace ipc
278 } // namespace mozilla
279