1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "nsStreamUtils.h"
10 #include "nsCOMPtr.h"
11 #include "nsICloneableInputStream.h"
12 #include "nsIEventTarget.h"
13 #include "nsICancelableRunnable.h"
14 #include "nsISafeOutputStream.h"
15 #include "nsString.h"
16 #include "nsIAsyncInputStream.h"
17 #include "nsIAsyncOutputStream.h"
18 #include "nsIBufferedStreams.h"
19 #include "nsIPipe.h"
20 #include "nsNetCID.h"
21 #include "nsServiceManagerUtils.h"
22 #include "nsThreadUtils.h"
23 #include "nsITransport.h"
24 #include "nsIStreamTransportService.h"
25 #include "NonBlockingAsyncInputStream.h"
26 
27 using namespace mozilla;
28 
29 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
30 
31 //-----------------------------------------------------------------------------
32 
33 // This is a nsICancelableRunnable because we can dispatch it to Workers and
34 // those can be shut down at any time, and in these cases, Cancel() is called
35 // instead of Run().
36 class nsInputStreamReadyEvent final : public CancelableRunnable,
37                                       public nsIInputStreamCallback,
38                                       public nsIRunnablePriority {
39  public:
40   NS_DECL_ISUPPORTS_INHERITED
41 
nsInputStreamReadyEvent(const char * aName,nsIInputStreamCallback * aCallback,nsIEventTarget * aTarget,uint32_t aPriority)42   nsInputStreamReadyEvent(const char* aName, nsIInputStreamCallback* aCallback,
43                           nsIEventTarget* aTarget, uint32_t aPriority)
44       : CancelableRunnable(aName),
45         mCallback(aCallback),
46         mTarget(aTarget),
47         mPriority(aPriority) {}
48 
49  private:
~nsInputStreamReadyEvent()50   ~nsInputStreamReadyEvent() {
51     if (!mCallback) {
52       return;
53     }
54     //
55     // whoa!!  looks like we never posted this event.  take care to
56     // release mCallback on the correct thread.  if mTarget lives on the
57     // calling thread, then we are ok.  otherwise, we have to try to
58     // proxy the Release over the right thread.  if that thread is dead,
59     // then there's nothing we can do... better to leak than crash.
60     //
61     bool val;
62     nsresult rv = mTarget->IsOnCurrentThread(&val);
63     if (NS_FAILED(rv) || !val) {
64       nsCOMPtr<nsIInputStreamCallback> event = NS_NewInputStreamReadyEvent(
65           "~nsInputStreamReadyEvent", mCallback, mTarget, mPriority);
66       mCallback = nullptr;
67       if (event) {
68         rv = event->OnInputStreamReady(nullptr);
69         if (NS_FAILED(rv)) {
70           MOZ_ASSERT_UNREACHABLE("leaking stream event");
71           nsISupports* sup = event;
72           NS_ADDREF(sup);
73         }
74       }
75     }
76   }
77 
78  public:
OnInputStreamReady(nsIAsyncInputStream * aStream)79   NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override {
80     mStream = aStream;
81 
82     nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
83     if (NS_FAILED(rv)) {
84       NS_WARNING("Dispatch failed");
85       return NS_ERROR_FAILURE;
86     }
87 
88     return NS_OK;
89   }
90 
Run()91   NS_IMETHOD Run() override {
92     if (mCallback) {
93       if (mStream) {
94         mCallback->OnInputStreamReady(mStream);
95       }
96       mCallback = nullptr;
97     }
98     return NS_OK;
99   }
100 
Cancel()101   nsresult Cancel() override {
102     mCallback = nullptr;
103     return NS_OK;
104   }
105 
GetPriority(uint32_t * aPriority)106   NS_IMETHOD GetPriority(uint32_t* aPriority) override {
107     *aPriority = mPriority;
108     return NS_OK;
109   }
110 
111  private:
112   nsCOMPtr<nsIAsyncInputStream> mStream;
113   nsCOMPtr<nsIInputStreamCallback> mCallback;
114   nsCOMPtr<nsIEventTarget> mTarget;
115   uint32_t mPriority;
116 };
117 
118 NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
119                             nsIInputStreamCallback, nsIRunnablePriority)
120 
121 //-----------------------------------------------------------------------------
122 
123 // This is a nsICancelableRunnable because we can dispatch it to Workers and
124 // those can be shut down at any time, and in these cases, Cancel() is called
125 // instead of Run().
126 class nsOutputStreamReadyEvent final : public CancelableRunnable,
127                                        public nsIOutputStreamCallback {
128  public:
129   NS_DECL_ISUPPORTS_INHERITED
130 
nsOutputStreamReadyEvent(nsIOutputStreamCallback * aCallback,nsIEventTarget * aTarget)131   nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
132                            nsIEventTarget* aTarget)
133       : CancelableRunnable("nsOutputStreamReadyEvent"),
134         mCallback(aCallback),
135         mTarget(aTarget) {}
136 
137  private:
~nsOutputStreamReadyEvent()138   ~nsOutputStreamReadyEvent() {
139     if (!mCallback) {
140       return;
141     }
142     //
143     // whoa!!  looks like we never posted this event.  take care to
144     // release mCallback on the correct thread.  if mTarget lives on the
145     // calling thread, then we are ok.  otherwise, we have to try to
146     // proxy the Release over the right thread.  if that thread is dead,
147     // then there's nothing we can do... better to leak than crash.
148     //
149     bool val;
150     nsresult rv = mTarget->IsOnCurrentThread(&val);
151     if (NS_FAILED(rv) || !val) {
152       nsCOMPtr<nsIOutputStreamCallback> event =
153           NS_NewOutputStreamReadyEvent(mCallback, mTarget);
154       mCallback = nullptr;
155       if (event) {
156         rv = event->OnOutputStreamReady(nullptr);
157         if (NS_FAILED(rv)) {
158           MOZ_ASSERT_UNREACHABLE("leaking stream event");
159           nsISupports* sup = event;
160           NS_ADDREF(sup);
161         }
162       }
163     }
164   }
165 
166  public:
OnOutputStreamReady(nsIAsyncOutputStream * aStream)167   NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override {
168     mStream = aStream;
169 
170     nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
171     if (NS_FAILED(rv)) {
172       NS_WARNING("PostEvent failed");
173       return NS_ERROR_FAILURE;
174     }
175 
176     return NS_OK;
177   }
178 
Run()179   NS_IMETHOD Run() override {
180     if (mCallback) {
181       if (mStream) {
182         mCallback->OnOutputStreamReady(mStream);
183       }
184       mCallback = nullptr;
185     }
186     return NS_OK;
187   }
188 
Cancel()189   nsresult Cancel() override {
190     mCallback = nullptr;
191     return NS_OK;
192   }
193 
194  private:
195   nsCOMPtr<nsIAsyncOutputStream> mStream;
196   nsCOMPtr<nsIOutputStreamCallback> mCallback;
197   nsCOMPtr<nsIEventTarget> mTarget;
198 };
199 
NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent,CancelableRunnable,nsIOutputStreamCallback)200 NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
201                             nsIOutputStreamCallback)
202 
203 //-----------------------------------------------------------------------------
204 
205 already_AddRefed<nsIInputStreamCallback> NS_NewInputStreamReadyEvent(
206     const char* aName, nsIInputStreamCallback* aCallback,
207     nsIEventTarget* aTarget, uint32_t aPriority) {
208   NS_ASSERTION(aCallback, "null callback");
209   NS_ASSERTION(aTarget, "null target");
210   RefPtr<nsInputStreamReadyEvent> ev =
211       new nsInputStreamReadyEvent(aName, aCallback, aTarget, aPriority);
212   return ev.forget();
213 }
214 
NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback * aCallback,nsIEventTarget * aTarget)215 already_AddRefed<nsIOutputStreamCallback> NS_NewOutputStreamReadyEvent(
216     nsIOutputStreamCallback* aCallback, nsIEventTarget* aTarget) {
217   NS_ASSERTION(aCallback, "null callback");
218   NS_ASSERTION(aTarget, "null target");
219   RefPtr<nsOutputStreamReadyEvent> ev =
220       new nsOutputStreamReadyEvent(aCallback, aTarget);
221   return ev.forget();
222 }
223 
224 //-----------------------------------------------------------------------------
225 // NS_AsyncCopy implementation
226 
227 // abstract stream copier...
228 class nsAStreamCopier : public nsIInputStreamCallback,
229                         public nsIOutputStreamCallback,
230                         public CancelableRunnable {
231  public:
232   NS_DECL_ISUPPORTS_INHERITED
233 
nsAStreamCopier()234   nsAStreamCopier()
235       : CancelableRunnable("nsAStreamCopier"),
236         mLock("nsAStreamCopier.mLock"),
237         mCallback(nullptr),
238         mProgressCallback(nullptr),
239         mClosure(nullptr),
240         mChunkSize(0),
241         mEventInProcess(false),
242         mEventIsPending(false),
243         mCloseSource(true),
244         mCloseSink(true),
245         mCanceled(false),
246         mCancelStatus(NS_OK) {}
247 
248   // kick off the async copy...
Start(nsIInputStream * aSource,nsIOutputStream * aSink,nsIEventTarget * aTarget,nsAsyncCopyCallbackFun aCallback,void * aClosure,uint32_t aChunksize,bool aCloseSource,bool aCloseSink,nsAsyncCopyProgressFun aProgressCallback)249   nsresult Start(nsIInputStream* aSource, nsIOutputStream* aSink,
250                  nsIEventTarget* aTarget, nsAsyncCopyCallbackFun aCallback,
251                  void* aClosure, uint32_t aChunksize, bool aCloseSource,
252                  bool aCloseSink, nsAsyncCopyProgressFun aProgressCallback) {
253     mSource = aSource;
254     mSink = aSink;
255     mTarget = aTarget;
256     mCallback = aCallback;
257     mClosure = aClosure;
258     mChunkSize = aChunksize;
259     mCloseSource = aCloseSource;
260     mCloseSink = aCloseSink;
261     mProgressCallback = aProgressCallback;
262 
263     mAsyncSource = do_QueryInterface(mSource);
264     mAsyncSink = do_QueryInterface(mSink);
265 
266     return PostContinuationEvent();
267   }
268 
269   // implemented by subclasses, returns number of bytes copied and
270   // sets source and sink condition before returning.
271   virtual uint32_t DoCopy(nsresult* aSourceCondition,
272                           nsresult* aSinkCondition) = 0;
273 
Process()274   void Process() {
275     if (!mSource || !mSink) {
276       return;
277     }
278 
279     nsresult cancelStatus;
280     bool canceled;
281     {
282       MutexAutoLock lock(mLock);
283       canceled = mCanceled;
284       cancelStatus = mCancelStatus;
285     }
286 
287     // If the copy was canceled before Process() was even called, then
288     // sourceCondition and sinkCondition should be set to error results to
289     // ensure we don't call Finish() on a canceled nsISafeOutputStream.
290     MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
291     nsresult sourceCondition = cancelStatus;
292     nsresult sinkCondition = cancelStatus;
293 
294     // Copy data from the source to the sink until we hit failure or have
295     // copied all the data.
296     for (;;) {
297       // Note: copyFailed will be true if the source or the sink have
298       //       reported an error, or if we failed to write any bytes
299       //       because we have consumed all of our data.
300       bool copyFailed = false;
301       if (!canceled) {
302         uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
303         if (n > 0 && mProgressCallback) {
304           mProgressCallback(mClosure, n);
305         }
306         copyFailed =
307             NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0;
308 
309         MutexAutoLock lock(mLock);
310         canceled = mCanceled;
311         cancelStatus = mCancelStatus;
312       }
313       if (copyFailed && !canceled) {
314         if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
315           // need to wait for more data from source.  while waiting for
316           // more source data, be sure to observe failures on output end.
317           mAsyncSource->AsyncWait(this, 0, 0, nullptr);
318 
319           if (mAsyncSink)
320             mAsyncSink->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
321                                   0, nullptr);
322           break;
323         } else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
324           // need to wait for more room in the sink.  while waiting for
325           // more room in the sink, be sure to observer failures on the
326           // input end.
327           mAsyncSink->AsyncWait(this, 0, 0, nullptr);
328 
329           if (mAsyncSource)
330             mAsyncSource->AsyncWait(
331                 this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr);
332           break;
333         }
334       }
335       if (copyFailed || canceled) {
336         if (mCloseSource) {
337           // close source
338           if (mAsyncSource)
339             mAsyncSource->CloseWithStatus(canceled ? cancelStatus
340                                                    : sinkCondition);
341           else {
342             mSource->Close();
343           }
344         }
345         mAsyncSource = nullptr;
346         mSource = nullptr;
347 
348         if (mCloseSink) {
349           // close sink
350           if (mAsyncSink)
351             mAsyncSink->CloseWithStatus(canceled ? cancelStatus
352                                                  : sourceCondition);
353           else {
354             // If we have an nsISafeOutputStream, and our
355             // sourceCondition and sinkCondition are not set to a
356             // failure state, finish writing.
357             nsCOMPtr<nsISafeOutputStream> sostream = do_QueryInterface(mSink);
358             if (sostream && NS_SUCCEEDED(sourceCondition) &&
359                 NS_SUCCEEDED(sinkCondition)) {
360               sostream->Finish();
361             } else {
362               mSink->Close();
363             }
364           }
365         }
366         mAsyncSink = nullptr;
367         mSink = nullptr;
368 
369         // notify state complete...
370         if (mCallback) {
371           nsresult status;
372           if (!canceled) {
373             status = sourceCondition;
374             if (NS_SUCCEEDED(status)) {
375               status = sinkCondition;
376             }
377             if (status == NS_BASE_STREAM_CLOSED) {
378               status = NS_OK;
379             }
380           } else {
381             status = cancelStatus;
382           }
383           mCallback(mClosure, status);
384         }
385         break;
386       }
387     }
388   }
389 
Cancel(nsresult aReason)390   nsresult Cancel(nsresult aReason) {
391     MutexAutoLock lock(mLock);
392     if (mCanceled) {
393       return NS_ERROR_FAILURE;
394     }
395 
396     if (NS_SUCCEEDED(aReason)) {
397       NS_WARNING("cancel with non-failure status code");
398       aReason = NS_BASE_STREAM_CLOSED;
399     }
400 
401     mCanceled = true;
402     mCancelStatus = aReason;
403     return NS_OK;
404   }
405 
OnInputStreamReady(nsIAsyncInputStream * aSource)406   NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override {
407     PostContinuationEvent();
408     return NS_OK;
409   }
410 
OnOutputStreamReady(nsIAsyncOutputStream * aSink)411   NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override {
412     PostContinuationEvent();
413     return NS_OK;
414   }
415 
416   // continuation event handler
Run()417   NS_IMETHOD Run() override {
418     Process();
419 
420     // clear "in process" flag and post any pending continuation event
421     MutexAutoLock lock(mLock);
422     mEventInProcess = false;
423     if (mEventIsPending) {
424       mEventIsPending = false;
425       PostContinuationEvent_Locked();
426     }
427 
428     return NS_OK;
429   }
430 
431   nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
432 
PostContinuationEvent()433   nsresult PostContinuationEvent() {
434     // we cannot post a continuation event if there is currently
435     // an event in process.  doing so could result in Process being
436     // run simultaneously on multiple threads, so we mark the event
437     // as pending, and if an event is already in process then we
438     // just let that existing event take care of posting the real
439     // continuation event.
440 
441     MutexAutoLock lock(mLock);
442     return PostContinuationEvent_Locked();
443   }
444 
PostContinuationEvent_Locked()445   nsresult PostContinuationEvent_Locked() {
446     nsresult rv = NS_OK;
447     if (mEventInProcess) {
448       mEventIsPending = true;
449     } else {
450       rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
451       if (NS_SUCCEEDED(rv)) {
452         mEventInProcess = true;
453       } else {
454         NS_WARNING("unable to post continuation event");
455       }
456     }
457     return rv;
458   }
459 
460  protected:
461   nsCOMPtr<nsIInputStream> mSource;
462   nsCOMPtr<nsIOutputStream> mSink;
463   nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
464   nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
465   nsCOMPtr<nsIEventTarget> mTarget;
466   Mutex mLock;
467   nsAsyncCopyCallbackFun mCallback;
468   nsAsyncCopyProgressFun mProgressCallback;
469   void* mClosure;
470   uint32_t mChunkSize;
471   bool mEventInProcess;
472   bool mEventIsPending;
473   bool mCloseSource;
474   bool mCloseSink;
475   bool mCanceled;
476   nsresult mCancelStatus;
477 
478   // virtual since subclasses call superclass Release()
479   virtual ~nsAStreamCopier() = default;
480 };
481 
482 NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier, CancelableRunnable,
483                             nsIInputStreamCallback, nsIOutputStreamCallback)
484 
485 class nsStreamCopierIB final : public nsAStreamCopier {
486  public:
nsStreamCopierIB()487   nsStreamCopierIB() : nsAStreamCopier() {}
488   virtual ~nsStreamCopierIB() = default;
489 
490   struct MOZ_STACK_CLASS ReadSegmentsState {
491     // the nsIOutputStream will outlive the ReadSegmentsState on the stack
492     nsIOutputStream* MOZ_NON_OWNING_REF mSink;
493     nsresult mSinkCondition;
494   };
495 
ConsumeInputBuffer(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)496   static nsresult ConsumeInputBuffer(nsIInputStream* aInStr, void* aClosure,
497                                      const char* aBuffer, uint32_t aOffset,
498                                      uint32_t aCount, uint32_t* aCountWritten) {
499     ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
500 
501     nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
502     if (NS_FAILED(rv)) {
503       state->mSinkCondition = rv;
504     } else if (*aCountWritten == 0) {
505       state->mSinkCondition = NS_BASE_STREAM_CLOSED;
506     }
507 
508     return state->mSinkCondition;
509   }
510 
DoCopy(nsresult * aSourceCondition,nsresult * aSinkCondition)511   uint32_t DoCopy(nsresult* aSourceCondition,
512                   nsresult* aSinkCondition) override {
513     ReadSegmentsState state;
514     state.mSink = mSink;
515     state.mSinkCondition = NS_OK;
516 
517     uint32_t n;
518     *aSourceCondition =
519         mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
520     *aSinkCondition = state.mSinkCondition;
521     return n;
522   }
523 
Cancel()524   nsresult Cancel() override { return NS_OK; }
525 };
526 
527 class nsStreamCopierOB final : public nsAStreamCopier {
528  public:
nsStreamCopierOB()529   nsStreamCopierOB() : nsAStreamCopier() {}
530   virtual ~nsStreamCopierOB() = default;
531 
532   struct MOZ_STACK_CLASS WriteSegmentsState {
533     // the nsIInputStream will outlive the WriteSegmentsState on the stack
534     nsIInputStream* MOZ_NON_OWNING_REF mSource;
535     nsresult mSourceCondition;
536   };
537 
FillOutputBuffer(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)538   static nsresult FillOutputBuffer(nsIOutputStream* aOutStr, void* aClosure,
539                                    char* aBuffer, uint32_t aOffset,
540                                    uint32_t aCount, uint32_t* aCountRead) {
541     WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
542 
543     nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
544     if (NS_FAILED(rv)) {
545       state->mSourceCondition = rv;
546     } else if (*aCountRead == 0) {
547       state->mSourceCondition = NS_BASE_STREAM_CLOSED;
548     }
549 
550     return state->mSourceCondition;
551   }
552 
DoCopy(nsresult * aSourceCondition,nsresult * aSinkCondition)553   uint32_t DoCopy(nsresult* aSourceCondition,
554                   nsresult* aSinkCondition) override {
555     WriteSegmentsState state;
556     state.mSource = mSource;
557     state.mSourceCondition = NS_OK;
558 
559     uint32_t n;
560     *aSinkCondition =
561         mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
562     *aSourceCondition = state.mSourceCondition;
563     return n;
564   }
565 
Cancel()566   nsresult Cancel() override { return NS_OK; }
567 };
568 
569 //-----------------------------------------------------------------------------
570 
NS_AsyncCopy(nsIInputStream * aSource,nsIOutputStream * aSink,nsIEventTarget * aTarget,nsAsyncCopyMode aMode,uint32_t aChunkSize,nsAsyncCopyCallbackFun aCallback,void * aClosure,bool aCloseSource,bool aCloseSink,nsISupports ** aCopierCtx,nsAsyncCopyProgressFun aProgressCallback)571 nsresult NS_AsyncCopy(nsIInputStream* aSource, nsIOutputStream* aSink,
572                       nsIEventTarget* aTarget, nsAsyncCopyMode aMode,
573                       uint32_t aChunkSize, nsAsyncCopyCallbackFun aCallback,
574                       void* aClosure, bool aCloseSource, bool aCloseSink,
575                       nsISupports** aCopierCtx,
576                       nsAsyncCopyProgressFun aProgressCallback) {
577   NS_ASSERTION(aTarget, "non-null target required");
578 
579   nsresult rv;
580   nsAStreamCopier* copier;
581 
582   if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
583     copier = new nsStreamCopierIB();
584   } else {
585     copier = new nsStreamCopierOB();
586   }
587 
588   // Start() takes an owning ref to the copier...
589   NS_ADDREF(copier);
590   rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
591                      aCloseSource, aCloseSink, aProgressCallback);
592 
593   if (aCopierCtx) {
594     *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
595     NS_ADDREF(*aCopierCtx);
596   }
597   NS_RELEASE(copier);
598 
599   return rv;
600 }
601 
602 //-----------------------------------------------------------------------------
603 
NS_CancelAsyncCopy(nsISupports * aCopierCtx,nsresult aReason)604 nsresult NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason) {
605   nsAStreamCopier* copier =
606       static_cast<nsAStreamCopier*>(static_cast<nsIRunnable*>(aCopierCtx));
607   return copier->Cancel(aReason);
608 }
609 
610 //-----------------------------------------------------------------------------
611 
612 namespace {
613 template <typename T>
614 struct ResultTraits {};
615 
616 template <>
617 struct ResultTraits<nsACString> {
Clear__anonae98e3570111::ResultTraits618   static void Clear(nsACString& aString) { aString.Truncate(); }
619 
GetStorage__anonae98e3570111::ResultTraits620   static char* GetStorage(nsACString& aString) {
621     return aString.BeginWriting();
622   }
623 };
624 
625 template <>
626 struct ResultTraits<nsTArray<uint8_t>> {
Clear__anonae98e3570111::ResultTraits627   static void Clear(nsTArray<uint8_t>& aArray) { aArray.Clear(); }
628 
GetStorage__anonae98e3570111::ResultTraits629   static char* GetStorage(nsTArray<uint8_t>& aArray) {
630     return reinterpret_cast<char*>(aArray.Elements());
631   }
632 };
633 }  // namespace
634 
635 template <typename T>
DoConsumeStream(nsIInputStream * aStream,uint32_t aMaxCount,T & aResult)636 nsresult DoConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
637                          T& aResult) {
638   nsresult rv = NS_OK;
639   ResultTraits<T>::Clear(aResult);
640 
641   while (aMaxCount) {
642     uint64_t avail64;
643     rv = aStream->Available(&avail64);
644     if (NS_FAILED(rv)) {
645       if (rv == NS_BASE_STREAM_CLOSED) {
646         rv = NS_OK;
647       }
648       break;
649     }
650     if (avail64 == 0) {
651       break;
652     }
653 
654     uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
655 
656     // resize aResult buffer
657     uint32_t length = aResult.Length();
658     CheckedInt<uint32_t> newLength = CheckedInt<uint32_t>(length) + avail;
659     if (!newLength.isValid()) {
660       return NS_ERROR_FILE_TOO_BIG;
661     }
662 
663     if (!aResult.SetLength(newLength.value(), fallible)) {
664       return NS_ERROR_OUT_OF_MEMORY;
665     }
666     char* buf = ResultTraits<T>::GetStorage(aResult) + length;
667 
668     uint32_t n;
669     rv = aStream->Read(buf, avail, &n);
670     if (NS_FAILED(rv)) {
671       break;
672     }
673     if (n != avail) {
674       MOZ_ASSERT(n < avail, "What happened there???");
675       aResult.SetLength(length + n);
676     }
677     if (n == 0) {
678       break;
679     }
680     aMaxCount -= n;
681   }
682 
683   return rv;
684 }
685 
NS_ConsumeStream(nsIInputStream * aStream,uint32_t aMaxCount,nsACString & aResult)686 nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
687                           nsACString& aResult) {
688   return DoConsumeStream(aStream, aMaxCount, aResult);
689 }
690 
NS_ConsumeStream(nsIInputStream * aStream,uint32_t aMaxCount,nsTArray<uint8_t> & aResult)691 nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
692                           nsTArray<uint8_t>& aResult) {
693   return DoConsumeStream(aStream, aMaxCount, aResult);
694 }
695 
696 //-----------------------------------------------------------------------------
697 
TestInputStream(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)698 static nsresult TestInputStream(nsIInputStream* aInStr, void* aClosure,
699                                 const char* aBuffer, uint32_t aOffset,
700                                 uint32_t aCount, uint32_t* aCountWritten) {
701   bool* result = static_cast<bool*>(aClosure);
702   *result = true;
703   *aCountWritten = 0;
704   return NS_ERROR_ABORT;  // don't call me anymore
705 }
706 
NS_InputStreamIsBuffered(nsIInputStream * aStream)707 bool NS_InputStreamIsBuffered(nsIInputStream* aStream) {
708   nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
709   if (bufferedIn) {
710     return true;
711   }
712 
713   bool result = false;
714   uint32_t n;
715   nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
716   return result || rv != NS_ERROR_NOT_IMPLEMENTED;
717 }
718 
TestOutputStream(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)719 static nsresult TestOutputStream(nsIOutputStream* aOutStr, void* aClosure,
720                                  char* aBuffer, uint32_t aOffset,
721                                  uint32_t aCount, uint32_t* aCountRead) {
722   bool* result = static_cast<bool*>(aClosure);
723   *result = true;
724   *aCountRead = 0;
725   return NS_ERROR_ABORT;  // don't call me anymore
726 }
727 
NS_OutputStreamIsBuffered(nsIOutputStream * aStream)728 bool NS_OutputStreamIsBuffered(nsIOutputStream* aStream) {
729   nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
730   if (bufferedOut) {
731     return true;
732   }
733 
734   bool result = false;
735   uint32_t n;
736   aStream->WriteSegments(TestOutputStream, &result, 1, &n);
737   return result;
738 }
739 
740 //-----------------------------------------------------------------------------
741 
NS_CopySegmentToStream(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)742 nsresult NS_CopySegmentToStream(nsIInputStream* aInStr, void* aClosure,
743                                 const char* aBuffer, uint32_t aOffset,
744                                 uint32_t aCount, uint32_t* aCountWritten) {
745   nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
746   *aCountWritten = 0;
747   while (aCount) {
748     uint32_t n;
749     nsresult rv = outStr->Write(aBuffer, aCount, &n);
750     if (NS_FAILED(rv)) {
751       return rv;
752     }
753     aBuffer += n;
754     aCount -= n;
755     *aCountWritten += n;
756   }
757   return NS_OK;
758 }
759 
NS_CopySegmentToBuffer(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)760 nsresult NS_CopySegmentToBuffer(nsIInputStream* aInStr, void* aClosure,
761                                 const char* aBuffer, uint32_t aOffset,
762                                 uint32_t aCount, uint32_t* aCountWritten) {
763   char* toBuf = static_cast<char*>(aClosure);
764   memcpy(&toBuf[aOffset], aBuffer, aCount);
765   *aCountWritten = aCount;
766   return NS_OK;
767 }
768 
NS_CopySegmentToBuffer(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)769 nsresult NS_CopySegmentToBuffer(nsIOutputStream* aOutStr, void* aClosure,
770                                 char* aBuffer, uint32_t aOffset,
771                                 uint32_t aCount, uint32_t* aCountRead) {
772   const char* fromBuf = static_cast<const char*>(aClosure);
773   memcpy(aBuffer, &fromBuf[aOffset], aCount);
774   *aCountRead = aCount;
775   return NS_OK;
776 }
777 
NS_DiscardSegment(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)778 nsresult NS_DiscardSegment(nsIInputStream* aInStr, void* aClosure,
779                            const char* aBuffer, uint32_t aOffset,
780                            uint32_t aCount, uint32_t* aCountWritten) {
781   *aCountWritten = aCount;
782   return NS_OK;
783 }
784 
785 //-----------------------------------------------------------------------------
786 
NS_WriteSegmentThunk(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)787 nsresult NS_WriteSegmentThunk(nsIInputStream* aInStr, void* aClosure,
788                               const char* aBuffer, uint32_t aOffset,
789                               uint32_t aCount, uint32_t* aCountWritten) {
790   nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
791   return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
792                      aCountWritten);
793 }
794 
NS_FillArray(FallibleTArray<char> & aDest,nsIInputStream * aInput,uint32_t aKeep,uint32_t * aNewBytes)795 nsresult NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
796                       uint32_t aKeep, uint32_t* aNewBytes) {
797   MOZ_ASSERT(aInput, "null stream");
798   MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
799 
800   char* aBuffer = aDest.Elements();
801   int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
802   if (aKeep != 0 && keepOffset > 0) {
803     memmove(aBuffer, aBuffer + keepOffset, aKeep);
804   }
805 
806   nsresult rv =
807       aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
808   if (NS_FAILED(rv)) {
809     *aNewBytes = 0;
810   }
811   // NOTE: we rely on the fact that the new slots are NOT initialized by
812   // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
813   // in nsTArray.h:
814   aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
815 
816   MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
817   return rv;
818 }
819 
NS_InputStreamIsCloneable(nsIInputStream * aSource)820 bool NS_InputStreamIsCloneable(nsIInputStream* aSource) {
821   if (!aSource) {
822     return false;
823   }
824 
825   nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
826   return cloneable && cloneable->GetCloneable();
827 }
828 
NS_CloneInputStream(nsIInputStream * aSource,nsIInputStream ** aCloneOut,nsIInputStream ** aReplacementOut)829 nsresult NS_CloneInputStream(nsIInputStream* aSource,
830                              nsIInputStream** aCloneOut,
831                              nsIInputStream** aReplacementOut) {
832   if (NS_WARN_IF(!aSource)) {
833     return NS_ERROR_FAILURE;
834   }
835 
836   // Attempt to perform the clone directly on the source stream
837   nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
838   if (cloneable && cloneable->GetCloneable()) {
839     if (aReplacementOut) {
840       *aReplacementOut = nullptr;
841     }
842     return cloneable->Clone(aCloneOut);
843   }
844 
845   // If we failed the clone and the caller does not want to replace their
846   // original stream, then we are done.  Return error.
847   if (!aReplacementOut) {
848     return NS_ERROR_FAILURE;
849   }
850 
851   // The caller has opted-in to the fallback clone support that replaces
852   // the original stream.  Copy the data to a pipe and return two cloned
853   // input streams.
854 
855   nsCOMPtr<nsIInputStream> reader;
856   nsCOMPtr<nsIInputStream> readerClone;
857   nsCOMPtr<nsIOutputStream> writer;
858 
859   nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), 0,
860                            0,            // default segment size and max size
861                            true, true);  // non-blocking
862   if (NS_WARN_IF(NS_FAILED(rv))) {
863     return rv;
864   }
865 
866   cloneable = do_QueryInterface(reader);
867   MOZ_ASSERT(cloneable && cloneable->GetCloneable());
868 
869   rv = cloneable->Clone(getter_AddRefs(readerClone));
870   if (NS_WARN_IF(NS_FAILED(rv))) {
871     return rv;
872   }
873 
874   nsCOMPtr<nsIEventTarget> target =
875       do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
876   if (NS_WARN_IF(NS_FAILED(rv))) {
877     return rv;
878   }
879 
880   rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
881   if (NS_WARN_IF(NS_FAILED(rv))) {
882     return rv;
883   }
884 
885   readerClone.forget(aCloneOut);
886   reader.forget(aReplacementOut);
887 
888   return NS_OK;
889 }
890 
NS_MakeAsyncNonBlockingInputStream(already_AddRefed<nsIInputStream> aSource,nsIAsyncInputStream ** aAsyncInputStream,bool aCloseWhenDone,uint32_t aFlags,uint32_t aSegmentSize,uint32_t aSegmentCount)891 nsresult NS_MakeAsyncNonBlockingInputStream(
892     already_AddRefed<nsIInputStream> aSource,
893     nsIAsyncInputStream** aAsyncInputStream, bool aCloseWhenDone,
894     uint32_t aFlags, uint32_t aSegmentSize, uint32_t aSegmentCount) {
895   nsCOMPtr<nsIInputStream> source = std::move(aSource);
896   if (NS_WARN_IF(!aAsyncInputStream)) {
897     return NS_ERROR_FAILURE;
898   }
899 
900   bool nonBlocking = false;
901   nsresult rv = source->IsNonBlocking(&nonBlocking);
902   if (NS_WARN_IF(NS_FAILED(rv))) {
903     return rv;
904   }
905 
906   nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source);
907 
908   if (nonBlocking && asyncStream) {
909     // This stream is perfect!
910     asyncStream.forget(aAsyncInputStream);
911     return NS_OK;
912   }
913 
914   if (nonBlocking) {
915     // If the stream is non-blocking but not async, we wrap it.
916     return NonBlockingAsyncInputStream::Create(source.forget(),
917                                                aAsyncInputStream);
918   }
919 
920   nsCOMPtr<nsIStreamTransportService> sts =
921       do_GetService(kStreamTransportServiceCID, &rv);
922   if (NS_WARN_IF(NS_FAILED(rv))) {
923     return rv;
924   }
925 
926   nsCOMPtr<nsITransport> transport;
927   rv = sts->CreateInputTransport(source, aCloseWhenDone,
928                                  getter_AddRefs(transport));
929   if (NS_WARN_IF(NS_FAILED(rv))) {
930     return rv;
931   }
932 
933   nsCOMPtr<nsIInputStream> wrapper;
934   rv = transport->OpenInputStream(aFlags, aSegmentSize, aSegmentCount,
935                                   getter_AddRefs(wrapper));
936   if (NS_WARN_IF(NS_FAILED(rv))) {
937     return rv;
938   }
939 
940   asyncStream = do_QueryInterface(wrapper);
941   MOZ_ASSERT(asyncStream);
942 
943   asyncStream.forget(aAsyncInputStream);
944   return NS_OK;
945 }
946