1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include "StreamFilterParent.h"
8
9 #include "mozilla/Unused.h"
10 #include "mozilla/dom/ContentParent.h"
11 #include "mozilla/net/ChannelEventQueue.h"
12 #include "nsHttpChannel.h"
13 #include "nsIChannel.h"
14 #include "nsIInputStream.h"
15 #include "nsITraceableChannel.h"
16 #include "nsProxyRelease.h"
17 #include "nsQueryObject.h"
18 #include "nsSocketTransportService2.h"
19 #include "nsStringStream.h"
20 #include "mozilla/net/DocumentChannelChild.h"
21 #include "nsIViewSourceChannel.h"
22
23 namespace mozilla {
24 namespace extensions {
25
26 /*****************************************************************************
27 * Event queueing helpers
28 *****************************************************************************/
29
30 using net::ChannelEvent;
31 using net::ChannelEventQueue;
32
33 namespace {
34
35 // Define some simple ChannelEvent sub-classes that store the appropriate
36 // EventTarget and delegate their Run methods to a wrapped Runnable or lambda
37 // function.
38
39 class ChannelEventWrapper : public ChannelEvent {
40 public:
ChannelEventWrapper(nsIEventTarget * aTarget)41 ChannelEventWrapper(nsIEventTarget* aTarget) : mTarget(aTarget) {}
42
GetEventTarget()43 already_AddRefed<nsIEventTarget> GetEventTarget() override {
44 return do_AddRef(mTarget);
45 }
46
47 protected:
48 ~ChannelEventWrapper() override = default;
49
50 private:
51 nsCOMPtr<nsIEventTarget> mTarget;
52 };
53
54 class ChannelEventFunction final : public ChannelEventWrapper {
55 public:
ChannelEventFunction(nsIEventTarget * aTarget,std::function<void ()> && aFunc)56 ChannelEventFunction(nsIEventTarget* aTarget, std::function<void()>&& aFunc)
57 : ChannelEventWrapper(aTarget), mFunc(std::move(aFunc)) {}
58
Run()59 void Run() override { mFunc(); }
60
61 protected:
62 ~ChannelEventFunction() override = default;
63
64 private:
65 std::function<void()> mFunc;
66 };
67
68 class ChannelEventRunnable final : public ChannelEventWrapper {
69 public:
ChannelEventRunnable(nsIEventTarget * aTarget,already_AddRefed<Runnable> aRunnable)70 ChannelEventRunnable(nsIEventTarget* aTarget,
71 already_AddRefed<Runnable> aRunnable)
72 : ChannelEventWrapper(aTarget), mRunnable(aRunnable) {}
73
Run()74 void Run() override {
75 nsresult rv = mRunnable->Run();
76 Unused << NS_WARN_IF(NS_FAILED(rv));
77 }
78
79 protected:
80 ~ChannelEventRunnable() override = default;
81
82 private:
83 RefPtr<Runnable> mRunnable;
84 };
85
86 } // anonymous namespace
87
88 /*****************************************************************************
89 * Initialization
90 *****************************************************************************/
91
StreamFilterParent()92 StreamFilterParent::StreamFilterParent()
93 : mMainThread(GetCurrentEventTarget()),
94 mIOThread(mMainThread),
95 mQueue(new ChannelEventQueue(static_cast<nsIStreamListener*>(this))),
96 mBufferMutex("StreamFilter buffer mutex"),
97 mReceivedStop(false),
98 mSentStop(false),
99 mContext(nullptr),
100 mOffset(0),
101 mState(State::Uninitialized) {}
102
~StreamFilterParent()103 StreamFilterParent::~StreamFilterParent() {
104 NS_ReleaseOnMainThread("StreamFilterParent::mChannel", mChannel.forget());
105 NS_ReleaseOnMainThread("StreamFilterParent::mLoadGroup", mLoadGroup.forget());
106 NS_ReleaseOnMainThread("StreamFilterParent::mOrigListener",
107 mOrigListener.forget());
108 NS_ReleaseOnMainThread("StreamFilterParent::mContext", mContext.forget());
109 mQueue->NotifyReleasingOwner();
110 }
111
Create(dom::ContentParent * aContentParent,uint64_t aChannelId,const nsAString & aAddonId)112 auto StreamFilterParent::Create(dom::ContentParent* aContentParent,
113 uint64_t aChannelId, const nsAString& aAddonId)
114 -> RefPtr<ChildEndpointPromise> {
115 AssertIsMainThread();
116
117 auto& webreq = WebRequestService::GetSingleton();
118
119 RefPtr<nsAtom> addonId = NS_Atomize(aAddonId);
120 nsCOMPtr<nsITraceableChannel> channel =
121 webreq.GetTraceableChannel(aChannelId, addonId, aContentParent);
122
123 RefPtr<mozilla::net::nsHttpChannel> chan = do_QueryObject(channel);
124 if (!chan) {
125 return ChildEndpointPromise::CreateAndReject(false, __func__);
126 }
127
128 // Disable alt-data for extension stream listeners.
129 nsCOMPtr<nsIHttpChannelInternal> internal(do_QueryObject(channel));
130 internal->DisableAltDataCache();
131
132 return chan->AttachStreamFilter(aContentParent ? aContentParent->OtherPid()
133 : base::GetCurrentProcId());
134 }
135
136 /* static */
Attach(nsIChannel * aChannel,ParentEndpoint && aEndpoint)137 void StreamFilterParent::Attach(nsIChannel* aChannel,
138 ParentEndpoint&& aEndpoint) {
139 auto self = MakeRefPtr<StreamFilterParent>();
140
141 self->ActorThread()->Dispatch(
142 NewRunnableMethod<ParentEndpoint&&>("StreamFilterParent::Bind", self,
143 &StreamFilterParent::Bind,
144 std::move(aEndpoint)),
145 NS_DISPATCH_NORMAL);
146
147 self->Init(aChannel);
148
149 // IPC owns this reference now.
150 Unused << self.forget();
151 }
152
Bind(ParentEndpoint && aEndpoint)153 void StreamFilterParent::Bind(ParentEndpoint&& aEndpoint) {
154 aEndpoint.Bind(this);
155 }
156
Init(nsIChannel * aChannel)157 void StreamFilterParent::Init(nsIChannel* aChannel) {
158 mChannel = aChannel;
159
160 nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(aChannel);
161 if (MOZ_UNLIKELY(!traceable)) {
162 // nsViewSourceChannel is not nsITraceableChannel, but wraps one. Unwrap it.
163 nsCOMPtr<nsIViewSourceChannel> vsc = do_QueryInterface(aChannel);
164 if (vsc) {
165 traceable = do_QueryObject(vsc->GetInnerChannel());
166 // OnStartRequest etc. is passed the unwrapped channel, so update mChannel
167 // to prevent OnStartRequest from mistaking it for a redirect, which would
168 // close the filter.
169 mChannel = do_QueryObject(traceable);
170 }
171 // TODO bug 1683403: Replace assertion; Close StreamFilter instead.
172 MOZ_RELEASE_ASSERT(traceable);
173 }
174
175 nsresult rv =
176 traceable->SetNewListener(this, /* aMustApplyContentConversion = */ true,
177 getter_AddRefs(mOrigListener));
178 MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv));
179 }
180
181 /*****************************************************************************
182 * nsIThreadRetargetableStreamListener
183 *****************************************************************************/
184
185 NS_IMETHODIMP
CheckListenerChain()186 StreamFilterParent::CheckListenerChain() {
187 AssertIsMainThread();
188
189 nsCOMPtr<nsIThreadRetargetableStreamListener> trsl =
190 do_QueryInterface(mOrigListener);
191 if (trsl) {
192 return trsl->CheckListenerChain();
193 }
194 return NS_ERROR_FAILURE;
195 }
196
197 /*****************************************************************************
198 * Error handling
199 *****************************************************************************/
200
Broken()201 void StreamFilterParent::Broken() {
202 AssertIsActorThread();
203
204 switch (mState) {
205 case State::Initialized:
206 case State::TransferringData:
207 case State::Suspended: {
208 mState = State::Disconnecting;
209 RefPtr<StreamFilterParent> self(this);
210 RunOnMainThread(FUNC, [=] {
211 if (self->mChannel) {
212 self->mChannel->Cancel(NS_ERROR_FAILURE);
213 }
214 });
215
216 FinishDisconnect();
217 } break;
218
219 default:
220 break;
221 }
222 }
223
224 /*****************************************************************************
225 * State change requests
226 *****************************************************************************/
227
RecvClose()228 IPCResult StreamFilterParent::RecvClose() {
229 AssertIsActorThread();
230
231 mState = State::Closed;
232
233 if (!mSentStop) {
234 RefPtr<StreamFilterParent> self(this);
235 RunOnMainThread(FUNC, [=] {
236 nsresult rv = self->EmitStopRequest(NS_OK);
237 Unused << NS_WARN_IF(NS_FAILED(rv));
238 });
239 }
240
241 Unused << SendClosed();
242 Destroy();
243 return IPC_OK();
244 }
245
Destroy()246 void StreamFilterParent::Destroy() {
247 // Close the channel asynchronously so the actor is never destroyed before
248 // this message is fully processed.
249 ActorThread()->Dispatch(NewRunnableMethod("StreamFilterParent::Close", this,
250 &StreamFilterParent::Close),
251 NS_DISPATCH_NORMAL);
252 }
253
RecvDestroy()254 IPCResult StreamFilterParent::RecvDestroy() {
255 AssertIsActorThread();
256 Destroy();
257 return IPC_OK();
258 }
259
RecvSuspend()260 IPCResult StreamFilterParent::RecvSuspend() {
261 AssertIsActorThread();
262
263 if (mState == State::TransferringData) {
264 RefPtr<StreamFilterParent> self(this);
265 RunOnMainThread(FUNC, [=] {
266 self->mChannel->Suspend();
267
268 RunOnActorThread(FUNC, [=] {
269 if (self->IPCActive()) {
270 self->mState = State::Suspended;
271 self->CheckResult(self->SendSuspended());
272 }
273 });
274 });
275 }
276 return IPC_OK();
277 }
278
RecvResume()279 IPCResult StreamFilterParent::RecvResume() {
280 AssertIsActorThread();
281
282 if (mState == State::Suspended) {
283 // Change state before resuming so incoming data is handled correctly
284 // immediately after resuming.
285 mState = State::TransferringData;
286
287 RefPtr<StreamFilterParent> self(this);
288 RunOnMainThread(FUNC, [=] {
289 self->mChannel->Resume();
290
291 RunOnActorThread(FUNC, [=] {
292 if (self->IPCActive()) {
293 self->CheckResult(self->SendResumed());
294 }
295 });
296 });
297 }
298 return IPC_OK();
299 }
RecvDisconnect()300 IPCResult StreamFilterParent::RecvDisconnect() {
301 AssertIsActorThread();
302
303 if (mState == State::Suspended) {
304 RefPtr<StreamFilterParent> self(this);
305 RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); });
306 } else if (mState != State::TransferringData) {
307 return IPC_OK();
308 }
309
310 mState = State::Disconnecting;
311 CheckResult(SendFlushData());
312 return IPC_OK();
313 }
314
RecvFlushedData()315 IPCResult StreamFilterParent::RecvFlushedData() {
316 AssertIsActorThread();
317
318 MOZ_ASSERT(mState == State::Disconnecting);
319
320 Destroy();
321
322 FinishDisconnect();
323 return IPC_OK();
324 }
325
FinishDisconnect()326 void StreamFilterParent::FinishDisconnect() {
327 RefPtr<StreamFilterParent> self(this);
328 RunOnIOThread(FUNC, [=] {
329 self->FlushBufferedData();
330
331 RunOnMainThread(FUNC, [=] {
332 if (self->mReceivedStop && !self->mSentStop) {
333 nsresult rv = self->EmitStopRequest(NS_OK);
334 Unused << NS_WARN_IF(NS_FAILED(rv));
335 } else if (self->mLoadGroup && !self->mDisconnected) {
336 Unused << self->mLoadGroup->RemoveRequest(self, nullptr, NS_OK);
337 }
338 self->mDisconnected = true;
339 });
340
341 RunOnActorThread(FUNC, [=] {
342 if (self->mState != State::Closed) {
343 self->mState = State::Disconnected;
344 }
345 });
346 });
347 }
348
349 /*****************************************************************************
350 * Data output
351 *****************************************************************************/
352
RecvWrite(Data && aData)353 IPCResult StreamFilterParent::RecvWrite(Data&& aData) {
354 AssertIsActorThread();
355
356 RunOnIOThread(NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove", this,
357 &StreamFilterParent::WriteMove,
358 std::move(aData)));
359 return IPC_OK();
360 }
361
WriteMove(Data && aData)362 void StreamFilterParent::WriteMove(Data&& aData) {
363 nsresult rv = Write(aData);
364 Unused << NS_WARN_IF(NS_FAILED(rv));
365 }
366
Write(Data & aData)367 nsresult StreamFilterParent::Write(Data& aData) {
368 AssertIsIOThread();
369
370 nsCOMPtr<nsIInputStream> stream;
371 nsresult rv = NS_NewByteInputStream(
372 getter_AddRefs(stream),
373 Span(reinterpret_cast<char*>(aData.Elements()), aData.Length()),
374 NS_ASSIGNMENT_DEPEND);
375 NS_ENSURE_SUCCESS(rv, rv);
376
377 rv =
378 mOrigListener->OnDataAvailable(mChannel, stream, mOffset, aData.Length());
379 NS_ENSURE_SUCCESS(rv, rv);
380
381 mOffset += aData.Length();
382 return NS_OK;
383 }
384
385 /*****************************************************************************
386 * nsIRequest
387 *****************************************************************************/
388
389 NS_IMETHODIMP
GetName(nsACString & aName)390 StreamFilterParent::GetName(nsACString& aName) {
391 AssertIsMainThread();
392 MOZ_ASSERT(mChannel);
393 return mChannel->GetName(aName);
394 }
395
396 NS_IMETHODIMP
GetStatus(nsresult * aStatus)397 StreamFilterParent::GetStatus(nsresult* aStatus) {
398 AssertIsMainThread();
399 MOZ_ASSERT(mChannel);
400 return mChannel->GetStatus(aStatus);
401 }
402
403 NS_IMETHODIMP
IsPending(bool * aIsPending)404 StreamFilterParent::IsPending(bool* aIsPending) {
405 switch (mState) {
406 case State::Initialized:
407 case State::TransferringData:
408 case State::Suspended:
409 *aIsPending = true;
410 break;
411 default:
412 *aIsPending = false;
413 }
414 return NS_OK;
415 }
416
417 NS_IMETHODIMP
Cancel(nsresult aResult)418 StreamFilterParent::Cancel(nsresult aResult) {
419 AssertIsMainThread();
420 MOZ_ASSERT(mChannel);
421 return mChannel->Cancel(aResult);
422 }
423
424 NS_IMETHODIMP
Suspend()425 StreamFilterParent::Suspend() {
426 AssertIsMainThread();
427 MOZ_ASSERT(mChannel);
428 return mChannel->Suspend();
429 }
430
431 NS_IMETHODIMP
Resume()432 StreamFilterParent::Resume() {
433 AssertIsMainThread();
434 MOZ_ASSERT(mChannel);
435 return mChannel->Resume();
436 }
437
438 NS_IMETHODIMP
GetLoadGroup(nsILoadGroup ** aLoadGroup)439 StreamFilterParent::GetLoadGroup(nsILoadGroup** aLoadGroup) {
440 *aLoadGroup = mLoadGroup;
441 return NS_OK;
442 }
443
444 NS_IMETHODIMP
SetLoadGroup(nsILoadGroup * aLoadGroup)445 StreamFilterParent::SetLoadGroup(nsILoadGroup* aLoadGroup) {
446 return NS_ERROR_NOT_IMPLEMENTED;
447 }
448
449 NS_IMETHODIMP
GetLoadFlags(nsLoadFlags * aLoadFlags)450 StreamFilterParent::GetLoadFlags(nsLoadFlags* aLoadFlags) {
451 AssertIsMainThread();
452 MOZ_ASSERT(mChannel);
453 MOZ_TRY(mChannel->GetLoadFlags(aLoadFlags));
454 *aLoadFlags &= ~nsIChannel::LOAD_DOCUMENT_URI;
455 return NS_OK;
456 }
457
458 NS_IMETHODIMP
SetLoadFlags(nsLoadFlags aLoadFlags)459 StreamFilterParent::SetLoadFlags(nsLoadFlags aLoadFlags) {
460 AssertIsMainThread();
461 MOZ_ASSERT(mChannel);
462 return mChannel->SetLoadFlags(aLoadFlags);
463 }
464
465 NS_IMETHODIMP
GetTRRMode(nsIRequest::TRRMode * aTRRMode)466 StreamFilterParent::GetTRRMode(nsIRequest::TRRMode* aTRRMode) {
467 return GetTRRModeImpl(aTRRMode);
468 }
469
470 NS_IMETHODIMP
SetTRRMode(nsIRequest::TRRMode aTRRMode)471 StreamFilterParent::SetTRRMode(nsIRequest::TRRMode aTRRMode) {
472 return SetTRRModeImpl(aTRRMode);
473 }
474
475 /*****************************************************************************
476 * nsIStreamListener
477 *****************************************************************************/
478
479 NS_IMETHODIMP
OnStartRequest(nsIRequest * aRequest)480 StreamFilterParent::OnStartRequest(nsIRequest* aRequest) {
481 AssertIsMainThread();
482
483 // Always reset mChannel if aRequest is different. Various calls in
484 // StreamFilterParent will use mChannel, but aRequest is *always* the
485 // right channel to use at this point.
486 //
487 // For ALL redirections, we will disconnect this listener. Extensions
488 // will create a new filter if they need it.
489 if (aRequest != mChannel) {
490 nsCOMPtr<nsIChannel> channel = do_QueryInterface(aRequest);
491 nsCOMPtr<nsILoadInfo> loadInfo = channel ? channel->LoadInfo() : nullptr;
492 mChannel = channel;
493
494 if (!(loadInfo &&
495 loadInfo->RedirectChainIncludingInternalRedirects().IsEmpty())) {
496 mDisconnected = true;
497 mDisconnectedByOnStartRequest = true;
498
499 RefPtr<StreamFilterParent> self(this);
500 RunOnActorThread(FUNC, [=] {
501 if (self->IPCActive()) {
502 self->mState = State::Disconnected;
503 CheckResult(self->SendError("Channel redirected"_ns));
504 }
505 });
506 }
507 }
508
509 // Check if alterate cached data is being sent, if so we receive un-decoded
510 // data and we must disconnect the filter and send an error to the extension.
511 if (!mDisconnected) {
512 RefPtr<net::HttpBaseChannel> chan = do_QueryObject(aRequest);
513 if (chan && chan->IsDeliveringAltData()) {
514 mDisconnected = true;
515 mDisconnectedByOnStartRequest = true;
516
517 RefPtr<StreamFilterParent> self(this);
518 RunOnActorThread(FUNC, [=] {
519 if (self->IPCActive()) {
520 self->mState = State::Disconnected;
521 CheckResult(
522 self->SendError("Channel is delivering cached alt-data"_ns));
523 }
524 });
525 }
526 }
527
528 if (!mDisconnected) {
529 Unused << mChannel->GetLoadGroup(getter_AddRefs(mLoadGroup));
530 if (mLoadGroup) {
531 Unused << mLoadGroup->AddRequest(this, nullptr);
532 }
533 }
534
535 nsresult rv = mOrigListener->OnStartRequest(aRequest);
536
537 // Important: Do this only *after* running the next listener in the chain, so
538 // that we get the final delivery target after any retargeting that it may do.
539 if (nsCOMPtr<nsIThreadRetargetableRequest> req =
540 do_QueryInterface(aRequest)) {
541 nsCOMPtr<nsIEventTarget> thread;
542 Unused << req->GetDeliveryTarget(getter_AddRefs(thread));
543 if (thread) {
544 mIOThread = std::move(thread);
545 }
546 }
547
548 // Important: Do this *after* we have set the thread delivery target, or it is
549 // possible in rare circumstances for an extension to attempt to write data
550 // before the thread has been set up, even though there are several layers of
551 // asynchrony involved.
552 if (!mDisconnected) {
553 RefPtr<StreamFilterParent> self(this);
554 RunOnActorThread(FUNC, [=] {
555 if (self->IPCActive()) {
556 self->mState = State::TransferringData;
557 self->CheckResult(self->SendStartRequest());
558 }
559 });
560 }
561
562 return rv;
563 }
564
565 NS_IMETHODIMP
OnStopRequest(nsIRequest * aRequest,nsresult aStatusCode)566 StreamFilterParent::OnStopRequest(nsIRequest* aRequest, nsresult aStatusCode) {
567 AssertIsMainThread();
568 MOZ_ASSERT(aRequest == mChannel);
569
570 mReceivedStop = true;
571 if (mDisconnected) {
572 return EmitStopRequest(aStatusCode);
573 }
574
575 RefPtr<StreamFilterParent> self(this);
576 RunOnActorThread(FUNC, [=] {
577 if (self->IPCActive()) {
578 self->CheckResult(self->SendStopRequest(aStatusCode));
579 } else if (self->mState != State::Disconnecting) {
580 // If we're currently disconnecting, then we'll emit a stop
581 // request at the end of that process. Otherwise we need to
582 // manually emit one here, since we won't be getting a response
583 // from the child.
584 RunOnMainThread(FUNC, [=] {
585 if (!self->mSentStop) {
586 self->EmitStopRequest(aStatusCode);
587 }
588 });
589 }
590 });
591 return NS_OK;
592 }
593
EmitStopRequest(nsresult aStatusCode)594 nsresult StreamFilterParent::EmitStopRequest(nsresult aStatusCode) {
595 AssertIsMainThread();
596 MOZ_ASSERT(!mSentStop);
597
598 mSentStop = true;
599 nsresult rv = mOrigListener->OnStopRequest(mChannel, aStatusCode);
600
601 if (mLoadGroup && !mDisconnected) {
602 Unused << mLoadGroup->RemoveRequest(this, nullptr, aStatusCode);
603 }
604
605 return rv;
606 }
607
608 /*****************************************************************************
609 * Incoming data handling
610 *****************************************************************************/
611
DoSendData(Data && aData)612 void StreamFilterParent::DoSendData(Data&& aData) {
613 AssertIsActorThread();
614
615 if (mState == State::TransferringData) {
616 CheckResult(SendData(aData));
617 }
618 }
619
620 NS_IMETHODIMP
OnDataAvailable(nsIRequest * aRequest,nsIInputStream * aInputStream,uint64_t aOffset,uint32_t aCount)621 StreamFilterParent::OnDataAvailable(nsIRequest* aRequest,
622 nsIInputStream* aInputStream,
623 uint64_t aOffset, uint32_t aCount) {
624 AssertIsIOThread();
625
626 if (mDisconnectedByOnStartRequest || mState == State::Disconnected) {
627 // If we're offloading data in a thread pool, it's possible that we'll
628 // have buffered some additional data while waiting for the buffer to
629 // flush. So, if there's any buffered data left, flush that before we
630 // flush this incoming data.
631 //
632 // Note: When in the eDisconnected state, the buffer list is guaranteed
633 // never to be accessed by another thread during an OnDataAvailable call.
634 if (!mBufferedData.isEmpty()) {
635 FlushBufferedData();
636 }
637
638 mOffset += aCount;
639 return mOrigListener->OnDataAvailable(aRequest, aInputStream,
640 mOffset - aCount, aCount);
641 }
642
643 Data data;
644 data.SetLength(aCount);
645
646 uint32_t count;
647 nsresult rv = aInputStream->Read(reinterpret_cast<char*>(data.Elements()),
648 aCount, &count);
649 NS_ENSURE_SUCCESS(rv, rv);
650 NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED);
651
652 if (mState == State::Disconnecting) {
653 MutexAutoLock al(mBufferMutex);
654 BufferData(std::move(data));
655 } else if (mState == State::Closed) {
656 return NS_ERROR_FAILURE;
657 } else {
658 ActorThread()->Dispatch(
659 NewRunnableMethod<Data&&>("StreamFilterParent::DoSendData", this,
660 &StreamFilterParent::DoSendData,
661 std::move(data)),
662 NS_DISPATCH_NORMAL);
663 }
664 return NS_OK;
665 }
666
FlushBufferedData()667 nsresult StreamFilterParent::FlushBufferedData() {
668 AssertIsIOThread();
669
670 // When offloading data to a thread pool, OnDataAvailable isn't guaranteed
671 // to always run in the same thread, so it's possible for this function to
672 // run in parallel with OnDataAvailable.
673 MutexAutoLock al(mBufferMutex);
674
675 while (!mBufferedData.isEmpty()) {
676 UniquePtr<BufferedData> data(mBufferedData.popFirst());
677
678 nsresult rv = Write(data->mData);
679 NS_ENSURE_SUCCESS(rv, rv);
680 }
681
682 return NS_OK;
683 }
684
685 /*****************************************************************************
686 * Thread helpers
687 *****************************************************************************/
688
ActorThread()689 nsIEventTarget* StreamFilterParent::ActorThread() {
690 return net::gSocketTransportService;
691 }
692
IsActorThread()693 bool StreamFilterParent::IsActorThread() {
694 return ActorThread()->IsOnCurrentThread();
695 }
696
AssertIsActorThread()697 void StreamFilterParent::AssertIsActorThread() { MOZ_ASSERT(IsActorThread()); }
698
IOThread()699 nsIEventTarget* StreamFilterParent::IOThread() { return mIOThread; }
700
IsIOThread()701 bool StreamFilterParent::IsIOThread() { return mIOThread->IsOnCurrentThread(); }
702
AssertIsIOThread()703 void StreamFilterParent::AssertIsIOThread() { MOZ_ASSERT(IsIOThread()); }
704
705 template <typename Function>
RunOnMainThread(const char * aName,Function && aFunc)706 void StreamFilterParent::RunOnMainThread(const char* aName, Function&& aFunc) {
707 mQueue->RunOrEnqueue(new ChannelEventFunction(mMainThread, std::move(aFunc)));
708 }
709
RunOnMainThread(already_AddRefed<Runnable> aRunnable)710 void StreamFilterParent::RunOnMainThread(already_AddRefed<Runnable> aRunnable) {
711 mQueue->RunOrEnqueue(
712 new ChannelEventRunnable(mMainThread, std::move(aRunnable)));
713 }
714
715 template <typename Function>
RunOnIOThread(const char * aName,Function && aFunc)716 void StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc) {
717 mQueue->RunOrEnqueue(new ChannelEventFunction(mIOThread, std::move(aFunc)));
718 }
719
RunOnIOThread(already_AddRefed<Runnable> aRunnable)720 void StreamFilterParent::RunOnIOThread(already_AddRefed<Runnable> aRunnable) {
721 mQueue->RunOrEnqueue(
722 new ChannelEventRunnable(mIOThread, std::move(aRunnable)));
723 }
724
725 template <typename Function>
RunOnActorThread(const char * aName,Function && aFunc)726 void StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc) {
727 // We don't use mQueue for dispatch to the actor thread.
728 //
729 // The main thread and IO thread are used for dispatching events to the
730 // wrapped stream listener, and those events need to be processed
731 // consistently, in the order they were dispatched. An event dispatched to the
732 // main thread can't be run before events that were dispatched to the IO
733 // thread before it.
734 //
735 // Additionally, the IO thread is likely to be a thread pool, which means that
736 // without thread-safe queuing, it's possible for multiple events dispatched
737 // to it to be processed in parallel, or out of order.
738 //
739 // The actor thread, however, is always a serial event target. Its events are
740 // always processed in order, and events dispatched to the actor thread are
741 // independent of the events in the output event queue.
742 if (IsActorThread()) {
743 aFunc();
744 } else {
745 ActorThread()->Dispatch(std::move(NS_NewRunnableFunction(aName, aFunc)),
746 NS_DISPATCH_NORMAL);
747 }
748 }
749
750 /*****************************************************************************
751 * Glue
752 *****************************************************************************/
753
ActorDestroy(ActorDestroyReason aWhy)754 void StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy) {
755 AssertIsActorThread();
756
757 if (mState != State::Disconnected && mState != State::Closed) {
758 Broken();
759 }
760 }
761
ActorDealloc()762 void StreamFilterParent::ActorDealloc() {
763 RefPtr<StreamFilterParent> self = dont_AddRef(this);
764 }
765
766 NS_INTERFACE_MAP_BEGIN(StreamFilterParent)
767 NS_INTERFACE_MAP_ENTRY(nsIStreamListener)
768 NS_INTERFACE_MAP_ENTRY(nsIRequestObserver)
769 NS_INTERFACE_MAP_ENTRY(nsIRequest)
770 NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableStreamListener)
771 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamListener)
772 NS_INTERFACE_MAP_END
773
774 NS_IMPL_ADDREF(StreamFilterParent)
775 NS_IMPL_RELEASE(StreamFilterParent)
776
777 } // namespace extensions
778 } // namespace mozilla
779