1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "IPCStreamDestination.h"
8 #include "mozilla/InputStreamLengthWrapper.h"
9 #include "mozilla/Mutex.h"
10 #include "nsIAsyncInputStream.h"
11 #include "nsIAsyncOutputStream.h"
12 #include "nsIBufferedStreams.h"
13 #include "nsICloneableInputStream.h"
14 #include "nsIPipe.h"
15 #include "nsThreadUtils.h"
16 #include "mozilla/webrender/WebRenderTypes.h"
17 
18 namespace mozilla {
19 namespace ipc {
20 
21 // ----------------------------------------------------------------------------
22 // IPCStreamDestination::DelayedStartInputStream
23 //
24 // When AutoIPCStream is used with delayedStart, we need to ask for data at the
25 // first real use of the nsIInputStream. In order to do so, we wrap the
26 // nsIInputStream, created by the nsIPipe, with this wrapper.
27 
28 class IPCStreamDestination::DelayedStartInputStream final
29     : public nsIAsyncInputStream,
30       public nsIInputStreamCallback,
31       public nsISearchableInputStream,
32       public nsICloneableInputStream,
33       public nsIBufferedInputStream {
34  public:
35   NS_DECL_THREADSAFE_ISUPPORTS
36 
DelayedStartInputStream(IPCStreamDestination * aDestination,nsCOMPtr<nsIAsyncInputStream> && aStream)37   DelayedStartInputStream(IPCStreamDestination* aDestination,
38                           nsCOMPtr<nsIAsyncInputStream>&& aStream)
39       : mDestination(aDestination),
40         mStream(std::move(aStream)),
41         mMutex("IPCStreamDestination::DelayedStartInputStream::mMutex") {
42     MOZ_ASSERT(mDestination);
43     MOZ_ASSERT(mStream);
44   }
45 
DestinationShutdown()46   void DestinationShutdown() {
47     MutexAutoLock lock(mMutex);
48     mDestination = nullptr;
49   }
50 
51   // nsIInputStream interface
52 
53   NS_IMETHOD
Close()54   Close() override {
55     MaybeCloseDestination();
56     return mStream->Close();
57   }
58 
59   NS_IMETHOD
Available(uint64_t * aLength)60   Available(uint64_t* aLength) override {
61     MaybeStartReading();
62     return mStream->Available(aLength);
63   }
64 
65   NS_IMETHOD
Read(char * aBuffer,uint32_t aCount,uint32_t * aReadCount)66   Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
67     MaybeStartReading();
68     return mStream->Read(aBuffer, aCount, aReadCount);
69   }
70 
71   NS_IMETHOD
ReadSegments(nsWriteSegmentFun aWriter,void * aClosure,uint32_t aCount,uint32_t * aResult)72   ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
73                uint32_t* aResult) override {
74     MaybeStartReading();
75     return mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
76   }
77 
78   NS_IMETHOD
IsNonBlocking(bool * aNonBlocking)79   IsNonBlocking(bool* aNonBlocking) override {
80     MaybeStartReading();
81     return mStream->IsNonBlocking(aNonBlocking);
82   }
83 
84   // nsIAsyncInputStream interface
85 
86   NS_IMETHOD
CloseWithStatus(nsresult aReason)87   CloseWithStatus(nsresult aReason) override {
88     MaybeCloseDestination();
89     return mStream->CloseWithStatus(aReason);
90   }
91 
92   NS_IMETHOD
AsyncWait(nsIInputStreamCallback * aCallback,uint32_t aFlags,uint32_t aRequestedCount,nsIEventTarget * aTarget)93   AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
94             uint32_t aRequestedCount, nsIEventTarget* aTarget) override {
95     {
96       MutexAutoLock lock(mMutex);
97       if (mAsyncWaitCallback && aCallback) {
98         return NS_ERROR_FAILURE;
99       }
100 
101       mAsyncWaitCallback = aCallback;
102 
103       MaybeStartReading(lock);
104     }
105 
106     nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr;
107     return mStream->AsyncWait(callback, aFlags, aRequestedCount, aTarget);
108   }
109 
110   NS_IMETHOD
Search(const char * aForString,bool aIgnoreCase,bool * aFound,uint32_t * aOffsetSearchedTo)111   Search(const char* aForString, bool aIgnoreCase, bool* aFound,
112          uint32_t* aOffsetSearchedTo) override {
113     MaybeStartReading();
114     nsCOMPtr<nsISearchableInputStream> searchable = do_QueryInterface(mStream);
115     MOZ_ASSERT(searchable);
116     return searchable->Search(aForString, aIgnoreCase, aFound,
117                               aOffsetSearchedTo);
118   }
119 
120   // nsICloneableInputStream interface
121 
122   NS_IMETHOD
GetCloneable(bool * aCloneable)123   GetCloneable(bool* aCloneable) override {
124     MaybeStartReading();
125     nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
126     MOZ_ASSERT(cloneable);
127     return cloneable->GetCloneable(aCloneable);
128   }
129 
130   NS_IMETHOD
Clone(nsIInputStream ** aResult)131   Clone(nsIInputStream** aResult) override {
132     MaybeStartReading();
133     nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
134     MOZ_ASSERT(cloneable);
135     return cloneable->Clone(aResult);
136   }
137 
138   // nsIBufferedInputStream
139 
140   NS_IMETHOD
Init(nsIInputStream * aStream,uint32_t aBufferSize)141   Init(nsIInputStream* aStream, uint32_t aBufferSize) override {
142     MaybeStartReading();
143     nsCOMPtr<nsIBufferedInputStream> stream = do_QueryInterface(mStream);
144     MOZ_ASSERT(stream);
145     return stream->Init(aStream, aBufferSize);
146   }
147 
148   NS_IMETHODIMP
GetData(nsIInputStream ** aResult)149   GetData(nsIInputStream** aResult) override {
150     return NS_ERROR_NOT_IMPLEMENTED;
151   }
152 
153   // nsIInputStreamCallback
154 
155   NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream * aStream)156   OnInputStreamReady(nsIAsyncInputStream* aStream) override {
157     nsCOMPtr<nsIInputStreamCallback> callback;
158 
159     {
160       MutexAutoLock lock(mMutex);
161 
162       // We have been canceled in the meanwhile.
163       if (!mAsyncWaitCallback) {
164         return NS_OK;
165       }
166 
167       callback.swap(mAsyncWaitCallback);
168     }
169 
170     callback->OnInputStreamReady(this);
171     return NS_OK;
172   }
173 
174   void MaybeStartReading();
175   void MaybeStartReading(const MutexAutoLock& aProofOfLook);
176 
177   void MaybeCloseDestination();
178 
179  private:
180   ~DelayedStartInputStream() = default;
181 
182   IPCStreamDestination* mDestination;
183   nsCOMPtr<nsIAsyncInputStream> mStream;
184 
185   nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
186 
187   // This protects mDestination: any method can be called by any thread.
188   Mutex mMutex;
189 
190   class HelperRunnable;
191 };
192 
193 class IPCStreamDestination::DelayedStartInputStream::HelperRunnable final
194     : public Runnable {
195  public:
196   enum Op {
197     eStartReading,
198     eCloseDestination,
199   };
200 
HelperRunnable(IPCStreamDestination::DelayedStartInputStream * aDelayedStartInputStream,Op aOp)201   HelperRunnable(
202       IPCStreamDestination::DelayedStartInputStream* aDelayedStartInputStream,
203       Op aOp)
204       : Runnable(
205             "ipc::IPCStreamDestination::DelayedStartInputStream::"
206             "HelperRunnable"),
207         mDelayedStartInputStream(aDelayedStartInputStream),
208         mOp(aOp) {
209     MOZ_ASSERT(aDelayedStartInputStream);
210   }
211 
212   NS_IMETHOD
Run()213   Run() override {
214     switch (mOp) {
215       case eStartReading:
216         mDelayedStartInputStream->MaybeStartReading();
217         break;
218       case eCloseDestination:
219         mDelayedStartInputStream->MaybeCloseDestination();
220         break;
221     }
222 
223     return NS_OK;
224   }
225 
226  private:
227   RefPtr<IPCStreamDestination::DelayedStartInputStream>
228       mDelayedStartInputStream;
229   Op mOp;
230 };
231 
MaybeStartReading()232 void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading() {
233   MutexAutoLock lock(mMutex);
234   MaybeStartReading(lock);
235 }
236 
MaybeStartReading(const MutexAutoLock & aProofOfLook)237 void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading(
238     const MutexAutoLock& aProofOfLook) {
239   if (!mDestination) {
240     return;
241   }
242 
243   if (mDestination->IsOnOwningThread()) {
244     mDestination->StartReading();
245     mDestination = nullptr;
246     return;
247   }
248 
249   RefPtr<Runnable> runnable =
250       new HelperRunnable(this, HelperRunnable::eStartReading);
251   mDestination->DispatchRunnable(runnable.forget());
252 }
253 
MaybeCloseDestination()254 void IPCStreamDestination::DelayedStartInputStream::MaybeCloseDestination() {
255   MutexAutoLock lock(mMutex);
256   if (!mDestination) {
257     return;
258   }
259 
260   if (mDestination->IsOnOwningThread()) {
261     mDestination->RequestClose(NS_ERROR_ABORT);
262     mDestination = nullptr;
263     return;
264   }
265 
266   RefPtr<Runnable> runnable =
267       new HelperRunnable(this, HelperRunnable::eCloseDestination);
268   mDestination->DispatchRunnable(runnable.forget());
269 }
270 
271 NS_IMPL_ADDREF(IPCStreamDestination::DelayedStartInputStream);
272 NS_IMPL_RELEASE(IPCStreamDestination::DelayedStartInputStream);
273 
274 NS_INTERFACE_MAP_BEGIN(IPCStreamDestination::DelayedStartInputStream)
NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)275   NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
276   NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
277   NS_INTERFACE_MAP_ENTRY(nsISearchableInputStream)
278   NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream)
279   NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream)
280   NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIAsyncInputStream)
281   NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIAsyncInputStream)
282 NS_INTERFACE_MAP_END
283 
284 // ----------------------------------------------------------------------------
285 // IPCStreamDestination
286 
287 IPCStreamDestination::IPCStreamDestination()
288     : mOwningThread(NS_GetCurrentThread()),
289       mDelayedStart(false)
290 #ifdef MOZ_DEBUG
291       ,
292       mLengthSet(false)
293 #endif
294 {
295 }
296 
297 IPCStreamDestination::~IPCStreamDestination() = default;
298 
Initialize()299 nsresult IPCStreamDestination::Initialize() {
300   MOZ_ASSERT(!mReader);
301   MOZ_ASSERT(!mWriter);
302 
303   // use async versions for both reader and writer even though we are
304   // opening the writer as an infinite stream.  We want to be able to
305   // use CloseWithStatus() to communicate errors through the pipe.
306 
307   // Use an "infinite" pipe because we cannot apply back-pressure through
308   // the async IPC layer at the moment.  Blocking the IPC worker thread
309   // is not desirable, either.
310   nsresult rv = NS_NewPipe2(getter_AddRefs(mReader), getter_AddRefs(mWriter),
311                             true, true,   // non-blocking
312                             0,            // segment size
313                             UINT32_MAX);  // "infinite" pipe
314   if (NS_WARN_IF(NS_FAILED(rv))) {
315     return rv;
316   }
317 
318   return NS_OK;
319 }
320 
SetDelayedStart(bool aDelayedStart)321 void IPCStreamDestination::SetDelayedStart(bool aDelayedStart) {
322   mDelayedStart = aDelayedStart;
323 }
324 
SetLength(int64_t aLength)325 void IPCStreamDestination::SetLength(int64_t aLength) {
326   MOZ_ASSERT(mReader);
327   MOZ_ASSERT(!mLengthSet);
328 
329 #ifdef DEBUG
330   mLengthSet = true;
331 #endif
332 
333   mLength = aLength;
334 }
335 
TakeReader()336 already_AddRefed<nsIInputStream> IPCStreamDestination::TakeReader() {
337   MOZ_ASSERT(mReader);
338   MOZ_ASSERT(!mDelayedStartInputStream);
339 
340   nsCOMPtr<nsIAsyncInputStream> reader{mReader.forget()};
341   if (mDelayedStart) {
342     mDelayedStartInputStream =
343         new DelayedStartInputStream(this, std::move(reader));
344     reader = mDelayedStartInputStream;
345     MOZ_ASSERT(reader);
346   }
347 
348   if (mLength != -1) {
349     MOZ_ASSERT(mLengthSet);
350     nsCOMPtr<nsIInputStream> finalStream =
351         new InputStreamLengthWrapper(reader.forget(), mLength);
352     reader = do_QueryInterface(finalStream);
353     MOZ_ASSERT(reader);
354   }
355 
356   return reader.forget();
357 }
358 
IsOnOwningThread() const359 bool IPCStreamDestination::IsOnOwningThread() const {
360   return mOwningThread == NS_GetCurrentThread();
361 }
362 
DispatchRunnable(already_AddRefed<nsIRunnable> && aRunnable)363 void IPCStreamDestination::DispatchRunnable(
364     already_AddRefed<nsIRunnable>&& aRunnable) {
365   nsCOMPtr<nsIRunnable> runnable = aRunnable;
366   mOwningThread->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
367 }
368 
ActorDestroyed()369 void IPCStreamDestination::ActorDestroyed() {
370   MOZ_ASSERT(mWriter);
371 
372   // If we were gracefully closed we should have gotten RecvClose().  In
373   // that case, the writer will already be closed and this will have no
374   // effect.  This just aborts the writer in the case where the child process
375   // crashes.
376   mWriter->CloseWithStatus(NS_ERROR_ABORT);
377 
378   if (mDelayedStartInputStream) {
379     mDelayedStartInputStream->DestinationShutdown();
380     mDelayedStartInputStream = nullptr;
381   }
382 }
383 
BufferReceived(const wr::ByteBuffer & aBuffer)384 void IPCStreamDestination::BufferReceived(const wr::ByteBuffer& aBuffer) {
385   MOZ_ASSERT(mWriter);
386 
387   uint32_t numWritten = 0;
388 
389   // This should only fail if we hit an OOM condition.
390   nsresult rv = mWriter->Write(reinterpret_cast<char*>(aBuffer.mData),
391                                aBuffer.mLength, &numWritten);
392   if (NS_WARN_IF(NS_FAILED(rv))) {
393     RequestClose(rv);
394   }
395 }
396 
CloseReceived(nsresult aRv)397 void IPCStreamDestination::CloseReceived(nsresult aRv) {
398   MOZ_ASSERT(mWriter);
399   mWriter->CloseWithStatus(aRv);
400   TerminateDestination();
401 }
402 
403 }  // namespace ipc
404 }  // namespace mozilla
405