1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include "StreamFilterParent.h"
8
9 #include "mozilla/ScopeExit.h"
10 #include "mozilla/Unused.h"
11 #include "mozilla/dom/ContentParent.h"
12 #include "mozilla/net/ChannelEventQueue.h"
13 #include "nsHttpChannel.h"
14 #include "nsIChannel.h"
15 #include "nsIInputStream.h"
16 #include "nsITraceableChannel.h"
17 #include "nsProxyRelease.h"
18 #include "nsQueryObject.h"
19 #include "nsSocketTransportService2.h"
20 #include "nsStringStream.h"
21 #include "mozilla/net/DocumentChannelChild.h"
22 #include "nsIViewSourceChannel.h"
23
24 namespace mozilla {
25 namespace extensions {
26
27 /*****************************************************************************
28 * Event queueing helpers
29 *****************************************************************************/
30
31 using net::ChannelEvent;
32 using net::ChannelEventQueue;
33
34 namespace {
35
36 // Define some simple ChannelEvent sub-classes that store the appropriate
37 // EventTarget and delegate their Run methods to a wrapped Runnable or lambda
38 // function.
39
40 class ChannelEventWrapper : public ChannelEvent {
41 public:
ChannelEventWrapper(nsIEventTarget * aTarget)42 ChannelEventWrapper(nsIEventTarget* aTarget) : mTarget(aTarget) {}
43
GetEventTarget()44 already_AddRefed<nsIEventTarget> GetEventTarget() override {
45 return do_AddRef(mTarget);
46 }
47
48 protected:
49 ~ChannelEventWrapper() override = default;
50
51 private:
52 nsCOMPtr<nsIEventTarget> mTarget;
53 };
54
55 class ChannelEventFunction final : public ChannelEventWrapper {
56 public:
ChannelEventFunction(nsIEventTarget * aTarget,std::function<void ()> && aFunc)57 ChannelEventFunction(nsIEventTarget* aTarget, std::function<void()>&& aFunc)
58 : ChannelEventWrapper(aTarget), mFunc(std::move(aFunc)) {}
59
Run()60 void Run() override { mFunc(); }
61
62 protected:
63 ~ChannelEventFunction() override = default;
64
65 private:
66 std::function<void()> mFunc;
67 };
68
69 class ChannelEventRunnable final : public ChannelEventWrapper {
70 public:
ChannelEventRunnable(nsIEventTarget * aTarget,already_AddRefed<Runnable> aRunnable)71 ChannelEventRunnable(nsIEventTarget* aTarget,
72 already_AddRefed<Runnable> aRunnable)
73 : ChannelEventWrapper(aTarget), mRunnable(aRunnable) {}
74
Run()75 void Run() override {
76 nsresult rv = mRunnable->Run();
77 Unused << NS_WARN_IF(NS_FAILED(rv));
78 }
79
80 protected:
81 ~ChannelEventRunnable() override = default;
82
83 private:
84 RefPtr<Runnable> mRunnable;
85 };
86
87 } // anonymous namespace
88
89 /*****************************************************************************
90 * Initialization
91 *****************************************************************************/
92
StreamFilterParent()93 StreamFilterParent::StreamFilterParent()
94 : mMainThread(GetCurrentThreadEventTarget()),
95 mIOThread(mMainThread),
96 mQueue(new ChannelEventQueue(static_cast<nsIStreamListener*>(this))),
97 mBufferMutex("StreamFilter buffer mutex"),
98 mReceivedStop(false),
99 mSentStop(false),
100 mContext(nullptr),
101 mOffset(0),
102 mState(State::Uninitialized) {}
103
~StreamFilterParent()104 StreamFilterParent::~StreamFilterParent() {
105 NS_ReleaseOnMainThread("StreamFilterParent::mChannel", mChannel.forget());
106 NS_ReleaseOnMainThread("StreamFilterParent::mLoadGroup", mLoadGroup.forget());
107 NS_ReleaseOnMainThread("StreamFilterParent::mOrigListener",
108 mOrigListener.forget());
109 NS_ReleaseOnMainThread("StreamFilterParent::mContext", mContext.forget());
110 }
111
Create(dom::ContentParent * aContentParent,uint64_t aChannelId,const nsAString & aAddonId)112 auto StreamFilterParent::Create(dom::ContentParent* aContentParent,
113 uint64_t aChannelId, const nsAString& aAddonId)
114 -> RefPtr<ChildEndpointPromise> {
115 AssertIsMainThread();
116
117 auto& webreq = WebRequestService::GetSingleton();
118
119 RefPtr<nsAtom> addonId = NS_Atomize(aAddonId);
120 nsCOMPtr<nsITraceableChannel> channel =
121 webreq.GetTraceableChannel(aChannelId, addonId, aContentParent);
122
123 RefPtr<mozilla::net::nsHttpChannel> chan = do_QueryObject(channel);
124 if (!chan) {
125 return ChildEndpointPromise::CreateAndReject(false, __func__);
126 }
127
128 // Disable alt-data for extension stream listeners.
129 nsCOMPtr<nsIHttpChannelInternal> internal(do_QueryObject(channel));
130 internal->DisableAltDataCache();
131
132 return chan->AttachStreamFilter(aContentParent ? aContentParent->OtherPid()
133 : base::GetCurrentProcId());
134 }
135
136 /* static */
Attach(nsIChannel * aChannel,ParentEndpoint && aEndpoint)137 void StreamFilterParent::Attach(nsIChannel* aChannel,
138 ParentEndpoint&& aEndpoint) {
139 auto self = MakeRefPtr<StreamFilterParent>();
140
141 self->ActorThread()->Dispatch(
142 NewRunnableMethod<ParentEndpoint&&>("StreamFilterParent::Bind", self,
143 &StreamFilterParent::Bind,
144 std::move(aEndpoint)),
145 NS_DISPATCH_NORMAL);
146
147 self->Init(aChannel);
148
149 // IPC owns this reference now.
150 Unused << self.forget();
151 }
152
Bind(ParentEndpoint && aEndpoint)153 void StreamFilterParent::Bind(ParentEndpoint&& aEndpoint) {
154 aEndpoint.Bind(this);
155 }
156
Init(nsIChannel * aChannel)157 void StreamFilterParent::Init(nsIChannel* aChannel) {
158 mChannel = aChannel;
159
160 nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(aChannel);
161 if (MOZ_UNLIKELY(!traceable)) {
162 // nsViewSourceChannel is not nsITraceableChannel, but wraps one. Unwrap it.
163 nsCOMPtr<nsIViewSourceChannel> vsc = do_QueryInterface(aChannel);
164 if (vsc) {
165 traceable = do_QueryObject(vsc->GetInnerChannel());
166 // OnStartRequest etc. is passed the unwrapped channel, so update mChannel
167 // to prevent OnStartRequest from mistaking it for a redirect, which would
168 // close the filter.
169 mChannel = do_QueryObject(traceable);
170 }
171 // TODO bug 1683403: Replace assertion; Close StreamFilter instead.
172 MOZ_RELEASE_ASSERT(traceable);
173 }
174
175 nsresult rv = traceable->SetNewListener(this, getter_AddRefs(mOrigListener));
176 MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv));
177 }
178
179 /*****************************************************************************
180 * nsIThreadRetargetableStreamListener
181 *****************************************************************************/
182
183 NS_IMETHODIMP
CheckListenerChain()184 StreamFilterParent::CheckListenerChain() {
185 AssertIsMainThread();
186
187 nsCOMPtr<nsIThreadRetargetableStreamListener> trsl =
188 do_QueryInterface(mOrigListener);
189 if (trsl) {
190 return trsl->CheckListenerChain();
191 }
192 return NS_ERROR_FAILURE;
193 }
194
195 /*****************************************************************************
196 * Error handling
197 *****************************************************************************/
198
Broken()199 void StreamFilterParent::Broken() {
200 AssertIsActorThread();
201
202 switch (mState) {
203 case State::Initialized:
204 case State::TransferringData:
205 case State::Suspended: {
206 mState = State::Disconnecting;
207 RefPtr<StreamFilterParent> self(this);
208 RunOnMainThread(FUNC, [=] {
209 if (self->mChannel) {
210 self->mChannel->Cancel(NS_ERROR_FAILURE);
211 }
212 });
213
214 FinishDisconnect();
215 } break;
216
217 default:
218 break;
219 }
220 }
221
222 /*****************************************************************************
223 * State change requests
224 *****************************************************************************/
225
RecvClose()226 IPCResult StreamFilterParent::RecvClose() {
227 AssertIsActorThread();
228
229 mState = State::Closed;
230
231 if (!mSentStop) {
232 RefPtr<StreamFilterParent> self(this);
233 RunOnMainThread(FUNC, [=] {
234 nsresult rv = self->EmitStopRequest(NS_OK);
235 Unused << NS_WARN_IF(NS_FAILED(rv));
236 });
237 }
238
239 Unused << SendClosed();
240 Destroy();
241 return IPC_OK();
242 }
243
Destroy()244 void StreamFilterParent::Destroy() {
245 // Close the channel asynchronously so the actor is never destroyed before
246 // this message is fully processed.
247 ActorThread()->Dispatch(NewRunnableMethod("StreamFilterParent::Close", this,
248 &StreamFilterParent::Close),
249 NS_DISPATCH_NORMAL);
250 }
251
RecvDestroy()252 IPCResult StreamFilterParent::RecvDestroy() {
253 AssertIsActorThread();
254 Destroy();
255 return IPC_OK();
256 }
257
RecvSuspend()258 IPCResult StreamFilterParent::RecvSuspend() {
259 AssertIsActorThread();
260
261 if (mState == State::TransferringData) {
262 RefPtr<StreamFilterParent> self(this);
263 RunOnMainThread(FUNC, [=] {
264 self->mChannel->Suspend();
265
266 RunOnActorThread(FUNC, [=] {
267 if (self->IPCActive()) {
268 self->mState = State::Suspended;
269 self->CheckResult(self->SendSuspended());
270 }
271 });
272 });
273 }
274 return IPC_OK();
275 }
276
RecvResume()277 IPCResult StreamFilterParent::RecvResume() {
278 AssertIsActorThread();
279
280 if (mState == State::Suspended) {
281 // Change state before resuming so incoming data is handled correctly
282 // immediately after resuming.
283 mState = State::TransferringData;
284
285 RefPtr<StreamFilterParent> self(this);
286 RunOnMainThread(FUNC, [=] {
287 self->mChannel->Resume();
288
289 RunOnActorThread(FUNC, [=] {
290 if (self->IPCActive()) {
291 self->CheckResult(self->SendResumed());
292 }
293 });
294 });
295 }
296 return IPC_OK();
297 }
RecvDisconnect()298 IPCResult StreamFilterParent::RecvDisconnect() {
299 AssertIsActorThread();
300
301 if (mState == State::Suspended) {
302 RefPtr<StreamFilterParent> self(this);
303 RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); });
304 } else if (mState != State::TransferringData) {
305 return IPC_OK();
306 }
307
308 mState = State::Disconnecting;
309 CheckResult(SendFlushData());
310 return IPC_OK();
311 }
312
RecvFlushedData()313 IPCResult StreamFilterParent::RecvFlushedData() {
314 AssertIsActorThread();
315
316 MOZ_ASSERT(mState == State::Disconnecting);
317
318 Destroy();
319
320 FinishDisconnect();
321 return IPC_OK();
322 }
323
FinishDisconnect()324 void StreamFilterParent::FinishDisconnect() {
325 RefPtr<StreamFilterParent> self(this);
326 RunOnIOThread(FUNC, [=] {
327 self->FlushBufferedData();
328
329 RunOnMainThread(FUNC, [=] {
330 if (self->mLoadGroup && !self->mDisconnected) {
331 Unused << self->mLoadGroup->RemoveRequest(self, nullptr, NS_OK);
332 }
333 self->mDisconnected = true;
334 });
335
336 RunOnActorThread(FUNC, [=] {
337 if (self->mState != State::Closed) {
338 self->mState = State::Disconnected;
339 }
340 });
341 });
342 }
343
344 /*****************************************************************************
345 * Data output
346 *****************************************************************************/
347
RecvWrite(Data && aData)348 IPCResult StreamFilterParent::RecvWrite(Data&& aData) {
349 AssertIsActorThread();
350
351 RunOnIOThread(NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove", this,
352 &StreamFilterParent::WriteMove,
353 std::move(aData)));
354 return IPC_OK();
355 }
356
WriteMove(Data && aData)357 void StreamFilterParent::WriteMove(Data&& aData) {
358 nsresult rv = Write(aData);
359 Unused << NS_WARN_IF(NS_FAILED(rv));
360 }
361
Write(Data & aData)362 nsresult StreamFilterParent::Write(Data& aData) {
363 AssertIsIOThread();
364
365 nsCOMPtr<nsIInputStream> stream;
366 nsresult rv = NS_NewByteInputStream(
367 getter_AddRefs(stream),
368 MakeSpan(reinterpret_cast<char*>(aData.Elements()), aData.Length()),
369 NS_ASSIGNMENT_DEPEND);
370 NS_ENSURE_SUCCESS(rv, rv);
371
372 rv =
373 mOrigListener->OnDataAvailable(mChannel, stream, mOffset, aData.Length());
374 NS_ENSURE_SUCCESS(rv, rv);
375
376 mOffset += aData.Length();
377 return NS_OK;
378 }
379
380 /*****************************************************************************
381 * nsIRequest
382 *****************************************************************************/
383
384 NS_IMETHODIMP
GetName(nsACString & aName)385 StreamFilterParent::GetName(nsACString& aName) {
386 AssertIsMainThread();
387 MOZ_ASSERT(mChannel);
388 return mChannel->GetName(aName);
389 }
390
391 NS_IMETHODIMP
GetStatus(nsresult * aStatus)392 StreamFilterParent::GetStatus(nsresult* aStatus) {
393 AssertIsMainThread();
394 MOZ_ASSERT(mChannel);
395 return mChannel->GetStatus(aStatus);
396 }
397
398 NS_IMETHODIMP
IsPending(bool * aIsPending)399 StreamFilterParent::IsPending(bool* aIsPending) {
400 switch (mState) {
401 case State::Initialized:
402 case State::TransferringData:
403 case State::Suspended:
404 *aIsPending = true;
405 break;
406 default:
407 *aIsPending = false;
408 }
409 return NS_OK;
410 }
411
412 NS_IMETHODIMP
Cancel(nsresult aResult)413 StreamFilterParent::Cancel(nsresult aResult) {
414 AssertIsMainThread();
415 MOZ_ASSERT(mChannel);
416 return mChannel->Cancel(aResult);
417 }
418
419 NS_IMETHODIMP
Suspend()420 StreamFilterParent::Suspend() {
421 AssertIsMainThread();
422 MOZ_ASSERT(mChannel);
423 return mChannel->Suspend();
424 }
425
426 NS_IMETHODIMP
Resume()427 StreamFilterParent::Resume() {
428 AssertIsMainThread();
429 MOZ_ASSERT(mChannel);
430 return mChannel->Resume();
431 }
432
433 NS_IMETHODIMP
GetLoadGroup(nsILoadGroup ** aLoadGroup)434 StreamFilterParent::GetLoadGroup(nsILoadGroup** aLoadGroup) {
435 *aLoadGroup = mLoadGroup;
436 return NS_OK;
437 }
438
439 NS_IMETHODIMP
SetLoadGroup(nsILoadGroup * aLoadGroup)440 StreamFilterParent::SetLoadGroup(nsILoadGroup* aLoadGroup) {
441 return NS_ERROR_NOT_IMPLEMENTED;
442 }
443
444 NS_IMETHODIMP
GetLoadFlags(nsLoadFlags * aLoadFlags)445 StreamFilterParent::GetLoadFlags(nsLoadFlags* aLoadFlags) {
446 AssertIsMainThread();
447 MOZ_ASSERT(mChannel);
448 MOZ_TRY(mChannel->GetLoadFlags(aLoadFlags));
449 *aLoadFlags &= ~nsIChannel::LOAD_DOCUMENT_URI;
450 return NS_OK;
451 }
452
453 NS_IMETHODIMP
SetLoadFlags(nsLoadFlags aLoadFlags)454 StreamFilterParent::SetLoadFlags(nsLoadFlags aLoadFlags) {
455 AssertIsMainThread();
456 MOZ_ASSERT(mChannel);
457 return mChannel->SetLoadFlags(aLoadFlags);
458 }
459
460 NS_IMETHODIMP
GetTRRMode(nsIRequest::TRRMode * aTRRMode)461 StreamFilterParent::GetTRRMode(nsIRequest::TRRMode* aTRRMode) {
462 return GetTRRModeImpl(aTRRMode);
463 }
464
465 NS_IMETHODIMP
SetTRRMode(nsIRequest::TRRMode aTRRMode)466 StreamFilterParent::SetTRRMode(nsIRequest::TRRMode aTRRMode) {
467 return SetTRRModeImpl(aTRRMode);
468 }
469
470 /*****************************************************************************
471 * nsIStreamListener
472 *****************************************************************************/
473
474 NS_IMETHODIMP
OnStartRequest(nsIRequest * aRequest)475 StreamFilterParent::OnStartRequest(nsIRequest* aRequest) {
476 AssertIsMainThread();
477
478 // Always reset mChannel if aRequest is different. Various calls in
479 // StreamFilterParent will use mChannel, but aRequest is *always* the
480 // right channel to use at this point.
481 //
482 // For ALL redirections, we will disconnect this listener. Extensions
483 // will create a new filter if they need it.
484 if (aRequest != mChannel) {
485 nsCOMPtr<nsIChannel> channel = do_QueryInterface(aRequest);
486 nsCOMPtr<nsILoadInfo> loadInfo = channel ? channel->LoadInfo() : nullptr;
487 mChannel = channel;
488
489 if (!(loadInfo &&
490 loadInfo->RedirectChainIncludingInternalRedirects().IsEmpty())) {
491 mDisconnected = true;
492
493 RefPtr<StreamFilterParent> self(this);
494 RunOnActorThread(FUNC, [=] {
495 if (self->IPCActive()) {
496 self->mState = State::Disconnected;
497 CheckResult(
498 self->SendError(NS_LITERAL_CSTRING("Channel redirected")));
499 }
500 });
501 }
502 }
503
504 // Check if alterate cached data is being sent, if so we receive un-decoded
505 // data and we must disconnect the filter and send an error to the extension.
506 if (!mDisconnected) {
507 RefPtr<net::HttpBaseChannel> chan = do_QueryObject(aRequest);
508 if (chan && chan->IsDeliveringAltData()) {
509 mDisconnected = true;
510
511 RefPtr<StreamFilterParent> self(this);
512 RunOnActorThread(FUNC, [=] {
513 if (self->IPCActive()) {
514 self->mState = State::Disconnected;
515 CheckResult(self->SendError(
516 NS_LITERAL_CSTRING("Channel is delivering cached alt-data")));
517 }
518 });
519 }
520 }
521
522 if (!mDisconnected) {
523 Unused << mChannel->GetLoadGroup(getter_AddRefs(mLoadGroup));
524 if (mLoadGroup) {
525 Unused << mLoadGroup->AddRequest(this, nullptr);
526 }
527 }
528
529 nsresult rv = mOrigListener->OnStartRequest(aRequest);
530
531 // Important: Do this only *after* running the next listener in the chain, so
532 // that we get the final delivery target after any retargeting that it may do.
533 if (nsCOMPtr<nsIThreadRetargetableRequest> req =
534 do_QueryInterface(aRequest)) {
535 nsCOMPtr<nsIEventTarget> thread;
536 Unused << req->GetDeliveryTarget(getter_AddRefs(thread));
537 if (thread) {
538 mIOThread = std::move(thread);
539 }
540 }
541
542 // Important: Do this *after* we have set the thread delivery target, or it is
543 // possible in rare circumstances for an extension to attempt to write data
544 // before the thread has been set up, even though there are several layers of
545 // asynchrony involved.
546 if (!mDisconnected) {
547 RefPtr<StreamFilterParent> self(this);
548 RunOnActorThread(FUNC, [=] {
549 if (self->IPCActive()) {
550 self->mState = State::TransferringData;
551 self->CheckResult(self->SendStartRequest());
552 }
553 });
554 }
555
556 return rv;
557 }
558
559 NS_IMETHODIMP
OnStopRequest(nsIRequest * aRequest,nsresult aStatusCode)560 StreamFilterParent::OnStopRequest(nsIRequest* aRequest, nsresult aStatusCode) {
561 AssertIsMainThread();
562 MOZ_ASSERT(aRequest == mChannel);
563
564 mReceivedStop = true;
565 if (mDisconnected) {
566 return EmitStopRequest(aStatusCode);
567 }
568
569 RefPtr<StreamFilterParent> self(this);
570 RunOnActorThread(FUNC, [=] {
571 if (self->IPCActive()) {
572 self->CheckResult(self->SendStopRequest(aStatusCode));
573 } else if (self->mState != State::Disconnecting) {
574 // If we're currently disconnecting, then we'll emit a stop
575 // request at the end of that process. Otherwise we need to
576 // manually emit one here, since we won't be getting a response
577 // from the child.
578 RunOnMainThread(FUNC, [=] {
579 if (!self->mSentStop) {
580 self->EmitStopRequest(aStatusCode);
581 }
582 });
583 }
584 });
585 return NS_OK;
586 }
587
EmitStopRequest(nsresult aStatusCode)588 nsresult StreamFilterParent::EmitStopRequest(nsresult aStatusCode) {
589 AssertIsMainThread();
590 MOZ_ASSERT(!mSentStop);
591
592 mSentStop = true;
593 nsresult rv = mOrigListener->OnStopRequest(mChannel, aStatusCode);
594
595 if (mLoadGroup && !mDisconnected) {
596 Unused << mLoadGroup->RemoveRequest(this, nullptr, aStatusCode);
597 }
598
599 return rv;
600 }
601
602 /*****************************************************************************
603 * Incoming data handling
604 *****************************************************************************/
605
DoSendData(Data && aData)606 void StreamFilterParent::DoSendData(Data&& aData) {
607 AssertIsActorThread();
608
609 if (mState == State::TransferringData) {
610 CheckResult(SendData(aData));
611 }
612 }
613
614 NS_IMETHODIMP
OnDataAvailable(nsIRequest * aRequest,nsIInputStream * aInputStream,uint64_t aOffset,uint32_t aCount)615 StreamFilterParent::OnDataAvailable(nsIRequest* aRequest,
616 nsIInputStream* aInputStream,
617 uint64_t aOffset, uint32_t aCount) {
618 AssertIsIOThread();
619
620 if (mDisconnected) {
621 // If we're offloading data in a thread pool, it's possible that we'll
622 // have buffered some additional data while waiting for the buffer to
623 // flush. So, if there's any buffered data left, flush that before we
624 // flush this incoming data.
625 //
626 // Note: When in the eDisconnected state, the buffer list is guaranteed
627 // never to be accessed by another thread during an OnDataAvailable call.
628 if (!mBufferedData.isEmpty()) {
629 FlushBufferedData();
630 }
631
632 mOffset += aCount;
633 return mOrigListener->OnDataAvailable(aRequest, aInputStream,
634 mOffset - aCount, aCount);
635 }
636
637 Data data;
638 data.SetLength(aCount);
639
640 uint32_t count;
641 nsresult rv = aInputStream->Read(reinterpret_cast<char*>(data.Elements()),
642 aCount, &count);
643 NS_ENSURE_SUCCESS(rv, rv);
644 NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED);
645
646 if (mState == State::Disconnecting) {
647 MutexAutoLock al(mBufferMutex);
648 BufferData(std::move(data));
649 } else if (mState == State::Closed) {
650 return NS_ERROR_FAILURE;
651 } else {
652 ActorThread()->Dispatch(
653 NewRunnableMethod<Data&&>("StreamFilterParent::DoSendData", this,
654 &StreamFilterParent::DoSendData,
655 std::move(data)),
656 NS_DISPATCH_NORMAL);
657 }
658 return NS_OK;
659 }
660
FlushBufferedData()661 nsresult StreamFilterParent::FlushBufferedData() {
662 AssertIsIOThread();
663
664 // When offloading data to a thread pool, OnDataAvailable isn't guaranteed
665 // to always run in the same thread, so it's possible for this function to
666 // run in parallel with OnDataAvailable.
667 MutexAutoLock al(mBufferMutex);
668
669 while (!mBufferedData.isEmpty()) {
670 UniquePtr<BufferedData> data(mBufferedData.popFirst());
671
672 nsresult rv = Write(data->mData);
673 NS_ENSURE_SUCCESS(rv, rv);
674 }
675
676 if (mReceivedStop && !mSentStop) {
677 RefPtr<StreamFilterParent> self(this);
678 RunOnMainThread(FUNC, [=] {
679 if (!mSentStop) {
680 nsresult rv = self->EmitStopRequest(NS_OK);
681 Unused << NS_WARN_IF(NS_FAILED(rv));
682 }
683 });
684 }
685
686 return NS_OK;
687 }
688
689 /*****************************************************************************
690 * Thread helpers
691 *****************************************************************************/
692
ActorThread()693 nsIEventTarget* StreamFilterParent::ActorThread() {
694 return net::gSocketTransportService;
695 }
696
IsActorThread()697 bool StreamFilterParent::IsActorThread() {
698 return ActorThread()->IsOnCurrentThread();
699 }
700
AssertIsActorThread()701 void StreamFilterParent::AssertIsActorThread() { MOZ_ASSERT(IsActorThread()); }
702
IOThread()703 nsIEventTarget* StreamFilterParent::IOThread() { return mIOThread; }
704
IsIOThread()705 bool StreamFilterParent::IsIOThread() { return mIOThread->IsOnCurrentThread(); }
706
AssertIsIOThread()707 void StreamFilterParent::AssertIsIOThread() { MOZ_ASSERT(IsIOThread()); }
708
709 template <typename Function>
RunOnMainThread(const char * aName,Function && aFunc)710 void StreamFilterParent::RunOnMainThread(const char* aName, Function&& aFunc) {
711 mQueue->RunOrEnqueue(new ChannelEventFunction(mMainThread, std::move(aFunc)));
712 }
713
RunOnMainThread(already_AddRefed<Runnable> aRunnable)714 void StreamFilterParent::RunOnMainThread(already_AddRefed<Runnable> aRunnable) {
715 mQueue->RunOrEnqueue(
716 new ChannelEventRunnable(mMainThread, std::move(aRunnable)));
717 }
718
719 template <typename Function>
RunOnIOThread(const char * aName,Function && aFunc)720 void StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc) {
721 mQueue->RunOrEnqueue(new ChannelEventFunction(mIOThread, std::move(aFunc)));
722 }
723
RunOnIOThread(already_AddRefed<Runnable> aRunnable)724 void StreamFilterParent::RunOnIOThread(already_AddRefed<Runnable> aRunnable) {
725 mQueue->RunOrEnqueue(
726 new ChannelEventRunnable(mIOThread, std::move(aRunnable)));
727 }
728
729 template <typename Function>
RunOnActorThread(const char * aName,Function && aFunc)730 void StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc) {
731 // We don't use mQueue for dispatch to the actor thread.
732 //
733 // The main thread and IO thread are used for dispatching events to the
734 // wrapped stream listener, and those events need to be processed
735 // consistently, in the order they were dispatched. An event dispatched to the
736 // main thread can't be run before events that were dispatched to the IO
737 // thread before it.
738 //
739 // Additionally, the IO thread is likely to be a thread pool, which means that
740 // without thread-safe queuing, it's possible for multiple events dispatched
741 // to it to be processed in parallel, or out of order.
742 //
743 // The actor thread, however, is always a serial event target. Its events are
744 // always processed in order, and events dispatched to the actor thread are
745 // independent of the events in the output event queue.
746 if (IsActorThread()) {
747 aFunc();
748 } else {
749 ActorThread()->Dispatch(std::move(NS_NewRunnableFunction(aName, aFunc)),
750 NS_DISPATCH_NORMAL);
751 }
752 }
753
754 /*****************************************************************************
755 * Glue
756 *****************************************************************************/
757
ActorDestroy(ActorDestroyReason aWhy)758 void StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy) {
759 AssertIsActorThread();
760
761 if (mState != State::Disconnected && mState != State::Closed) {
762 Broken();
763 }
764 }
765
ActorDealloc()766 void StreamFilterParent::ActorDealloc() {
767 RefPtr<StreamFilterParent> self = dont_AddRef(this);
768 }
769
770 NS_INTERFACE_MAP_BEGIN(StreamFilterParent)
771 NS_INTERFACE_MAP_ENTRY(nsIStreamListener)
772 NS_INTERFACE_MAP_ENTRY(nsIRequestObserver)
773 NS_INTERFACE_MAP_ENTRY(nsIRequest)
774 NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableStreamListener)
775 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamListener)
776 NS_INTERFACE_MAP_END
777
778 NS_IMPL_ADDREF(StreamFilterParent)
779 NS_IMPL_RELEASE(StreamFilterParent)
780
781 } // namespace extensions
782 } // namespace mozilla
783