1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "nsStreamUtils.h"
10 #include "nsAutoPtr.h"
11 #include "nsCOMPtr.h"
12 #include "nsIPipe.h"
13 #include "nsICloneableInputStream.h"
14 #include "nsIEventTarget.h"
15 #include "nsICancelableRunnable.h"
16 #include "nsISafeOutputStream.h"
17 #include "nsString.h"
18 #include "nsIAsyncInputStream.h"
19 #include "nsIAsyncOutputStream.h"
20 #include "nsIBufferedStreams.h"
21 #include "nsNetCID.h"
22 #include "nsServiceManagerUtils.h"
23 #include "nsThreadUtils.h"
24
25 using namespace mozilla;
26
27 //-----------------------------------------------------------------------------
28
29 // This is a nsICancelableRunnable because we can dispatch it to Workers and
30 // those can be shut down at any time, and in these cases, Cancel() is called
31 // instead of Run().
32 class nsInputStreamReadyEvent final
33 : public CancelableRunnable
34 , public nsIInputStreamCallback
35 {
36 public:
37 NS_DECL_ISUPPORTS_INHERITED
38
nsInputStreamReadyEvent(nsIInputStreamCallback * aCallback,nsIEventTarget * aTarget)39 nsInputStreamReadyEvent(nsIInputStreamCallback* aCallback,
40 nsIEventTarget* aTarget)
41 : mCallback(aCallback)
42 , mTarget(aTarget)
43 {
44 }
45
46 private:
~nsInputStreamReadyEvent()47 ~nsInputStreamReadyEvent()
48 {
49 if (!mCallback) {
50 return;
51 }
52 //
53 // whoa!! looks like we never posted this event. take care to
54 // release mCallback on the correct thread. if mTarget lives on the
55 // calling thread, then we are ok. otherwise, we have to try to
56 // proxy the Release over the right thread. if that thread is dead,
57 // then there's nothing we can do... better to leak than crash.
58 //
59 bool val;
60 nsresult rv = mTarget->IsOnCurrentThread(&val);
61 if (NS_FAILED(rv) || !val) {
62 nsCOMPtr<nsIInputStreamCallback> event =
63 NS_NewInputStreamReadyEvent(mCallback, mTarget);
64 mCallback = nullptr;
65 if (event) {
66 rv = event->OnInputStreamReady(nullptr);
67 if (NS_FAILED(rv)) {
68 NS_NOTREACHED("leaking stream event");
69 nsISupports* sup = event;
70 NS_ADDREF(sup);
71 }
72 }
73 }
74 }
75
76 public:
OnInputStreamReady(nsIAsyncInputStream * aStream)77 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override
78 {
79 mStream = aStream;
80
81 nsresult rv =
82 mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
83 if (NS_FAILED(rv)) {
84 NS_WARNING("Dispatch failed");
85 return NS_ERROR_FAILURE;
86 }
87
88 return NS_OK;
89 }
90
Run()91 NS_IMETHOD Run() override
92 {
93 if (mCallback) {
94 if (mStream) {
95 mCallback->OnInputStreamReady(mStream);
96 }
97 mCallback = nullptr;
98 }
99 return NS_OK;
100 }
101
Cancel()102 nsresult Cancel() override
103 {
104 mCallback = nullptr;
105 return NS_OK;
106 }
107
108 private:
109 nsCOMPtr<nsIAsyncInputStream> mStream;
110 nsCOMPtr<nsIInputStreamCallback> mCallback;
111 nsCOMPtr<nsIEventTarget> mTarget;
112 };
113
114 NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
115 nsIInputStreamCallback)
116
117 //-----------------------------------------------------------------------------
118
119 // This is a nsICancelableRunnable because we can dispatch it to Workers and
120 // those can be shut down at any time, and in these cases, Cancel() is called
121 // instead of Run().
122 class nsOutputStreamReadyEvent final
123 : public CancelableRunnable
124 , public nsIOutputStreamCallback
125 {
126 public:
127 NS_DECL_ISUPPORTS_INHERITED
128
nsOutputStreamReadyEvent(nsIOutputStreamCallback * aCallback,nsIEventTarget * aTarget)129 nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
130 nsIEventTarget* aTarget)
131 : mCallback(aCallback)
132 , mTarget(aTarget)
133 {
134 }
135
136 private:
~nsOutputStreamReadyEvent()137 ~nsOutputStreamReadyEvent()
138 {
139 if (!mCallback) {
140 return;
141 }
142 //
143 // whoa!! looks like we never posted this event. take care to
144 // release mCallback on the correct thread. if mTarget lives on the
145 // calling thread, then we are ok. otherwise, we have to try to
146 // proxy the Release over the right thread. if that thread is dead,
147 // then there's nothing we can do... better to leak than crash.
148 //
149 bool val;
150 nsresult rv = mTarget->IsOnCurrentThread(&val);
151 if (NS_FAILED(rv) || !val) {
152 nsCOMPtr<nsIOutputStreamCallback> event =
153 NS_NewOutputStreamReadyEvent(mCallback, mTarget);
154 mCallback = nullptr;
155 if (event) {
156 rv = event->OnOutputStreamReady(nullptr);
157 if (NS_FAILED(rv)) {
158 NS_NOTREACHED("leaking stream event");
159 nsISupports* sup = event;
160 NS_ADDREF(sup);
161 }
162 }
163 }
164 }
165
166 public:
OnOutputStreamReady(nsIAsyncOutputStream * aStream)167 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override
168 {
169 mStream = aStream;
170
171 nsresult rv =
172 mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
173 if (NS_FAILED(rv)) {
174 NS_WARNING("PostEvent failed");
175 return NS_ERROR_FAILURE;
176 }
177
178 return NS_OK;
179 }
180
Run()181 NS_IMETHOD Run() override
182 {
183 if (mCallback) {
184 if (mStream) {
185 mCallback->OnOutputStreamReady(mStream);
186 }
187 mCallback = nullptr;
188 }
189 return NS_OK;
190 }
191
Cancel()192 nsresult Cancel() override
193 {
194 mCallback = nullptr;
195 return NS_OK;
196 }
197
198 private:
199 nsCOMPtr<nsIAsyncOutputStream> mStream;
200 nsCOMPtr<nsIOutputStreamCallback> mCallback;
201 nsCOMPtr<nsIEventTarget> mTarget;
202 };
203
NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent,CancelableRunnable,nsIOutputStreamCallback)204 NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
205 nsIOutputStreamCallback)
206
207 //-----------------------------------------------------------------------------
208
209 already_AddRefed<nsIInputStreamCallback>
210 NS_NewInputStreamReadyEvent(nsIInputStreamCallback* aCallback,
211 nsIEventTarget* aTarget)
212 {
213 NS_ASSERTION(aCallback, "null callback");
214 NS_ASSERTION(aTarget, "null target");
215 RefPtr<nsInputStreamReadyEvent> ev =
216 new nsInputStreamReadyEvent(aCallback, aTarget);
217 return ev.forget();
218 }
219
220 already_AddRefed<nsIOutputStreamCallback>
NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback * aCallback,nsIEventTarget * aTarget)221 NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
222 nsIEventTarget* aTarget)
223 {
224 NS_ASSERTION(aCallback, "null callback");
225 NS_ASSERTION(aTarget, "null target");
226 RefPtr<nsOutputStreamReadyEvent> ev =
227 new nsOutputStreamReadyEvent(aCallback, aTarget);
228 return ev.forget();
229 }
230
231 //-----------------------------------------------------------------------------
232 // NS_AsyncCopy implementation
233
234 // abstract stream copier...
235 class nsAStreamCopier
236 : public nsIInputStreamCallback
237 , public nsIOutputStreamCallback
238 , public CancelableRunnable
239 {
240 public:
241 NS_DECL_ISUPPORTS_INHERITED
242
nsAStreamCopier()243 nsAStreamCopier()
244 : mLock("nsAStreamCopier.mLock")
245 , mCallback(nullptr)
246 , mProgressCallback(nullptr)
247 , mClosure(nullptr)
248 , mChunkSize(0)
249 , mEventInProcess(false)
250 , mEventIsPending(false)
251 , mCloseSource(true)
252 , mCloseSink(true)
253 , mCanceled(false)
254 , mCancelStatus(NS_OK)
255 {
256 }
257
258 // kick off the async copy...
Start(nsIInputStream * aSource,nsIOutputStream * aSink,nsIEventTarget * aTarget,nsAsyncCopyCallbackFun aCallback,void * aClosure,uint32_t aChunksize,bool aCloseSource,bool aCloseSink,nsAsyncCopyProgressFun aProgressCallback)259 nsresult Start(nsIInputStream* aSource,
260 nsIOutputStream* aSink,
261 nsIEventTarget* aTarget,
262 nsAsyncCopyCallbackFun aCallback,
263 void* aClosure,
264 uint32_t aChunksize,
265 bool aCloseSource,
266 bool aCloseSink,
267 nsAsyncCopyProgressFun aProgressCallback)
268 {
269 mSource = aSource;
270 mSink = aSink;
271 mTarget = aTarget;
272 mCallback = aCallback;
273 mClosure = aClosure;
274 mChunkSize = aChunksize;
275 mCloseSource = aCloseSource;
276 mCloseSink = aCloseSink;
277 mProgressCallback = aProgressCallback;
278
279 mAsyncSource = do_QueryInterface(mSource);
280 mAsyncSink = do_QueryInterface(mSink);
281
282 return PostContinuationEvent();
283 }
284
285 // implemented by subclasses, returns number of bytes copied and
286 // sets source and sink condition before returning.
287 virtual uint32_t DoCopy(nsresult* aSourceCondition,
288 nsresult* aSinkCondition) = 0;
289
Process()290 void Process()
291 {
292 if (!mSource || !mSink) {
293 return;
294 }
295
296 nsresult cancelStatus;
297 bool canceled;
298 {
299 MutexAutoLock lock(mLock);
300 canceled = mCanceled;
301 cancelStatus = mCancelStatus;
302 }
303
304 // If the copy was canceled before Process() was even called, then
305 // sourceCondition and sinkCondition should be set to error results to
306 // ensure we don't call Finish() on a canceled nsISafeOutputStream.
307 MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
308 nsresult sourceCondition = cancelStatus;
309 nsresult sinkCondition = cancelStatus;
310
311 // Copy data from the source to the sink until we hit failure or have
312 // copied all the data.
313 for (;;) {
314 // Note: copyFailed will be true if the source or the sink have
315 // reported an error, or if we failed to write any bytes
316 // because we have consumed all of our data.
317 bool copyFailed = false;
318 if (!canceled) {
319 uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
320 if (n > 0 && mProgressCallback) {
321 mProgressCallback(mClosure, n);
322 }
323 copyFailed = NS_FAILED(sourceCondition) ||
324 NS_FAILED(sinkCondition) || n == 0;
325
326 MutexAutoLock lock(mLock);
327 canceled = mCanceled;
328 cancelStatus = mCancelStatus;
329 }
330 if (copyFailed && !canceled) {
331 if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
332 // need to wait for more data from source. while waiting for
333 // more source data, be sure to observe failures on output end.
334 mAsyncSource->AsyncWait(this, 0, 0, nullptr);
335
336 if (mAsyncSink)
337 mAsyncSink->AsyncWait(this,
338 nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
339 0, nullptr);
340 break;
341 } else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
342 // need to wait for more room in the sink. while waiting for
343 // more room in the sink, be sure to observer failures on the
344 // input end.
345 mAsyncSink->AsyncWait(this, 0, 0, nullptr);
346
347 if (mAsyncSource)
348 mAsyncSource->AsyncWait(this,
349 nsIAsyncInputStream::WAIT_CLOSURE_ONLY,
350 0, nullptr);
351 break;
352 }
353 }
354 if (copyFailed || canceled) {
355 if (mCloseSource) {
356 // close source
357 if (mAsyncSource)
358 mAsyncSource->CloseWithStatus(
359 canceled ? cancelStatus : sinkCondition);
360 else {
361 mSource->Close();
362 }
363 }
364 mAsyncSource = nullptr;
365 mSource = nullptr;
366
367 if (mCloseSink) {
368 // close sink
369 if (mAsyncSink)
370 mAsyncSink->CloseWithStatus(
371 canceled ? cancelStatus : sourceCondition);
372 else {
373 // If we have an nsISafeOutputStream, and our
374 // sourceCondition and sinkCondition are not set to a
375 // failure state, finish writing.
376 nsCOMPtr<nsISafeOutputStream> sostream =
377 do_QueryInterface(mSink);
378 if (sostream && NS_SUCCEEDED(sourceCondition) &&
379 NS_SUCCEEDED(sinkCondition)) {
380 sostream->Finish();
381 } else {
382 mSink->Close();
383 }
384 }
385 }
386 mAsyncSink = nullptr;
387 mSink = nullptr;
388
389 // notify state complete...
390 if (mCallback) {
391 nsresult status;
392 if (!canceled) {
393 status = sourceCondition;
394 if (NS_SUCCEEDED(status)) {
395 status = sinkCondition;
396 }
397 if (status == NS_BASE_STREAM_CLOSED) {
398 status = NS_OK;
399 }
400 } else {
401 status = cancelStatus;
402 }
403 mCallback(mClosure, status);
404 }
405 break;
406 }
407 }
408 }
409
Cancel(nsresult aReason)410 nsresult Cancel(nsresult aReason)
411 {
412 MutexAutoLock lock(mLock);
413 if (mCanceled) {
414 return NS_ERROR_FAILURE;
415 }
416
417 if (NS_SUCCEEDED(aReason)) {
418 NS_WARNING("cancel with non-failure status code");
419 aReason = NS_BASE_STREAM_CLOSED;
420 }
421
422 mCanceled = true;
423 mCancelStatus = aReason;
424 return NS_OK;
425 }
426
OnInputStreamReady(nsIAsyncInputStream * aSource)427 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override
428 {
429 PostContinuationEvent();
430 return NS_OK;
431 }
432
OnOutputStreamReady(nsIAsyncOutputStream * aSink)433 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override
434 {
435 PostContinuationEvent();
436 return NS_OK;
437 }
438
439 // continuation event handler
Run()440 NS_IMETHOD Run() override
441 {
442 Process();
443
444 // clear "in process" flag and post any pending continuation event
445 MutexAutoLock lock(mLock);
446 mEventInProcess = false;
447 if (mEventIsPending) {
448 mEventIsPending = false;
449 PostContinuationEvent_Locked();
450 }
451
452 return NS_OK;
453 }
454
455 nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
456
PostContinuationEvent()457 nsresult PostContinuationEvent()
458 {
459 // we cannot post a continuation event if there is currently
460 // an event in process. doing so could result in Process being
461 // run simultaneously on multiple threads, so we mark the event
462 // as pending, and if an event is already in process then we
463 // just let that existing event take care of posting the real
464 // continuation event.
465
466 MutexAutoLock lock(mLock);
467 return PostContinuationEvent_Locked();
468 }
469
PostContinuationEvent_Locked()470 nsresult PostContinuationEvent_Locked()
471 {
472 nsresult rv = NS_OK;
473 if (mEventInProcess) {
474 mEventIsPending = true;
475 } else {
476 rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
477 if (NS_SUCCEEDED(rv)) {
478 mEventInProcess = true;
479 } else {
480 NS_WARNING("unable to post continuation event");
481 }
482 }
483 return rv;
484 }
485
486 protected:
487 nsCOMPtr<nsIInputStream> mSource;
488 nsCOMPtr<nsIOutputStream> mSink;
489 nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
490 nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
491 nsCOMPtr<nsIEventTarget> mTarget;
492 Mutex mLock;
493 nsAsyncCopyCallbackFun mCallback;
494 nsAsyncCopyProgressFun mProgressCallback;
495 void* mClosure;
496 uint32_t mChunkSize;
497 bool mEventInProcess;
498 bool mEventIsPending;
499 bool mCloseSource;
500 bool mCloseSink;
501 bool mCanceled;
502 nsresult mCancelStatus;
503
504 // virtual since subclasses call superclass Release()
~nsAStreamCopier()505 virtual ~nsAStreamCopier()
506 {
507 }
508 };
509
510 NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier,
511 CancelableRunnable,
512 nsIInputStreamCallback,
513 nsIOutputStreamCallback)
514
515 class nsStreamCopierIB final : public nsAStreamCopier
516 {
517 public:
nsStreamCopierIB()518 nsStreamCopierIB() : nsAStreamCopier()
519 {
520 }
~nsStreamCopierIB()521 virtual ~nsStreamCopierIB()
522 {
523 }
524
525 struct MOZ_STACK_CLASS ReadSegmentsState
526 {
527 // the nsIOutputStream will outlive the ReadSegmentsState on the stack
528 nsIOutputStream* MOZ_NON_OWNING_REF mSink;
529 nsresult mSinkCondition;
530 };
531
ConsumeInputBuffer(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)532 static nsresult ConsumeInputBuffer(nsIInputStream* aInStr,
533 void* aClosure,
534 const char* aBuffer,
535 uint32_t aOffset,
536 uint32_t aCount,
537 uint32_t* aCountWritten)
538 {
539 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
540
541 nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
542 if (NS_FAILED(rv)) {
543 state->mSinkCondition = rv;
544 } else if (*aCountWritten == 0) {
545 state->mSinkCondition = NS_BASE_STREAM_CLOSED;
546 }
547
548 return state->mSinkCondition;
549 }
550
DoCopy(nsresult * aSourceCondition,nsresult * aSinkCondition)551 uint32_t DoCopy(nsresult* aSourceCondition,
552 nsresult* aSinkCondition) override
553 {
554 ReadSegmentsState state;
555 state.mSink = mSink;
556 state.mSinkCondition = NS_OK;
557
558 uint32_t n;
559 *aSourceCondition =
560 mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
561 *aSinkCondition = state.mSinkCondition;
562 return n;
563 }
564
Cancel()565 nsresult Cancel() override
566 {
567 return NS_OK;
568 }
569 };
570
571 class nsStreamCopierOB final : public nsAStreamCopier
572 {
573 public:
nsStreamCopierOB()574 nsStreamCopierOB() : nsAStreamCopier()
575 {
576 }
~nsStreamCopierOB()577 virtual ~nsStreamCopierOB()
578 {
579 }
580
581 struct MOZ_STACK_CLASS WriteSegmentsState
582 {
583 // the nsIInputStream will outlive the WriteSegmentsState on the stack
584 nsIInputStream* MOZ_NON_OWNING_REF mSource;
585 nsresult mSourceCondition;
586 };
587
FillOutputBuffer(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)588 static nsresult FillOutputBuffer(nsIOutputStream* aOutStr,
589 void* aClosure,
590 char* aBuffer,
591 uint32_t aOffset,
592 uint32_t aCount,
593 uint32_t* aCountRead)
594 {
595 WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
596
597 nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
598 if (NS_FAILED(rv)) {
599 state->mSourceCondition = rv;
600 } else if (*aCountRead == 0) {
601 state->mSourceCondition = NS_BASE_STREAM_CLOSED;
602 }
603
604 return state->mSourceCondition;
605 }
606
DoCopy(nsresult * aSourceCondition,nsresult * aSinkCondition)607 uint32_t DoCopy(nsresult* aSourceCondition,
608 nsresult* aSinkCondition) override
609 {
610 WriteSegmentsState state;
611 state.mSource = mSource;
612 state.mSourceCondition = NS_OK;
613
614 uint32_t n;
615 *aSinkCondition =
616 mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
617 *aSourceCondition = state.mSourceCondition;
618 return n;
619 }
620
Cancel()621 nsresult Cancel() override
622 {
623 return NS_OK;
624 }
625 };
626
627 //-----------------------------------------------------------------------------
628
629 nsresult
NS_AsyncCopy(nsIInputStream * aSource,nsIOutputStream * aSink,nsIEventTarget * aTarget,nsAsyncCopyMode aMode,uint32_t aChunkSize,nsAsyncCopyCallbackFun aCallback,void * aClosure,bool aCloseSource,bool aCloseSink,nsISupports ** aCopierCtx,nsAsyncCopyProgressFun aProgressCallback)630 NS_AsyncCopy(nsIInputStream* aSource,
631 nsIOutputStream* aSink,
632 nsIEventTarget* aTarget,
633 nsAsyncCopyMode aMode,
634 uint32_t aChunkSize,
635 nsAsyncCopyCallbackFun aCallback,
636 void* aClosure,
637 bool aCloseSource,
638 bool aCloseSink,
639 nsISupports** aCopierCtx,
640 nsAsyncCopyProgressFun aProgressCallback)
641 {
642 NS_ASSERTION(aTarget, "non-null target required");
643
644 nsresult rv;
645 nsAStreamCopier* copier;
646
647 if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
648 copier = new nsStreamCopierIB();
649 } else {
650 copier = new nsStreamCopierOB();
651 }
652
653 // Start() takes an owning ref to the copier...
654 NS_ADDREF(copier);
655 rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
656 aCloseSource, aCloseSink, aProgressCallback);
657
658 if (aCopierCtx) {
659 *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
660 NS_ADDREF(*aCopierCtx);
661 }
662 NS_RELEASE(copier);
663
664 return rv;
665 }
666
667 //-----------------------------------------------------------------------------
668
669 nsresult
NS_CancelAsyncCopy(nsISupports * aCopierCtx,nsresult aReason)670 NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason)
671 {
672 nsAStreamCopier* copier =
673 static_cast<nsAStreamCopier*>(static_cast<nsIRunnable *>(aCopierCtx));
674 return copier->Cancel(aReason);
675 }
676
677 //-----------------------------------------------------------------------------
678
679 nsresult
NS_ConsumeStream(nsIInputStream * aStream,uint32_t aMaxCount,nsACString & aResult)680 NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
681 nsACString& aResult)
682 {
683 nsresult rv = NS_OK;
684 aResult.Truncate();
685
686 while (aMaxCount) {
687 uint64_t avail64;
688 rv = aStream->Available(&avail64);
689 if (NS_FAILED(rv)) {
690 if (rv == NS_BASE_STREAM_CLOSED) {
691 rv = NS_OK;
692 }
693 break;
694 }
695 if (avail64 == 0) {
696 break;
697 }
698
699 uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
700
701 // resize aResult buffer
702 uint32_t length = aResult.Length();
703 if (avail > UINT32_MAX - length) {
704 return NS_ERROR_FILE_TOO_BIG;
705 }
706
707 aResult.SetLength(length + avail);
708 if (aResult.Length() != (length + avail)) {
709 return NS_ERROR_OUT_OF_MEMORY;
710 }
711 char* buf = aResult.BeginWriting() + length;
712
713 uint32_t n;
714 rv = aStream->Read(buf, avail, &n);
715 if (NS_FAILED(rv)) {
716 break;
717 }
718 if (n != avail) {
719 aResult.SetLength(length + n);
720 }
721 if (n == 0) {
722 break;
723 }
724 aMaxCount -= n;
725 }
726
727 return rv;
728 }
729
730 //-----------------------------------------------------------------------------
731
732 static nsresult
TestInputStream(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)733 TestInputStream(nsIInputStream* aInStr,
734 void* aClosure,
735 const char* aBuffer,
736 uint32_t aOffset,
737 uint32_t aCount,
738 uint32_t* aCountWritten)
739 {
740 bool* result = static_cast<bool*>(aClosure);
741 *result = true;
742 return NS_ERROR_ABORT; // don't call me anymore
743 }
744
745 bool
NS_InputStreamIsBuffered(nsIInputStream * aStream)746 NS_InputStreamIsBuffered(nsIInputStream* aStream)
747 {
748 nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
749 if (bufferedIn) {
750 return true;
751 }
752
753 bool result = false;
754 uint32_t n;
755 nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
756 return result || NS_SUCCEEDED(rv);
757 }
758
759 static nsresult
TestOutputStream(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)760 TestOutputStream(nsIOutputStream* aOutStr,
761 void* aClosure,
762 char* aBuffer,
763 uint32_t aOffset,
764 uint32_t aCount,
765 uint32_t* aCountRead)
766 {
767 bool* result = static_cast<bool*>(aClosure);
768 *result = true;
769 return NS_ERROR_ABORT; // don't call me anymore
770 }
771
772 bool
NS_OutputStreamIsBuffered(nsIOutputStream * aStream)773 NS_OutputStreamIsBuffered(nsIOutputStream* aStream)
774 {
775 nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
776 if (bufferedOut) {
777 return true;
778 }
779
780 bool result = false;
781 uint32_t n;
782 aStream->WriteSegments(TestOutputStream, &result, 1, &n);
783 return result;
784 }
785
786 //-----------------------------------------------------------------------------
787
788 nsresult
NS_CopySegmentToStream(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)789 NS_CopySegmentToStream(nsIInputStream* aInStr,
790 void* aClosure,
791 const char* aBuffer,
792 uint32_t aOffset,
793 uint32_t aCount,
794 uint32_t* aCountWritten)
795 {
796 nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
797 *aCountWritten = 0;
798 while (aCount) {
799 uint32_t n;
800 nsresult rv = outStr->Write(aBuffer, aCount, &n);
801 if (NS_FAILED(rv)) {
802 return rv;
803 }
804 aBuffer += n;
805 aCount -= n;
806 *aCountWritten += n;
807 }
808 return NS_OK;
809 }
810
811 nsresult
NS_CopySegmentToBuffer(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)812 NS_CopySegmentToBuffer(nsIInputStream* aInStr,
813 void* aClosure,
814 const char* aBuffer,
815 uint32_t aOffset,
816 uint32_t aCount,
817 uint32_t* aCountWritten)
818 {
819 char* toBuf = static_cast<char*>(aClosure);
820 memcpy(&toBuf[aOffset], aBuffer, aCount);
821 *aCountWritten = aCount;
822 return NS_OK;
823 }
824
825 nsresult
NS_CopySegmentToBuffer(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)826 NS_CopySegmentToBuffer(nsIOutputStream* aOutStr,
827 void* aClosure,
828 char* aBuffer,
829 uint32_t aOffset,
830 uint32_t aCount,
831 uint32_t* aCountRead)
832 {
833 const char* fromBuf = static_cast<const char*>(aClosure);
834 memcpy(aBuffer, &fromBuf[aOffset], aCount);
835 *aCountRead = aCount;
836 return NS_OK;
837 }
838
839 nsresult
NS_DiscardSegment(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)840 NS_DiscardSegment(nsIInputStream* aInStr,
841 void* aClosure,
842 const char* aBuffer,
843 uint32_t aOffset,
844 uint32_t aCount,
845 uint32_t* aCountWritten)
846 {
847 *aCountWritten = aCount;
848 return NS_OK;
849 }
850
851 //-----------------------------------------------------------------------------
852
853 nsresult
NS_WriteSegmentThunk(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)854 NS_WriteSegmentThunk(nsIInputStream* aInStr,
855 void* aClosure,
856 const char* aBuffer,
857 uint32_t aOffset,
858 uint32_t aCount,
859 uint32_t* aCountWritten)
860 {
861 nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
862 return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
863 aCountWritten);
864 }
865
866 nsresult
NS_FillArray(FallibleTArray<char> & aDest,nsIInputStream * aInput,uint32_t aKeep,uint32_t * aNewBytes)867 NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
868 uint32_t aKeep, uint32_t* aNewBytes)
869 {
870 MOZ_ASSERT(aInput, "null stream");
871 MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
872
873 char* aBuffer = aDest.Elements();
874 int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
875 if (aKeep != 0 && keepOffset > 0) {
876 memmove(aBuffer, aBuffer + keepOffset, aKeep);
877 }
878
879 nsresult rv =
880 aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
881 if (NS_FAILED(rv)) {
882 *aNewBytes = 0;
883 }
884 // NOTE: we rely on the fact that the new slots are NOT initialized by
885 // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
886 // in nsTArray.h:
887 aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
888
889 MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
890 return rv;
891 }
892
893 bool
NS_InputStreamIsCloneable(nsIInputStream * aSource)894 NS_InputStreamIsCloneable(nsIInputStream* aSource)
895 {
896 if (!aSource) {
897 return false;
898 }
899
900 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
901 return cloneable && cloneable->GetCloneable();
902 }
903
904 nsresult
NS_CloneInputStream(nsIInputStream * aSource,nsIInputStream ** aCloneOut,nsIInputStream ** aReplacementOut)905 NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
906 nsIInputStream** aReplacementOut)
907 {
908 if (NS_WARN_IF(!aSource)) {
909 return NS_ERROR_FAILURE;
910 }
911
912 // Attempt to perform the clone directly on the source stream
913 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
914 if (cloneable && cloneable->GetCloneable()) {
915 if (aReplacementOut) {
916 *aReplacementOut = nullptr;
917 }
918 return cloneable->Clone(aCloneOut);
919 }
920
921 // If we failed the clone and the caller does not want to replace their
922 // original stream, then we are done. Return error.
923 if (!aReplacementOut) {
924 return NS_ERROR_FAILURE;
925 }
926
927 // The caller has opted-in to the fallback clone support that replaces
928 // the original stream. Copy the data to a pipe and return two cloned
929 // input streams.
930
931 nsCOMPtr<nsIInputStream> reader;
932 nsCOMPtr<nsIInputStream> readerClone;
933 nsCOMPtr<nsIOutputStream> writer;
934
935 nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
936 0, 0, // default segment size and max size
937 true, true); // non-blocking
938 if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
939
940 cloneable = do_QueryInterface(reader);
941 MOZ_ASSERT(cloneable && cloneable->GetCloneable());
942
943 rv = cloneable->Clone(getter_AddRefs(readerClone));
944 if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
945
946 nsCOMPtr<nsIEventTarget> target =
947 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
948 if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
949
950 rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
951 if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
952
953 readerClone.forget(aCloneOut);
954 reader.forget(aReplacementOut);
955
956 return NS_OK;
957 }
958