1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "nsStreamUtils.h"
10 #include "nsCOMPtr.h"
11 #include "nsICloneableInputStream.h"
12 #include "nsIEventTarget.h"
13 #include "nsICancelableRunnable.h"
14 #include "nsISafeOutputStream.h"
15 #include "nsString.h"
16 #include "nsIAsyncInputStream.h"
17 #include "nsIAsyncOutputStream.h"
18 #include "nsIBufferedStreams.h"
19 #include "nsIPipe.h"
20 #include "nsNetCID.h"
21 #include "nsServiceManagerUtils.h"
22 #include "nsThreadUtils.h"
23 #include "nsITransport.h"
24 #include "nsIStreamTransportService.h"
25 #include "NonBlockingAsyncInputStream.h"
26
27 using namespace mozilla;
28
29 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
30
31 //-----------------------------------------------------------------------------
32
33 // This is a nsICancelableRunnable because we can dispatch it to Workers and
34 // those can be shut down at any time, and in these cases, Cancel() is called
35 // instead of Run().
36 class nsInputStreamReadyEvent final : public CancelableRunnable,
37 public nsIInputStreamCallback,
38 public nsIRunnablePriority {
39 public:
40 NS_DECL_ISUPPORTS_INHERITED
41
nsInputStreamReadyEvent(const char * aName,nsIInputStreamCallback * aCallback,nsIEventTarget * aTarget,uint32_t aPriority)42 nsInputStreamReadyEvent(const char* aName, nsIInputStreamCallback* aCallback,
43 nsIEventTarget* aTarget, uint32_t aPriority)
44 : CancelableRunnable(aName),
45 mCallback(aCallback),
46 mTarget(aTarget),
47 mPriority(aPriority) {}
48
49 private:
~nsInputStreamReadyEvent()50 ~nsInputStreamReadyEvent() {
51 if (!mCallback) {
52 return;
53 }
54 //
55 // whoa!! looks like we never posted this event. take care to
56 // release mCallback on the correct thread. if mTarget lives on the
57 // calling thread, then we are ok. otherwise, we have to try to
58 // proxy the Release over the right thread. if that thread is dead,
59 // then there's nothing we can do... better to leak than crash.
60 //
61 bool val;
62 nsresult rv = mTarget->IsOnCurrentThread(&val);
63 if (NS_FAILED(rv) || !val) {
64 nsCOMPtr<nsIInputStreamCallback> event = NS_NewInputStreamReadyEvent(
65 "~nsInputStreamReadyEvent", mCallback, mTarget, mPriority);
66 mCallback = nullptr;
67 if (event) {
68 rv = event->OnInputStreamReady(nullptr);
69 if (NS_FAILED(rv)) {
70 MOZ_ASSERT_UNREACHABLE("leaking stream event");
71 nsISupports* sup = event;
72 NS_ADDREF(sup);
73 }
74 }
75 }
76 }
77
78 public:
OnInputStreamReady(nsIAsyncInputStream * aStream)79 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override {
80 mStream = aStream;
81
82 nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
83 if (NS_FAILED(rv)) {
84 NS_WARNING("Dispatch failed");
85 return NS_ERROR_FAILURE;
86 }
87
88 return NS_OK;
89 }
90
Run()91 NS_IMETHOD Run() override {
92 if (mCallback) {
93 if (mStream) {
94 mCallback->OnInputStreamReady(mStream);
95 }
96 mCallback = nullptr;
97 }
98 return NS_OK;
99 }
100
Cancel()101 nsresult Cancel() override {
102 mCallback = nullptr;
103 return NS_OK;
104 }
105
GetPriority(uint32_t * aPriority)106 NS_IMETHOD GetPriority(uint32_t* aPriority) override {
107 *aPriority = mPriority;
108 return NS_OK;
109 }
110
111 private:
112 nsCOMPtr<nsIAsyncInputStream> mStream;
113 nsCOMPtr<nsIInputStreamCallback> mCallback;
114 nsCOMPtr<nsIEventTarget> mTarget;
115 uint32_t mPriority;
116 };
117
118 NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
119 nsIInputStreamCallback, nsIRunnablePriority)
120
121 //-----------------------------------------------------------------------------
122
123 // This is a nsICancelableRunnable because we can dispatch it to Workers and
124 // those can be shut down at any time, and in these cases, Cancel() is called
125 // instead of Run().
126 class nsOutputStreamReadyEvent final : public CancelableRunnable,
127 public nsIOutputStreamCallback {
128 public:
129 NS_DECL_ISUPPORTS_INHERITED
130
nsOutputStreamReadyEvent(nsIOutputStreamCallback * aCallback,nsIEventTarget * aTarget)131 nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
132 nsIEventTarget* aTarget)
133 : CancelableRunnable("nsOutputStreamReadyEvent"),
134 mCallback(aCallback),
135 mTarget(aTarget) {}
136
137 private:
~nsOutputStreamReadyEvent()138 ~nsOutputStreamReadyEvent() {
139 if (!mCallback) {
140 return;
141 }
142 //
143 // whoa!! looks like we never posted this event. take care to
144 // release mCallback on the correct thread. if mTarget lives on the
145 // calling thread, then we are ok. otherwise, we have to try to
146 // proxy the Release over the right thread. if that thread is dead,
147 // then there's nothing we can do... better to leak than crash.
148 //
149 bool val;
150 nsresult rv = mTarget->IsOnCurrentThread(&val);
151 if (NS_FAILED(rv) || !val) {
152 nsCOMPtr<nsIOutputStreamCallback> event =
153 NS_NewOutputStreamReadyEvent(mCallback, mTarget);
154 mCallback = nullptr;
155 if (event) {
156 rv = event->OnOutputStreamReady(nullptr);
157 if (NS_FAILED(rv)) {
158 MOZ_ASSERT_UNREACHABLE("leaking stream event");
159 nsISupports* sup = event;
160 NS_ADDREF(sup);
161 }
162 }
163 }
164 }
165
166 public:
OnOutputStreamReady(nsIAsyncOutputStream * aStream)167 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override {
168 mStream = aStream;
169
170 nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
171 if (NS_FAILED(rv)) {
172 NS_WARNING("PostEvent failed");
173 return NS_ERROR_FAILURE;
174 }
175
176 return NS_OK;
177 }
178
Run()179 NS_IMETHOD Run() override {
180 if (mCallback) {
181 if (mStream) {
182 mCallback->OnOutputStreamReady(mStream);
183 }
184 mCallback = nullptr;
185 }
186 return NS_OK;
187 }
188
Cancel()189 nsresult Cancel() override {
190 mCallback = nullptr;
191 return NS_OK;
192 }
193
194 private:
195 nsCOMPtr<nsIAsyncOutputStream> mStream;
196 nsCOMPtr<nsIOutputStreamCallback> mCallback;
197 nsCOMPtr<nsIEventTarget> mTarget;
198 };
199
NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent,CancelableRunnable,nsIOutputStreamCallback)200 NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
201 nsIOutputStreamCallback)
202
203 //-----------------------------------------------------------------------------
204
205 already_AddRefed<nsIInputStreamCallback> NS_NewInputStreamReadyEvent(
206 const char* aName, nsIInputStreamCallback* aCallback,
207 nsIEventTarget* aTarget, uint32_t aPriority) {
208 NS_ASSERTION(aCallback, "null callback");
209 NS_ASSERTION(aTarget, "null target");
210 RefPtr<nsInputStreamReadyEvent> ev =
211 new nsInputStreamReadyEvent(aName, aCallback, aTarget, aPriority);
212 return ev.forget();
213 }
214
NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback * aCallback,nsIEventTarget * aTarget)215 already_AddRefed<nsIOutputStreamCallback> NS_NewOutputStreamReadyEvent(
216 nsIOutputStreamCallback* aCallback, nsIEventTarget* aTarget) {
217 NS_ASSERTION(aCallback, "null callback");
218 NS_ASSERTION(aTarget, "null target");
219 RefPtr<nsOutputStreamReadyEvent> ev =
220 new nsOutputStreamReadyEvent(aCallback, aTarget);
221 return ev.forget();
222 }
223
224 //-----------------------------------------------------------------------------
225 // NS_AsyncCopy implementation
226
227 // abstract stream copier...
228 class nsAStreamCopier : public nsIInputStreamCallback,
229 public nsIOutputStreamCallback,
230 public CancelableRunnable {
231 public:
232 NS_DECL_ISUPPORTS_INHERITED
233
nsAStreamCopier()234 nsAStreamCopier()
235 : CancelableRunnable("nsAStreamCopier"),
236 mLock("nsAStreamCopier.mLock"),
237 mCallback(nullptr),
238 mProgressCallback(nullptr),
239 mClosure(nullptr),
240 mChunkSize(0),
241 mEventInProcess(false),
242 mEventIsPending(false),
243 mCloseSource(true),
244 mCloseSink(true),
245 mCanceled(false),
246 mCancelStatus(NS_OK) {}
247
248 // kick off the async copy...
Start(nsIInputStream * aSource,nsIOutputStream * aSink,nsIEventTarget * aTarget,nsAsyncCopyCallbackFun aCallback,void * aClosure,uint32_t aChunksize,bool aCloseSource,bool aCloseSink,nsAsyncCopyProgressFun aProgressCallback)249 nsresult Start(nsIInputStream* aSource, nsIOutputStream* aSink,
250 nsIEventTarget* aTarget, nsAsyncCopyCallbackFun aCallback,
251 void* aClosure, uint32_t aChunksize, bool aCloseSource,
252 bool aCloseSink, nsAsyncCopyProgressFun aProgressCallback) {
253 mSource = aSource;
254 mSink = aSink;
255 mTarget = aTarget;
256 mCallback = aCallback;
257 mClosure = aClosure;
258 mChunkSize = aChunksize;
259 mCloseSource = aCloseSource;
260 mCloseSink = aCloseSink;
261 mProgressCallback = aProgressCallback;
262
263 mAsyncSource = do_QueryInterface(mSource);
264 mAsyncSink = do_QueryInterface(mSink);
265
266 return PostContinuationEvent();
267 }
268
269 // implemented by subclasses, returns number of bytes copied and
270 // sets source and sink condition before returning.
271 virtual uint32_t DoCopy(nsresult* aSourceCondition,
272 nsresult* aSinkCondition) = 0;
273
Process()274 void Process() {
275 if (!mSource || !mSink) {
276 return;
277 }
278
279 nsresult cancelStatus;
280 bool canceled;
281 {
282 MutexAutoLock lock(mLock);
283 canceled = mCanceled;
284 cancelStatus = mCancelStatus;
285 }
286
287 // If the copy was canceled before Process() was even called, then
288 // sourceCondition and sinkCondition should be set to error results to
289 // ensure we don't call Finish() on a canceled nsISafeOutputStream.
290 MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
291 nsresult sourceCondition = cancelStatus;
292 nsresult sinkCondition = cancelStatus;
293
294 // Copy data from the source to the sink until we hit failure or have
295 // copied all the data.
296 for (;;) {
297 // Note: copyFailed will be true if the source or the sink have
298 // reported an error, or if we failed to write any bytes
299 // because we have consumed all of our data.
300 bool copyFailed = false;
301 if (!canceled) {
302 uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
303 if (n > 0 && mProgressCallback) {
304 mProgressCallback(mClosure, n);
305 }
306 copyFailed =
307 NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0;
308
309 MutexAutoLock lock(mLock);
310 canceled = mCanceled;
311 cancelStatus = mCancelStatus;
312 }
313 if (copyFailed && !canceled) {
314 if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
315 // need to wait for more data from source. while waiting for
316 // more source data, be sure to observe failures on output end.
317 mAsyncSource->AsyncWait(this, 0, 0, nullptr);
318
319 if (mAsyncSink)
320 mAsyncSink->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
321 0, nullptr);
322 break;
323 } else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
324 // need to wait for more room in the sink. while waiting for
325 // more room in the sink, be sure to observer failures on the
326 // input end.
327 mAsyncSink->AsyncWait(this, 0, 0, nullptr);
328
329 if (mAsyncSource)
330 mAsyncSource->AsyncWait(
331 this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr);
332 break;
333 }
334 }
335 if (copyFailed || canceled) {
336 if (mCloseSource) {
337 // close source
338 if (mAsyncSource)
339 mAsyncSource->CloseWithStatus(canceled ? cancelStatus
340 : sinkCondition);
341 else {
342 mSource->Close();
343 }
344 }
345 mAsyncSource = nullptr;
346 mSource = nullptr;
347
348 if (mCloseSink) {
349 // close sink
350 if (mAsyncSink)
351 mAsyncSink->CloseWithStatus(canceled ? cancelStatus
352 : sourceCondition);
353 else {
354 // If we have an nsISafeOutputStream, and our
355 // sourceCondition and sinkCondition are not set to a
356 // failure state, finish writing.
357 nsCOMPtr<nsISafeOutputStream> sostream = do_QueryInterface(mSink);
358 if (sostream && NS_SUCCEEDED(sourceCondition) &&
359 NS_SUCCEEDED(sinkCondition)) {
360 sostream->Finish();
361 } else {
362 mSink->Close();
363 }
364 }
365 }
366 mAsyncSink = nullptr;
367 mSink = nullptr;
368
369 // notify state complete...
370 if (mCallback) {
371 nsresult status;
372 if (!canceled) {
373 status = sourceCondition;
374 if (NS_SUCCEEDED(status)) {
375 status = sinkCondition;
376 }
377 if (status == NS_BASE_STREAM_CLOSED) {
378 status = NS_OK;
379 }
380 } else {
381 status = cancelStatus;
382 }
383 mCallback(mClosure, status);
384 }
385 break;
386 }
387 }
388 }
389
Cancel(nsresult aReason)390 nsresult Cancel(nsresult aReason) {
391 MutexAutoLock lock(mLock);
392 if (mCanceled) {
393 return NS_ERROR_FAILURE;
394 }
395
396 if (NS_SUCCEEDED(aReason)) {
397 NS_WARNING("cancel with non-failure status code");
398 aReason = NS_BASE_STREAM_CLOSED;
399 }
400
401 mCanceled = true;
402 mCancelStatus = aReason;
403 return NS_OK;
404 }
405
OnInputStreamReady(nsIAsyncInputStream * aSource)406 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override {
407 PostContinuationEvent();
408 return NS_OK;
409 }
410
OnOutputStreamReady(nsIAsyncOutputStream * aSink)411 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override {
412 PostContinuationEvent();
413 return NS_OK;
414 }
415
416 // continuation event handler
Run()417 NS_IMETHOD Run() override {
418 Process();
419
420 // clear "in process" flag and post any pending continuation event
421 MutexAutoLock lock(mLock);
422 mEventInProcess = false;
423 if (mEventIsPending) {
424 mEventIsPending = false;
425 PostContinuationEvent_Locked();
426 }
427
428 return NS_OK;
429 }
430
431 nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
432
PostContinuationEvent()433 nsresult PostContinuationEvent() {
434 // we cannot post a continuation event if there is currently
435 // an event in process. doing so could result in Process being
436 // run simultaneously on multiple threads, so we mark the event
437 // as pending, and if an event is already in process then we
438 // just let that existing event take care of posting the real
439 // continuation event.
440
441 MutexAutoLock lock(mLock);
442 return PostContinuationEvent_Locked();
443 }
444
PostContinuationEvent_Locked()445 nsresult PostContinuationEvent_Locked() {
446 nsresult rv = NS_OK;
447 if (mEventInProcess) {
448 mEventIsPending = true;
449 } else {
450 rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
451 if (NS_SUCCEEDED(rv)) {
452 mEventInProcess = true;
453 } else {
454 NS_WARNING("unable to post continuation event");
455 }
456 }
457 return rv;
458 }
459
460 protected:
461 nsCOMPtr<nsIInputStream> mSource;
462 nsCOMPtr<nsIOutputStream> mSink;
463 nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
464 nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
465 nsCOMPtr<nsIEventTarget> mTarget;
466 Mutex mLock;
467 nsAsyncCopyCallbackFun mCallback;
468 nsAsyncCopyProgressFun mProgressCallback;
469 void* mClosure;
470 uint32_t mChunkSize;
471 bool mEventInProcess;
472 bool mEventIsPending;
473 bool mCloseSource;
474 bool mCloseSink;
475 bool mCanceled;
476 nsresult mCancelStatus;
477
478 // virtual since subclasses call superclass Release()
479 virtual ~nsAStreamCopier() = default;
480 };
481
482 NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier, CancelableRunnable,
483 nsIInputStreamCallback, nsIOutputStreamCallback)
484
485 class nsStreamCopierIB final : public nsAStreamCopier {
486 public:
nsStreamCopierIB()487 nsStreamCopierIB() : nsAStreamCopier() {}
488 virtual ~nsStreamCopierIB() = default;
489
490 struct MOZ_STACK_CLASS ReadSegmentsState {
491 // the nsIOutputStream will outlive the ReadSegmentsState on the stack
492 nsIOutputStream* MOZ_NON_OWNING_REF mSink;
493 nsresult mSinkCondition;
494 };
495
ConsumeInputBuffer(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)496 static nsresult ConsumeInputBuffer(nsIInputStream* aInStr, void* aClosure,
497 const char* aBuffer, uint32_t aOffset,
498 uint32_t aCount, uint32_t* aCountWritten) {
499 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
500
501 nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
502 if (NS_FAILED(rv)) {
503 state->mSinkCondition = rv;
504 } else if (*aCountWritten == 0) {
505 state->mSinkCondition = NS_BASE_STREAM_CLOSED;
506 }
507
508 return state->mSinkCondition;
509 }
510
DoCopy(nsresult * aSourceCondition,nsresult * aSinkCondition)511 uint32_t DoCopy(nsresult* aSourceCondition,
512 nsresult* aSinkCondition) override {
513 ReadSegmentsState state;
514 state.mSink = mSink;
515 state.mSinkCondition = NS_OK;
516
517 uint32_t n;
518 *aSourceCondition =
519 mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
520 *aSinkCondition = state.mSinkCondition;
521 return n;
522 }
523
Cancel()524 nsresult Cancel() override { return NS_OK; }
525 };
526
527 class nsStreamCopierOB final : public nsAStreamCopier {
528 public:
nsStreamCopierOB()529 nsStreamCopierOB() : nsAStreamCopier() {}
530 virtual ~nsStreamCopierOB() = default;
531
532 struct MOZ_STACK_CLASS WriteSegmentsState {
533 // the nsIInputStream will outlive the WriteSegmentsState on the stack
534 nsIInputStream* MOZ_NON_OWNING_REF mSource;
535 nsresult mSourceCondition;
536 };
537
FillOutputBuffer(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)538 static nsresult FillOutputBuffer(nsIOutputStream* aOutStr, void* aClosure,
539 char* aBuffer, uint32_t aOffset,
540 uint32_t aCount, uint32_t* aCountRead) {
541 WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
542
543 nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
544 if (NS_FAILED(rv)) {
545 state->mSourceCondition = rv;
546 } else if (*aCountRead == 0) {
547 state->mSourceCondition = NS_BASE_STREAM_CLOSED;
548 }
549
550 return state->mSourceCondition;
551 }
552
DoCopy(nsresult * aSourceCondition,nsresult * aSinkCondition)553 uint32_t DoCopy(nsresult* aSourceCondition,
554 nsresult* aSinkCondition) override {
555 WriteSegmentsState state;
556 state.mSource = mSource;
557 state.mSourceCondition = NS_OK;
558
559 uint32_t n;
560 *aSinkCondition =
561 mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
562 *aSourceCondition = state.mSourceCondition;
563 return n;
564 }
565
Cancel()566 nsresult Cancel() override { return NS_OK; }
567 };
568
569 //-----------------------------------------------------------------------------
570
NS_AsyncCopy(nsIInputStream * aSource,nsIOutputStream * aSink,nsIEventTarget * aTarget,nsAsyncCopyMode aMode,uint32_t aChunkSize,nsAsyncCopyCallbackFun aCallback,void * aClosure,bool aCloseSource,bool aCloseSink,nsISupports ** aCopierCtx,nsAsyncCopyProgressFun aProgressCallback)571 nsresult NS_AsyncCopy(nsIInputStream* aSource, nsIOutputStream* aSink,
572 nsIEventTarget* aTarget, nsAsyncCopyMode aMode,
573 uint32_t aChunkSize, nsAsyncCopyCallbackFun aCallback,
574 void* aClosure, bool aCloseSource, bool aCloseSink,
575 nsISupports** aCopierCtx,
576 nsAsyncCopyProgressFun aProgressCallback) {
577 NS_ASSERTION(aTarget, "non-null target required");
578
579 nsresult rv;
580 nsAStreamCopier* copier;
581
582 if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
583 copier = new nsStreamCopierIB();
584 } else {
585 copier = new nsStreamCopierOB();
586 }
587
588 // Start() takes an owning ref to the copier...
589 NS_ADDREF(copier);
590 rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
591 aCloseSource, aCloseSink, aProgressCallback);
592
593 if (aCopierCtx) {
594 *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
595 NS_ADDREF(*aCopierCtx);
596 }
597 NS_RELEASE(copier);
598
599 return rv;
600 }
601
602 //-----------------------------------------------------------------------------
603
NS_CancelAsyncCopy(nsISupports * aCopierCtx,nsresult aReason)604 nsresult NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason) {
605 nsAStreamCopier* copier =
606 static_cast<nsAStreamCopier*>(static_cast<nsIRunnable*>(aCopierCtx));
607 return copier->Cancel(aReason);
608 }
609
610 //-----------------------------------------------------------------------------
611
612 namespace {
613 template <typename T>
614 struct ResultTraits {};
615
616 template <>
617 struct ResultTraits<nsACString> {
Clear__anon38b0e1200111::ResultTraits618 static void Clear(nsACString& aString) { aString.Truncate(); }
619
GetStorage__anon38b0e1200111::ResultTraits620 static char* GetStorage(nsACString& aString) {
621 return aString.BeginWriting();
622 }
623 };
624
625 template <>
626 struct ResultTraits<nsTArray<uint8_t>> {
Clear__anon38b0e1200111::ResultTraits627 static void Clear(nsTArray<uint8_t>& aArray) { aArray.Clear(); }
628
GetStorage__anon38b0e1200111::ResultTraits629 static char* GetStorage(nsTArray<uint8_t>& aArray) {
630 return reinterpret_cast<char*>(aArray.Elements());
631 }
632 };
633 } // namespace
634
635 template <typename T>
DoConsumeStream(nsIInputStream * aStream,uint32_t aMaxCount,T & aResult)636 nsresult DoConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
637 T& aResult) {
638 nsresult rv = NS_OK;
639 ResultTraits<T>::Clear(aResult);
640
641 while (aMaxCount) {
642 uint64_t avail64;
643 rv = aStream->Available(&avail64);
644 if (NS_FAILED(rv)) {
645 if (rv == NS_BASE_STREAM_CLOSED) {
646 rv = NS_OK;
647 }
648 break;
649 }
650 if (avail64 == 0) {
651 break;
652 }
653
654 uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
655
656 // resize aResult buffer
657 uint32_t length = aResult.Length();
658 CheckedInt<uint32_t> newLength = CheckedInt<uint32_t>(length) + avail;
659 if (!newLength.isValid()) {
660 return NS_ERROR_FILE_TOO_BIG;
661 }
662
663 if (!aResult.SetLength(newLength.value(), fallible)) {
664 return NS_ERROR_OUT_OF_MEMORY;
665 }
666 char* buf = ResultTraits<T>::GetStorage(aResult) + length;
667
668 uint32_t n;
669 rv = aStream->Read(buf, avail, &n);
670 if (NS_FAILED(rv)) {
671 break;
672 }
673 if (n != avail) {
674 MOZ_ASSERT(n < avail, "What happened there???");
675 aResult.SetLength(length + n);
676 }
677 if (n == 0) {
678 break;
679 }
680 aMaxCount -= n;
681 }
682
683 return rv;
684 }
685
NS_ConsumeStream(nsIInputStream * aStream,uint32_t aMaxCount,nsACString & aResult)686 nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
687 nsACString& aResult) {
688 return DoConsumeStream(aStream, aMaxCount, aResult);
689 }
690
NS_ConsumeStream(nsIInputStream * aStream,uint32_t aMaxCount,nsTArray<uint8_t> & aResult)691 nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
692 nsTArray<uint8_t>& aResult) {
693 return DoConsumeStream(aStream, aMaxCount, aResult);
694 }
695
696 //-----------------------------------------------------------------------------
697
TestInputStream(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)698 static nsresult TestInputStream(nsIInputStream* aInStr, void* aClosure,
699 const char* aBuffer, uint32_t aOffset,
700 uint32_t aCount, uint32_t* aCountWritten) {
701 bool* result = static_cast<bool*>(aClosure);
702 *result = true;
703 *aCountWritten = 0;
704 return NS_ERROR_ABORT; // don't call me anymore
705 }
706
NS_InputStreamIsBuffered(nsIInputStream * aStream)707 bool NS_InputStreamIsBuffered(nsIInputStream* aStream) {
708 nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
709 if (bufferedIn) {
710 return true;
711 }
712
713 bool result = false;
714 uint32_t n;
715 nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
716 return result || rv != NS_ERROR_NOT_IMPLEMENTED;
717 }
718
TestOutputStream(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)719 static nsresult TestOutputStream(nsIOutputStream* aOutStr, void* aClosure,
720 char* aBuffer, uint32_t aOffset,
721 uint32_t aCount, uint32_t* aCountRead) {
722 bool* result = static_cast<bool*>(aClosure);
723 *result = true;
724 *aCountRead = 0;
725 return NS_ERROR_ABORT; // don't call me anymore
726 }
727
NS_OutputStreamIsBuffered(nsIOutputStream * aStream)728 bool NS_OutputStreamIsBuffered(nsIOutputStream* aStream) {
729 nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
730 if (bufferedOut) {
731 return true;
732 }
733
734 bool result = false;
735 uint32_t n;
736 aStream->WriteSegments(TestOutputStream, &result, 1, &n);
737 return result;
738 }
739
740 //-----------------------------------------------------------------------------
741
NS_CopySegmentToStream(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)742 nsresult NS_CopySegmentToStream(nsIInputStream* aInStr, void* aClosure,
743 const char* aBuffer, uint32_t aOffset,
744 uint32_t aCount, uint32_t* aCountWritten) {
745 nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
746 *aCountWritten = 0;
747 while (aCount) {
748 uint32_t n;
749 nsresult rv = outStr->Write(aBuffer, aCount, &n);
750 if (NS_FAILED(rv)) {
751 return rv;
752 }
753 aBuffer += n;
754 aCount -= n;
755 *aCountWritten += n;
756 }
757 return NS_OK;
758 }
759
NS_CopySegmentToBuffer(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)760 nsresult NS_CopySegmentToBuffer(nsIInputStream* aInStr, void* aClosure,
761 const char* aBuffer, uint32_t aOffset,
762 uint32_t aCount, uint32_t* aCountWritten) {
763 char* toBuf = static_cast<char*>(aClosure);
764 memcpy(&toBuf[aOffset], aBuffer, aCount);
765 *aCountWritten = aCount;
766 return NS_OK;
767 }
768
NS_CopySegmentToBuffer(nsIOutputStream * aOutStr,void * aClosure,char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountRead)769 nsresult NS_CopySegmentToBuffer(nsIOutputStream* aOutStr, void* aClosure,
770 char* aBuffer, uint32_t aOffset,
771 uint32_t aCount, uint32_t* aCountRead) {
772 const char* fromBuf = static_cast<const char*>(aClosure);
773 memcpy(aBuffer, &fromBuf[aOffset], aCount);
774 *aCountRead = aCount;
775 return NS_OK;
776 }
777
NS_DiscardSegment(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)778 nsresult NS_DiscardSegment(nsIInputStream* aInStr, void* aClosure,
779 const char* aBuffer, uint32_t aOffset,
780 uint32_t aCount, uint32_t* aCountWritten) {
781 *aCountWritten = aCount;
782 return NS_OK;
783 }
784
785 //-----------------------------------------------------------------------------
786
NS_WriteSegmentThunk(nsIInputStream * aInStr,void * aClosure,const char * aBuffer,uint32_t aOffset,uint32_t aCount,uint32_t * aCountWritten)787 nsresult NS_WriteSegmentThunk(nsIInputStream* aInStr, void* aClosure,
788 const char* aBuffer, uint32_t aOffset,
789 uint32_t aCount, uint32_t* aCountWritten) {
790 nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
791 return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
792 aCountWritten);
793 }
794
NS_FillArray(FallibleTArray<char> & aDest,nsIInputStream * aInput,uint32_t aKeep,uint32_t * aNewBytes)795 nsresult NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
796 uint32_t aKeep, uint32_t* aNewBytes) {
797 MOZ_ASSERT(aInput, "null stream");
798 MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
799
800 char* aBuffer = aDest.Elements();
801 int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
802 if (aKeep != 0 && keepOffset > 0) {
803 memmove(aBuffer, aBuffer + keepOffset, aKeep);
804 }
805
806 nsresult rv =
807 aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
808 if (NS_FAILED(rv)) {
809 *aNewBytes = 0;
810 }
811 // NOTE: we rely on the fact that the new slots are NOT initialized by
812 // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
813 // in nsTArray.h:
814 aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
815
816 MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
817 return rv;
818 }
819
NS_InputStreamIsCloneable(nsIInputStream * aSource)820 bool NS_InputStreamIsCloneable(nsIInputStream* aSource) {
821 if (!aSource) {
822 return false;
823 }
824
825 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
826 return cloneable && cloneable->GetCloneable();
827 }
828
NS_CloneInputStream(nsIInputStream * aSource,nsIInputStream ** aCloneOut,nsIInputStream ** aReplacementOut)829 nsresult NS_CloneInputStream(nsIInputStream* aSource,
830 nsIInputStream** aCloneOut,
831 nsIInputStream** aReplacementOut) {
832 if (NS_WARN_IF(!aSource)) {
833 return NS_ERROR_FAILURE;
834 }
835
836 // Attempt to perform the clone directly on the source stream
837 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
838 if (cloneable && cloneable->GetCloneable()) {
839 if (aReplacementOut) {
840 *aReplacementOut = nullptr;
841 }
842 return cloneable->Clone(aCloneOut);
843 }
844
845 // If we failed the clone and the caller does not want to replace their
846 // original stream, then we are done. Return error.
847 if (!aReplacementOut) {
848 return NS_ERROR_FAILURE;
849 }
850
851 // The caller has opted-in to the fallback clone support that replaces
852 // the original stream. Copy the data to a pipe and return two cloned
853 // input streams.
854
855 nsCOMPtr<nsIInputStream> reader;
856 nsCOMPtr<nsIInputStream> readerClone;
857 nsCOMPtr<nsIOutputStream> writer;
858
859 nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), 0,
860 0, // default segment size and max size
861 true, true); // non-blocking
862 if (NS_WARN_IF(NS_FAILED(rv))) {
863 return rv;
864 }
865
866 cloneable = do_QueryInterface(reader);
867 MOZ_ASSERT(cloneable && cloneable->GetCloneable());
868
869 rv = cloneable->Clone(getter_AddRefs(readerClone));
870 if (NS_WARN_IF(NS_FAILED(rv))) {
871 return rv;
872 }
873
874 nsCOMPtr<nsIEventTarget> target =
875 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
876 if (NS_WARN_IF(NS_FAILED(rv))) {
877 return rv;
878 }
879
880 rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
881 if (NS_WARN_IF(NS_FAILED(rv))) {
882 return rv;
883 }
884
885 readerClone.forget(aCloneOut);
886 reader.forget(aReplacementOut);
887
888 return NS_OK;
889 }
890
NS_MakeAsyncNonBlockingInputStream(already_AddRefed<nsIInputStream> aSource,nsIAsyncInputStream ** aAsyncInputStream,bool aCloseWhenDone,uint32_t aFlags,uint32_t aSegmentSize,uint32_t aSegmentCount)891 nsresult NS_MakeAsyncNonBlockingInputStream(
892 already_AddRefed<nsIInputStream> aSource,
893 nsIAsyncInputStream** aAsyncInputStream, bool aCloseWhenDone,
894 uint32_t aFlags, uint32_t aSegmentSize, uint32_t aSegmentCount) {
895 nsCOMPtr<nsIInputStream> source = std::move(aSource);
896 if (NS_WARN_IF(!aAsyncInputStream)) {
897 return NS_ERROR_FAILURE;
898 }
899
900 bool nonBlocking = false;
901 nsresult rv = source->IsNonBlocking(&nonBlocking);
902 if (NS_WARN_IF(NS_FAILED(rv))) {
903 return rv;
904 }
905
906 nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source);
907
908 if (nonBlocking && asyncStream) {
909 // This stream is perfect!
910 asyncStream.forget(aAsyncInputStream);
911 return NS_OK;
912 }
913
914 if (nonBlocking) {
915 // If the stream is non-blocking but not async, we wrap it.
916 return NonBlockingAsyncInputStream::Create(source.forget(),
917 aAsyncInputStream);
918 }
919
920 nsCOMPtr<nsIStreamTransportService> sts =
921 do_GetService(kStreamTransportServiceCID, &rv);
922 if (NS_WARN_IF(NS_FAILED(rv))) {
923 return rv;
924 }
925
926 nsCOMPtr<nsITransport> transport;
927 rv = sts->CreateInputTransport(source, aCloseWhenDone,
928 getter_AddRefs(transport));
929 if (NS_WARN_IF(NS_FAILED(rv))) {
930 return rv;
931 }
932
933 nsCOMPtr<nsIInputStream> wrapper;
934 rv = transport->OpenInputStream(aFlags, aSegmentSize, aSegmentCount,
935 getter_AddRefs(wrapper));
936 if (NS_WARN_IF(NS_FAILED(rv))) {
937 return rv;
938 }
939
940 asyncStream = do_QueryInterface(wrapper);
941 MOZ_ASSERT(asyncStream);
942
943 asyncStream.forget(aAsyncInputStream);
944 return NS_OK;
945 }
946