1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "nsStreamUtils.h"
10 #include "nsAutoPtr.h"
11 #include "nsCOMPtr.h"
12 #include "nsIPipe.h"
13 #include "nsICloneableInputStream.h"
14 #include "nsIEventTarget.h"
15 #include "nsICancelableRunnable.h"
16 #include "nsISafeOutputStream.h"
17 #include "nsString.h"
18 #include "nsIAsyncInputStream.h"
19 #include "nsIAsyncOutputStream.h"
20 #include "nsIBufferedStreams.h"
21 #include "nsNetCID.h"
22 #include "nsServiceManagerUtils.h"
23 #include "nsThreadUtils.h"
24 
25 using namespace mozilla;
26 
27 //-----------------------------------------------------------------------------
28 
29 // This is a nsICancelableRunnable because we can dispatch it to Workers and
30 // those can be shut down at any time, and in these cases, Cancel() is called
31 // instead of Run().
32 class nsInputStreamReadyEvent final
33   : public CancelableRunnable
34   , public nsIInputStreamCallback
35 {
36 public:
37   NS_DECL_ISUPPORTS_INHERITED
38 
nsInputStreamReadyEvent(nsIInputStreamCallback * aCallback,nsIEventTarget * aTarget)39   nsInputStreamReadyEvent(nsIInputStreamCallback* aCallback,
40                           nsIEventTarget* aTarget)
41     : mCallback(aCallback)
42     , mTarget(aTarget)
43   {
44   }
45 
46 private:
~nsInputStreamReadyEvent()47   ~nsInputStreamReadyEvent()
48   {
49     if (!mCallback) {
50       return;
51     }
52     //
53     // whoa!!  looks like we never posted this event.  take care to
54     // release mCallback on the correct thread.  if mTarget lives on the
55     // calling thread, then we are ok.  otherwise, we have to try to
56     // proxy the Release over the right thread.  if that thread is dead,
57     // then there's nothing we can do... better to leak than crash.
58     //
59     bool val;
60     nsresult rv = mTarget->IsOnCurrentThread(&val);
61     if (NS_FAILED(rv) || !val) {
62       nsCOMPtr<nsIInputStreamCallback> event =
63         NS_NewInputStreamReadyEvent(mCallback, mTarget);
64       mCallback = nullptr;
65       if (event) {
66         rv = event->OnInputStreamReady(nullptr);
67         if (NS_FAILED(rv)) {
68           NS_NOTREACHED("leaking stream event");
69           nsISupports* sup = event;
70           NS_ADDREF(sup);
71         }
72       }
73     }
74   }
75 
76 public:
OnInputStreamReady(nsIAsyncInputStream * aStream)77   NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override
78   {
79     mStream = aStream;
80 
81     nsresult rv =
82       mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
83     if (NS_FAILED(rv)) {
84       NS_WARNING("Dispatch failed");
85       return NS_ERROR_FAILURE;
86     }
87 
88     return NS_OK;
89   }
90 
Run()91   NS_IMETHOD Run() override
92   {
93     if (mCallback) {
94       if (mStream) {
95         mCallback->OnInputStreamReady(mStream);
96       }
97       mCallback = nullptr;
98     }
99     return NS_OK;
100   }
101 
Cancel()102   nsresult Cancel() override
103   {
104     mCallback = nullptr;
105     return NS_OK;
106   }
107 
108 private:
109   nsCOMPtr<nsIAsyncInputStream>    mStream;
110   nsCOMPtr<nsIInputStreamCallback> mCallback;
111   nsCOMPtr<nsIEventTarget>         mTarget;
112 };
113 
114 NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
115                             nsIInputStreamCallback)
116 
117 //-----------------------------------------------------------------------------
118 
119 // This is a nsICancelableRunnable because we can dispatch it to Workers and
120 // those can be shut down at any time, and in these cases, Cancel() is called
121 // instead of Run().
122 class nsOutputStreamReadyEvent final
123   : public CancelableRunnable
124   , public nsIOutputStreamCallback
125 {
126 public:
127   NS_DECL_ISUPPORTS_INHERITED
128 
nsOutputStreamReadyEvent(nsIOutputStreamCallback * aCallback,nsIEventTarget * aTarget)129   nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
130                            nsIEventTarget* aTarget)
131     : mCallback(aCallback)
132     , mTarget(aTarget)
133   {
134   }
135 
136 private:
~nsOutputStreamReadyEvent()137   ~nsOutputStreamReadyEvent()
138   {
139     if (!mCallback) {
140       return;
141     }
142     //
143     // whoa!!  looks like we never posted this event.  take care to
144     // release mCallback on the correct thread.  if mTarget lives on the
145     // calling thread, then we are ok.  otherwise, we have to try to
146     // proxy the Release over the right thread.  if that thread is dead,
147     // then there's nothing we can do... better to leak than crash.
148     //
149     bool val;
150     nsresult rv = mTarget->IsOnCurrentThread(&val);
151     if (NS_FAILED(rv) || !val) {
152       nsCOMPtr<nsIOutputStreamCallback> event =
153         NS_NewOutputStreamReadyEvent(mCallback, mTarget);
154       mCallback = nullptr;
155       if (event) {
156         rv = event->OnOutputStreamReady(nullptr);
157         if (NS_FAILED(rv)) {
158           NS_NOTREACHED("leaking stream event");
159           nsISupports* sup = event;
160           NS_ADDREF(sup);
161         }
162       }
163     }
164   }
165 
166 public:
OnOutputStreamReady(nsIAsyncOutputStream * aStream)167   NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override
168   {
169     mStream = aStream;
170 
171     nsresult rv =
172       mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
173     if (NS_FAILED(rv)) {
174       NS_WARNING("PostEvent failed");
175       return NS_ERROR_FAILURE;
176     }
177 
178     return NS_OK;
179   }
180 
Run()181   NS_IMETHOD Run() override
182   {
183     if (mCallback) {
184       if (mStream) {
185         mCallback->OnOutputStreamReady(mStream);
186       }
187       mCallback = nullptr;
188     }
189     return NS_OK;
190   }
191 
Cancel()192   nsresult Cancel() override
193   {
194     mCallback = nullptr;
195     return NS_OK;
196   }
197 
198 private:
199   nsCOMPtr<nsIAsyncOutputStream>    mStream;
200   nsCOMPtr<nsIOutputStreamCallback> mCallback;
201   nsCOMPtr<nsIEventTarget>          mTarget;
202 };
203 
NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent,CancelableRunnable,nsIOutputStreamCallback)204 NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
205                             nsIOutputStreamCallback)
206 
207 //-----------------------------------------------------------------------------
208 
209 already_AddRefed<nsIInputStreamCallback>
210 NS_NewInputStreamReadyEvent(nsIInputStreamCallback* aCallback,
211                             nsIEventTarget* aTarget)
212 {
213   NS_ASSERTION(aCallback, "null callback");
214   NS_ASSERTION(aTarget, "null target");
215   RefPtr<nsInputStreamReadyEvent> ev =
216     new nsInputStreamReadyEvent(aCallback, aTarget);
217   return ev.forget();
218 }
219 
220 already_AddRefed<nsIOutputStreamCallback>
NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback * aCallback,nsIEventTarget * aTarget)221 NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
222                              nsIEventTarget* aTarget)
223 {
224   NS_ASSERTION(aCallback, "null callback");
225   NS_ASSERTION(aTarget, "null target");
226   RefPtr<nsOutputStreamReadyEvent> ev =
227     new nsOutputStreamReadyEvent(aCallback, aTarget);
228   return ev.forget();
229 }
230 
231 //-----------------------------------------------------------------------------
232 // NS_AsyncCopy implementation
233 
234 // abstract stream copier...
235 class nsAStreamCopier
236   : public nsIInputStreamCallback
237   , public nsIOutputStreamCallback
238   , public CancelableRunnable
239 {
240 public:
241   NS_DECL_ISUPPORTS_INHERITED
242 
nsAStreamCopier()243   nsAStreamCopier()
244     : mLock("nsAStreamCopier.mLock")
245     , mCallback(nullptr)
246     , mProgressCallback(nullptr)
247     , mClosure(nullptr)
248     , mChunkSize(0)
249     , mEventInProcess(false)
250     , mEventIsPending(false)
251     , mCloseSource(true)
252     , mCloseSink(true)
253     , mCanceled(false)
254     , mCancelStatus(NS_OK)
255   {
256   }
257 
258   // kick off the async copy...
Start(nsIInputStream * aSource,nsIOutputStream * aSink,nsIEventTarget * aTarget,nsAsyncCopyCallbackFun aCallback,void * aClosure,uint32_t aChunksize,bool aCloseSource,bool aCloseSink,nsAsyncCopyProgressFun aProgressCallback)259   nsresult Start(nsIInputStream* aSource,
260                  nsIOutputStream* aSink,
261                  nsIEventTarget* aTarget,
262                  nsAsyncCopyCallbackFun aCallback,
263                  void* aClosure,
264                  uint32_t aChunksize,
265                  bool aCloseSource,
266                  bool aCloseSink,
267                  nsAsyncCopyProgressFun aProgressCallback)
268   {
269     mSource = aSource;
270     mSink = aSink;
271     mTarget = aTarget;
272     mCallback = aCallback;
273     mClosure = aClosure;
274     mChunkSize = aChunksize;
275     mCloseSource = aCloseSource;
276     mCloseSink = aCloseSink;
277     mProgressCallback = aProgressCallback;
278 
279     mAsyncSource = do_QueryInterface(mSource);
280     mAsyncSink = do_QueryInterface(mSink);
281 
282     return PostContinuationEvent();
283   }
284 
285   // implemented by subclasses, returns number of bytes copied and
286   // sets source and sink condition before returning.
287   virtual uint32_t DoCopy(nsresult* aSourceCondition,
288                           nsresult* aSinkCondition) = 0;
289 
Process()290   void Process()
291   {
292     if (!mSource || !mSink) {
293       return;
294     }
295 
296     nsresult cancelStatus;
297     bool canceled;
298     {
299       MutexAutoLock lock(mLock);
300       canceled = mCanceled;
301       cancelStatus = mCancelStatus;
302     }
303 
304     // If the copy was canceled before Process() was even called, then
305     // sourceCondition and sinkCondition should be set to error results to
306     // ensure we don't call Finish() on a canceled nsISafeOutputStream.
307     MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
308     nsresult sourceCondition = cancelStatus;
309     nsresult sinkCondition = cancelStatus;
310 
311     // Copy data from the source to the sink until we hit failure or have
312     // copied all the data.
313     for (;;) {
314       // Note: copyFailed will be true if the source or the sink have
315       //       reported an error, or if we failed to write any bytes
316       //       because we have consumed all of our data.
317       bool copyFailed = false;
318       if (!canceled) {
319         uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
320         if (n > 0 && mProgressCallback) {
321           mProgressCallback(mClosure, n);
322         }
323         copyFailed = NS_FAILED(sourceCondition) ||
324                      NS_FAILED(sinkCondition) || n == 0;
325 
326         MutexAutoLock lock(mLock);
327         canceled = mCanceled;
328         cancelStatus = mCancelStatus;
329       }
330       if (copyFailed && !canceled) {
331         if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
332           // need to wait for more data from source.  while waiting for
333           // more source data, be sure to observe failures on output end.
334           mAsyncSource->AsyncWait(this, 0, 0, nullptr);
335 
336           if (mAsyncSink)
337             mAsyncSink->AsyncWait(this,
338                                   nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
339                                   0, nullptr);
340           break;
341         } else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
342           // need to wait for more room in the sink.  while waiting for
343           // more room in the sink, be sure to observer failures on the
344           // input end.
345           mAsyncSink->AsyncWait(this, 0, 0, nullptr);
346 
347           if (mAsyncSource)
348             mAsyncSource->AsyncWait(this,
349                                     nsIAsyncInputStream::WAIT_CLOSURE_ONLY,
350                                     0, nullptr);
351           break;
352         }
353       }
354       if (copyFailed || canceled) {
355         if (mCloseSource) {
356           // close source
357           if (mAsyncSource)
358             mAsyncSource->CloseWithStatus(
359               canceled ? cancelStatus : sinkCondition);
360           else {
361             mSource->Close();
362           }
363         }
364         mAsyncSource = nullptr;
365         mSource = nullptr;
366 
367         if (mCloseSink) {
368           // close sink
369           if (mAsyncSink)
370             mAsyncSink->CloseWithStatus(
371               canceled ? cancelStatus : sourceCondition);
372           else {
373             // If we have an nsISafeOutputStream, and our
374             // sourceCondition and sinkCondition are not set to a
375             // failure state, finish writing.
376             nsCOMPtr<nsISafeOutputStream> sostream =
377               do_QueryInterface(mSink);
378             if (sostream && NS_SUCCEEDED(sourceCondition) &&
379                 NS_SUCCEEDED(sinkCondition)) {
380               sostream->Finish();
381             } else {
382               mSink->Close();
383             }
384           }
385         }
386         mAsyncSink = nullptr;
387         mSink = nullptr;
388 
389         // notify state complete...
390         if (mCallback) {
391           nsresult status;
392           if (!canceled) {
393             status = sourceCondition;
394             if (NS_SUCCEEDED(status)) {
395               status = sinkCondition;
396             }
397             if (status == NS_BASE_STREAM_CLOSED) {
398               status = NS_OK;
399             }
400           } else {
401             status = cancelStatus;
402           }
403           mCallback(mClosure, status);
404         }
405         break;
406       }
407     }
408   }
409 
Cancel(nsresult aReason)410   nsresult Cancel(nsresult aReason)
411   {
412     MutexAutoLock lock(mLock);
413     if (mCanceled) {
414       return NS_ERROR_FAILURE;
415     }
416 
417     if (NS_SUCCEEDED(aReason)) {
418       NS_WARNING("cancel with non-failure status code");
419       aReason = NS_BASE_STREAM_CLOSED;
420     }
421 
422     mCanceled = true;
423     mCancelStatus = aReason;
424     return NS_OK;
425   }
426 
OnInputStreamReady(nsIAsyncInputStream * aSource)427   NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override
428   {
429     PostContinuationEvent();
430     return NS_OK;
431   }
432 
OnOutputStreamReady(nsIAsyncOutputStream * aSink)433   NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override
434   {
435     PostContinuationEvent();
436     return NS_OK;
437   }
438 
439   // continuation event handler
Run()440   NS_IMETHOD Run() override
441   {
442     Process();
443 
444     // clear "in process" flag and post any pending continuation event
445     MutexAutoLock lock(mLock);
446     mEventInProcess = false;
447     if (mEventIsPending) {
448       mEventIsPending = false;
449       PostContinuationEvent_Locked();
450     }
451 
452     return NS_OK;
453   }
454 
455   nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
456 
PostContinuationEvent()457   nsresult PostContinuationEvent()
458   {
459     // we cannot post a continuation event if there is currently
460     // an event in process.  doing so could result in Process being
461     // run simultaneously on multiple threads, so we mark the event
462     // as pending, and if an event is already in process then we
463     // just let that existing event take care of posting the real
464     // continuation event.
465 
466     MutexAutoLock lock(mLock);
467     return PostContinuationEvent_Locked();
468   }
469 
PostContinuationEvent_Locked()470   nsresult PostContinuationEvent_Locked()
471   {
472     nsresult rv = NS_OK;
473     if (mEventInProcess) {
474       mEventIsPending = true;
475     } else {
476       rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
477       if (NS_SUCCEEDED(rv)) {
478         mEventInProcess = true;
479       } else {
480         NS_WARNING("unable to post continuation event");
481       }
482     }
483     return rv;
484   }
485 
486 protected:
487   nsCOMPtr<nsIInputStream>       mSource;
488   nsCOMPtr<nsIOutputStream>      mSink;
489   nsCOMPtr<nsIAsyncInputStream>  mAsyncSource;
490   nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
491   nsCOMPtr<nsIEventTarget>       mTarget;
492   Mutex                          mLock;
493   nsAsyncCopyCallbackFun         mCallback;
494   nsAsyncCopyProgressFun         mProgressCallback;
495   void*                          mClosure;
496   uint32_t                       mChunkSize;
497   bool                           mEventInProcess;
498   bool                           mEventIsPending;
499   bool                           mCloseSource;
500   bool                           mCloseSink;
501   bool                           mCanceled;
502   nsresult                       mCancelStatus;
503 
504   // virtual since subclasses call superclass Release()
~nsAStreamCopier()505   virtual ~nsAStreamCopier()
506   {
507   }
508 };
509 
510 NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier,
511                             CancelableRunnable,
512                             nsIInputStreamCallback,
513                             nsIOutputStreamCallback)
514 
515 class nsStreamCopierIB final : public nsAStreamCopier
516 {
517 public:
nsStreamCopierIB()518   nsStreamCopierIB() : nsAStreamCopier()
519   {
520   }
~nsStreamCopierIB()521   virtual ~nsStreamCopierIB()
522   {
523   }
524 
525   struct MOZ_STACK_CLASS ReadSegmentsState
526   {
527     // the nsIOutputStream will outlive the ReadSegmentsState on the stack
528     nsIOutputStream* MOZ_NON_OWNING_REF mSink;
529     nsresult         mSinkCondition;
530   };
531 
ConsumeInputBuffer(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)532   static nsresult ConsumeInputBuffer(nsIInputStream* aInStr,
533                                      void* aClosure,
534                                      const char* aBuffer,
535                                      uint32_t aOffset,
536                                      uint32_t aCount,
537                                      uint32_t* aCountWritten)
538   {
539     ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
540 
541     nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
542     if (NS_FAILED(rv)) {
543       state->mSinkCondition = rv;
544     } else if (*aCountWritten == 0) {
545       state->mSinkCondition = NS_BASE_STREAM_CLOSED;
546     }
547 
548     return state->mSinkCondition;
549   }
550 
DoCopy(nsresult * aSourceCondition,nsresult * aSinkCondition)551   uint32_t DoCopy(nsresult* aSourceCondition,
552                   nsresult* aSinkCondition) override
553   {
554     ReadSegmentsState state;
555     state.mSink = mSink;
556     state.mSinkCondition = NS_OK;
557 
558     uint32_t n;
559     *aSourceCondition =
560       mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
561     *aSinkCondition = state.mSinkCondition;
562     return n;
563   }
564 
Cancel()565   nsresult Cancel() override
566   {
567     return NS_OK;
568   }
569 };
570 
571 class nsStreamCopierOB final : public nsAStreamCopier
572 {
573 public:
nsStreamCopierOB()574   nsStreamCopierOB() : nsAStreamCopier()
575   {
576   }
~nsStreamCopierOB()577   virtual ~nsStreamCopierOB()
578   {
579   }
580 
581   struct MOZ_STACK_CLASS WriteSegmentsState
582   {
583     // the nsIInputStream will outlive the WriteSegmentsState on the stack
584     nsIInputStream* MOZ_NON_OWNING_REF mSource;
585     nsresult        mSourceCondition;
586   };
587 
FillOutputBuffer(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)588   static nsresult FillOutputBuffer(nsIOutputStream* aOutStr,
589                                    void* aClosure,
590                                    char* aBuffer,
591                                    uint32_t aOffset,
592                                    uint32_t aCount,
593                                    uint32_t* aCountRead)
594   {
595     WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
596 
597     nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
598     if (NS_FAILED(rv)) {
599       state->mSourceCondition = rv;
600     } else if (*aCountRead == 0) {
601       state->mSourceCondition = NS_BASE_STREAM_CLOSED;
602     }
603 
604     return state->mSourceCondition;
605   }
606 
DoCopy(nsresult * aSourceCondition,nsresult * aSinkCondition)607   uint32_t DoCopy(nsresult* aSourceCondition,
608                   nsresult* aSinkCondition) override
609   {
610     WriteSegmentsState state;
611     state.mSource = mSource;
612     state.mSourceCondition = NS_OK;
613 
614     uint32_t n;
615     *aSinkCondition =
616       mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
617     *aSourceCondition = state.mSourceCondition;
618     return n;
619   }
620 
Cancel()621   nsresult Cancel() override
622   {
623     return NS_OK;
624   }
625 };
626 
627 //-----------------------------------------------------------------------------
628 
629 nsresult
NS_AsyncCopy(nsIInputStream * aSource,nsIOutputStream * aSink,nsIEventTarget * aTarget,nsAsyncCopyMode aMode,uint32_t aChunkSize,nsAsyncCopyCallbackFun aCallback,void * aClosure,bool aCloseSource,bool aCloseSink,nsISupports ** aCopierCtx,nsAsyncCopyProgressFun aProgressCallback)630 NS_AsyncCopy(nsIInputStream*         aSource,
631              nsIOutputStream*        aSink,
632              nsIEventTarget*         aTarget,
633              nsAsyncCopyMode         aMode,
634              uint32_t                aChunkSize,
635              nsAsyncCopyCallbackFun  aCallback,
636              void*                   aClosure,
637              bool                    aCloseSource,
638              bool                    aCloseSink,
639              nsISupports**           aCopierCtx,
640              nsAsyncCopyProgressFun  aProgressCallback)
641 {
642   NS_ASSERTION(aTarget, "non-null target required");
643 
644   nsresult rv;
645   nsAStreamCopier* copier;
646 
647   if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
648     copier = new nsStreamCopierIB();
649   } else {
650     copier = new nsStreamCopierOB();
651   }
652 
653   // Start() takes an owning ref to the copier...
654   NS_ADDREF(copier);
655   rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
656                      aCloseSource, aCloseSink, aProgressCallback);
657 
658   if (aCopierCtx) {
659     *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
660     NS_ADDREF(*aCopierCtx);
661   }
662   NS_RELEASE(copier);
663 
664   return rv;
665 }
666 
667 //-----------------------------------------------------------------------------
668 
669 nsresult
NS_CancelAsyncCopy(nsISupports * aCopierCtx,nsresult aReason)670 NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason)
671 {
672   nsAStreamCopier* copier =
673     static_cast<nsAStreamCopier*>(static_cast<nsIRunnable *>(aCopierCtx));
674   return copier->Cancel(aReason);
675 }
676 
677 //-----------------------------------------------------------------------------
678 
679 nsresult
NS_ConsumeStream(nsIInputStream * aStream,uint32_t aMaxCount,nsACString & aResult)680 NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
681                  nsACString& aResult)
682 {
683   nsresult rv = NS_OK;
684   aResult.Truncate();
685 
686   while (aMaxCount) {
687     uint64_t avail64;
688     rv = aStream->Available(&avail64);
689     if (NS_FAILED(rv)) {
690       if (rv == NS_BASE_STREAM_CLOSED) {
691         rv = NS_OK;
692       }
693       break;
694     }
695     if (avail64 == 0) {
696       break;
697     }
698 
699     uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
700 
701     // resize aResult buffer
702     uint32_t length = aResult.Length();
703     if (avail > UINT32_MAX - length) {
704       return NS_ERROR_FILE_TOO_BIG;
705     }
706 
707     aResult.SetLength(length + avail);
708     if (aResult.Length() != (length + avail)) {
709       return NS_ERROR_OUT_OF_MEMORY;
710     }
711     char* buf = aResult.BeginWriting() + length;
712 
713     uint32_t n;
714     rv = aStream->Read(buf, avail, &n);
715     if (NS_FAILED(rv)) {
716       break;
717     }
718     if (n != avail) {
719       aResult.SetLength(length + n);
720     }
721     if (n == 0) {
722       break;
723     }
724     aMaxCount -= n;
725   }
726 
727   return rv;
728 }
729 
730 //-----------------------------------------------------------------------------
731 
732 static nsresult
TestInputStream(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)733 TestInputStream(nsIInputStream* aInStr,
734                 void* aClosure,
735                 const char* aBuffer,
736                 uint32_t aOffset,
737                 uint32_t aCount,
738                 uint32_t* aCountWritten)
739 {
740   bool* result = static_cast<bool*>(aClosure);
741   *result = true;
742   return NS_ERROR_ABORT;  // don't call me anymore
743 }
744 
745 bool
NS_InputStreamIsBuffered(nsIInputStream * aStream)746 NS_InputStreamIsBuffered(nsIInputStream* aStream)
747 {
748   nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
749   if (bufferedIn) {
750     return true;
751   }
752 
753   bool result = false;
754   uint32_t n;
755   nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
756   return result || NS_SUCCEEDED(rv);
757 }
758 
759 static nsresult
TestOutputStream(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)760 TestOutputStream(nsIOutputStream* aOutStr,
761                  void* aClosure,
762                  char* aBuffer,
763                  uint32_t aOffset,
764                  uint32_t aCount,
765                  uint32_t* aCountRead)
766 {
767   bool* result = static_cast<bool*>(aClosure);
768   *result = true;
769   return NS_ERROR_ABORT;  // don't call me anymore
770 }
771 
772 bool
NS_OutputStreamIsBuffered(nsIOutputStream * aStream)773 NS_OutputStreamIsBuffered(nsIOutputStream* aStream)
774 {
775   nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
776   if (bufferedOut) {
777     return true;
778   }
779 
780   bool result = false;
781   uint32_t n;
782   aStream->WriteSegments(TestOutputStream, &result, 1, &n);
783   return result;
784 }
785 
786 //-----------------------------------------------------------------------------
787 
788 nsresult
NS_CopySegmentToStream(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)789 NS_CopySegmentToStream(nsIInputStream* aInStr,
790                        void* aClosure,
791                        const char* aBuffer,
792                        uint32_t aOffset,
793                        uint32_t aCount,
794                        uint32_t* aCountWritten)
795 {
796   nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
797   *aCountWritten = 0;
798   while (aCount) {
799     uint32_t n;
800     nsresult rv = outStr->Write(aBuffer, aCount, &n);
801     if (NS_FAILED(rv)) {
802       return rv;
803     }
804     aBuffer += n;
805     aCount -= n;
806     *aCountWritten += n;
807   }
808   return NS_OK;
809 }
810 
811 nsresult
NS_CopySegmentToBuffer(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)812 NS_CopySegmentToBuffer(nsIInputStream* aInStr,
813                        void* aClosure,
814                        const char* aBuffer,
815                        uint32_t aOffset,
816                        uint32_t aCount,
817                        uint32_t* aCountWritten)
818 {
819   char* toBuf = static_cast<char*>(aClosure);
820   memcpy(&toBuf[aOffset], aBuffer, aCount);
821   *aCountWritten = aCount;
822   return NS_OK;
823 }
824 
825 nsresult
NS_CopySegmentToBuffer(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)826 NS_CopySegmentToBuffer(nsIOutputStream* aOutStr,
827                        void* aClosure,
828                        char* aBuffer,
829                        uint32_t aOffset,
830                        uint32_t aCount,
831                        uint32_t* aCountRead)
832 {
833   const char* fromBuf = static_cast<const char*>(aClosure);
834   memcpy(aBuffer, &fromBuf[aOffset], aCount);
835   *aCountRead = aCount;
836   return NS_OK;
837 }
838 
839 nsresult
NS_DiscardSegment(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)840 NS_DiscardSegment(nsIInputStream* aInStr,
841                   void* aClosure,
842                   const char* aBuffer,
843                   uint32_t aOffset,
844                   uint32_t aCount,
845                   uint32_t* aCountWritten)
846 {
847   *aCountWritten = aCount;
848   return NS_OK;
849 }
850 
851 //-----------------------------------------------------------------------------
852 
853 nsresult
NS_WriteSegmentThunk(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)854 NS_WriteSegmentThunk(nsIInputStream* aInStr,
855                      void* aClosure,
856                      const char* aBuffer,
857                      uint32_t aOffset,
858                      uint32_t aCount,
859                      uint32_t* aCountWritten)
860 {
861   nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
862   return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
863                      aCountWritten);
864 }
865 
866 nsresult
NS_FillArray(FallibleTArray<char> & aDest,nsIInputStream * aInput,uint32_t aKeep,uint32_t * aNewBytes)867 NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
868              uint32_t aKeep, uint32_t* aNewBytes)
869 {
870   MOZ_ASSERT(aInput, "null stream");
871   MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
872 
873   char* aBuffer = aDest.Elements();
874   int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
875   if (aKeep != 0 && keepOffset > 0) {
876     memmove(aBuffer, aBuffer + keepOffset, aKeep);
877   }
878 
879   nsresult rv =
880     aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
881   if (NS_FAILED(rv)) {
882     *aNewBytes = 0;
883   }
884   // NOTE: we rely on the fact that the new slots are NOT initialized by
885   // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
886   // in nsTArray.h:
887   aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
888 
889   MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
890   return rv;
891 }
892 
893 bool
NS_InputStreamIsCloneable(nsIInputStream * aSource)894 NS_InputStreamIsCloneable(nsIInputStream* aSource)
895 {
896   if (!aSource) {
897     return false;
898   }
899 
900   nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
901   return cloneable && cloneable->GetCloneable();
902 }
903 
904 nsresult
NS_CloneInputStream(nsIInputStream * aSource,nsIInputStream ** aCloneOut,nsIInputStream ** aReplacementOut)905 NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
906                     nsIInputStream** aReplacementOut)
907 {
908   if (NS_WARN_IF(!aSource)) {
909     return NS_ERROR_FAILURE;
910   }
911 
912   // Attempt to perform the clone directly on the source stream
913   nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
914   if (cloneable && cloneable->GetCloneable()) {
915     if (aReplacementOut) {
916       *aReplacementOut = nullptr;
917     }
918     return cloneable->Clone(aCloneOut);
919   }
920 
921   // If we failed the clone and the caller does not want to replace their
922   // original stream, then we are done.  Return error.
923   if (!aReplacementOut) {
924     return NS_ERROR_FAILURE;
925   }
926 
927   // The caller has opted-in to the fallback clone support that replaces
928   // the original stream.  Copy the data to a pipe and return two cloned
929   // input streams.
930 
931   nsCOMPtr<nsIInputStream> reader;
932   nsCOMPtr<nsIInputStream> readerClone;
933   nsCOMPtr<nsIOutputStream> writer;
934 
935   nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
936                            0, 0,        // default segment size and max size
937                            true, true); // non-blocking
938   if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
939 
940   cloneable = do_QueryInterface(reader);
941   MOZ_ASSERT(cloneable && cloneable->GetCloneable());
942 
943   rv = cloneable->Clone(getter_AddRefs(readerClone));
944   if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
945 
946   nsCOMPtr<nsIEventTarget> target =
947     do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
948   if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
949 
950   rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
951   if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
952 
953   readerClone.forget(aCloneOut);
954   reader.forget(aReplacementOut);
955 
956   return NS_OK;
957 }
958