1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "StreamFilterParent.h"
8 
9 #include "mozilla/Unused.h"
10 #include "mozilla/dom/ContentParent.h"
11 #include "mozilla/net/ChannelEventQueue.h"
12 #include "nsHttpChannel.h"
13 #include "nsIChannel.h"
14 #include "nsIInputStream.h"
15 #include "nsITraceableChannel.h"
16 #include "nsProxyRelease.h"
17 #include "nsQueryObject.h"
18 #include "nsSocketTransportService2.h"
19 #include "nsStringStream.h"
20 #include "mozilla/net/DocumentChannelChild.h"
21 #include "nsIViewSourceChannel.h"
22 
23 namespace mozilla {
24 namespace extensions {
25 
26 /*****************************************************************************
27  * Event queueing helpers
28  *****************************************************************************/
29 
30 using net::ChannelEvent;
31 using net::ChannelEventQueue;
32 
33 namespace {
34 
35 // Define some simple ChannelEvent sub-classes that store the appropriate
36 // EventTarget and delegate their Run methods to a wrapped Runnable or lambda
37 // function.
38 
39 class ChannelEventWrapper : public ChannelEvent {
40  public:
ChannelEventWrapper(nsIEventTarget * aTarget)41   ChannelEventWrapper(nsIEventTarget* aTarget) : mTarget(aTarget) {}
42 
GetEventTarget()43   already_AddRefed<nsIEventTarget> GetEventTarget() override {
44     return do_AddRef(mTarget);
45   }
46 
47  protected:
48   ~ChannelEventWrapper() override = default;
49 
50  private:
51   nsCOMPtr<nsIEventTarget> mTarget;
52 };
53 
54 class ChannelEventFunction final : public ChannelEventWrapper {
55  public:
ChannelEventFunction(nsIEventTarget * aTarget,std::function<void ()> && aFunc)56   ChannelEventFunction(nsIEventTarget* aTarget, std::function<void()>&& aFunc)
57       : ChannelEventWrapper(aTarget), mFunc(std::move(aFunc)) {}
58 
Run()59   void Run() override { mFunc(); }
60 
61  protected:
62   ~ChannelEventFunction() override = default;
63 
64  private:
65   std::function<void()> mFunc;
66 };
67 
68 class ChannelEventRunnable final : public ChannelEventWrapper {
69  public:
ChannelEventRunnable(nsIEventTarget * aTarget,already_AddRefed<Runnable> aRunnable)70   ChannelEventRunnable(nsIEventTarget* aTarget,
71                        already_AddRefed<Runnable> aRunnable)
72       : ChannelEventWrapper(aTarget), mRunnable(aRunnable) {}
73 
Run()74   void Run() override {
75     nsresult rv = mRunnable->Run();
76     Unused << NS_WARN_IF(NS_FAILED(rv));
77   }
78 
79  protected:
80   ~ChannelEventRunnable() override = default;
81 
82  private:
83   RefPtr<Runnable> mRunnable;
84 };
85 
86 }  // anonymous namespace
87 
88 /*****************************************************************************
89  * Initialization
90  *****************************************************************************/
91 
StreamFilterParent()92 StreamFilterParent::StreamFilterParent()
93     : mMainThread(GetCurrentEventTarget()),
94       mIOThread(mMainThread),
95       mQueue(new ChannelEventQueue(static_cast<nsIStreamListener*>(this))),
96       mBufferMutex("StreamFilter buffer mutex"),
97       mReceivedStop(false),
98       mSentStop(false),
99       mContext(nullptr),
100       mOffset(0),
101       mState(State::Uninitialized) {}
102 
// Releases the channel, load group, original listener and context through
// NS_ReleaseOnMainThread instead of letting the member destructors drop them
// here, then informs the event queue via NotifyReleasingOwner().
StreamFilterParent::~StreamFilterParent() {
  NS_ReleaseOnMainThread("StreamFilterParent::mChannel", mChannel.forget());
  NS_ReleaseOnMainThread("StreamFilterParent::mLoadGroup", mLoadGroup.forget());
  NS_ReleaseOnMainThread("StreamFilterParent::mOrigListener",
                         mOrigListener.forget());
  NS_ReleaseOnMainThread("StreamFilterParent::mContext", mContext.forget());
  // NOTE(review): presumably tells the queue that its owner (this object, see
  // the constructor) is going away — confirm against ChannelEventQueue.
  mQueue->NotifyReleasingOwner();
}
111 
Create(dom::ContentParent * aContentParent,uint64_t aChannelId,const nsAString & aAddonId)112 auto StreamFilterParent::Create(dom::ContentParent* aContentParent,
113                                 uint64_t aChannelId, const nsAString& aAddonId)
114     -> RefPtr<ChildEndpointPromise> {
115   AssertIsMainThread();
116 
117   auto& webreq = WebRequestService::GetSingleton();
118 
119   RefPtr<nsAtom> addonId = NS_Atomize(aAddonId);
120   nsCOMPtr<nsITraceableChannel> channel =
121       webreq.GetTraceableChannel(aChannelId, addonId, aContentParent);
122 
123   RefPtr<mozilla::net::nsHttpChannel> chan = do_QueryObject(channel);
124   if (!chan) {
125     return ChildEndpointPromise::CreateAndReject(false, __func__);
126   }
127 
128   // Disable alt-data for extension stream listeners.
129   nsCOMPtr<nsIHttpChannelInternal> internal(do_QueryObject(channel));
130   internal->DisableAltDataCache();
131 
132   return chan->AttachStreamFilter(aContentParent ? aContentParent->OtherPid()
133                                                  : base::GetCurrentProcId());
134 }
135 
136 /* static */
Attach(nsIChannel * aChannel,ParentEndpoint && aEndpoint)137 void StreamFilterParent::Attach(nsIChannel* aChannel,
138                                 ParentEndpoint&& aEndpoint) {
139   auto self = MakeRefPtr<StreamFilterParent>();
140 
141   self->ActorThread()->Dispatch(
142       NewRunnableMethod<ParentEndpoint&&>("StreamFilterParent::Bind", self,
143                                           &StreamFilterParent::Bind,
144                                           std::move(aEndpoint)),
145       NS_DISPATCH_NORMAL);
146 
147   self->Init(aChannel);
148 
149   // IPC owns this reference now.
150   Unused << self.forget();
151 }
152 
// Binds this actor to aEndpoint. Attach() dispatches this to the actor thread.
void StreamFilterParent::Bind(ParentEndpoint&& aEndpoint) {
  aEndpoint.Bind(this);
}
156 
Init(nsIChannel * aChannel)157 void StreamFilterParent::Init(nsIChannel* aChannel) {
158   mChannel = aChannel;
159 
160   nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(aChannel);
161   if (MOZ_UNLIKELY(!traceable)) {
162     // nsViewSourceChannel is not nsITraceableChannel, but wraps one. Unwrap it.
163     nsCOMPtr<nsIViewSourceChannel> vsc = do_QueryInterface(aChannel);
164     if (vsc) {
165       traceable = do_QueryObject(vsc->GetInnerChannel());
166       // OnStartRequest etc. is passed the unwrapped channel, so update mChannel
167       // to prevent OnStartRequest from mistaking it for a redirect, which would
168       // close the filter.
169       mChannel = do_QueryObject(traceable);
170     }
171     // TODO bug 1683403: Replace assertion; Close StreamFilter instead.
172     MOZ_RELEASE_ASSERT(traceable);
173   }
174 
175   nsresult rv =
176       traceable->SetNewListener(this, /* aMustApplyContentConversion = */ true,
177                                 getter_AddRefs(mOrigListener));
178   MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv));
179 }
180 
181 /*****************************************************************************
182  * nsIThreadRetargetableStreamListener
183  *****************************************************************************/
184 
185 NS_IMETHODIMP
CheckListenerChain()186 StreamFilterParent::CheckListenerChain() {
187   AssertIsMainThread();
188 
189   nsCOMPtr<nsIThreadRetargetableStreamListener> trsl =
190       do_QueryInterface(mOrigListener);
191   if (trsl) {
192     return trsl->CheckListenerChain();
193   }
194   return NS_ERROR_FAILURE;
195 }
196 
197 /*****************************************************************************
198  * Error handling
199  *****************************************************************************/
200 
Broken()201 void StreamFilterParent::Broken() {
202   AssertIsActorThread();
203 
204   switch (mState) {
205     case State::Initialized:
206     case State::TransferringData:
207     case State::Suspended: {
208       mState = State::Disconnecting;
209       RefPtr<StreamFilterParent> self(this);
210       RunOnMainThread(FUNC, [=] {
211         if (self->mChannel) {
212           self->mChannel->Cancel(NS_ERROR_FAILURE);
213         }
214       });
215 
216       FinishDisconnect();
217     } break;
218 
219     default:
220       break;
221   }
222 }
223 
224 /*****************************************************************************
225  * State change requests
226  *****************************************************************************/
227 
RecvClose()228 IPCResult StreamFilterParent::RecvClose() {
229   AssertIsActorThread();
230 
231   mState = State::Closed;
232 
233   if (!mSentStop) {
234     RefPtr<StreamFilterParent> self(this);
235     RunOnMainThread(FUNC, [=] {
236       nsresult rv = self->EmitStopRequest(NS_OK);
237       Unused << NS_WARN_IF(NS_FAILED(rv));
238     });
239   }
240 
241   Unused << SendClosed();
242   Destroy();
243   return IPC_OK();
244 }
245 
Destroy()246 void StreamFilterParent::Destroy() {
247   // Close the channel asynchronously so the actor is never destroyed before
248   // this message is fully processed.
249   ActorThread()->Dispatch(NewRunnableMethod("StreamFilterParent::Close", this,
250                                             &StreamFilterParent::Close),
251                           NS_DISPATCH_NORMAL);
252 }
253 
// IPC handler for Destroy: schedules the asynchronous close via Destroy().
IPCResult StreamFilterParent::RecvDestroy() {
  AssertIsActorThread();
  Destroy();
  return IPC_OK();
}
259 
RecvSuspend()260 IPCResult StreamFilterParent::RecvSuspend() {
261   AssertIsActorThread();
262 
263   if (mState == State::TransferringData) {
264     RefPtr<StreamFilterParent> self(this);
265     RunOnMainThread(FUNC, [=] {
266       self->mChannel->Suspend();
267 
268       RunOnActorThread(FUNC, [=] {
269         if (self->IPCActive()) {
270           self->mState = State::Suspended;
271           self->CheckResult(self->SendSuspended());
272         }
273       });
274     });
275   }
276   return IPC_OK();
277 }
278 
RecvResume()279 IPCResult StreamFilterParent::RecvResume() {
280   AssertIsActorThread();
281 
282   if (mState == State::Suspended) {
283     // Change state before resuming so incoming data is handled correctly
284     // immediately after resuming.
285     mState = State::TransferringData;
286 
287     RefPtr<StreamFilterParent> self(this);
288     RunOnMainThread(FUNC, [=] {
289       self->mChannel->Resume();
290 
291       RunOnActorThread(FUNC, [=] {
292         if (self->IPCActive()) {
293           self->CheckResult(self->SendResumed());
294         }
295       });
296     });
297   }
298   return IPC_OK();
299 }
RecvDisconnect()300 IPCResult StreamFilterParent::RecvDisconnect() {
301   AssertIsActorThread();
302 
303   if (mState == State::Suspended) {
304     RefPtr<StreamFilterParent> self(this);
305     RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); });
306   } else if (mState != State::TransferringData) {
307     return IPC_OK();
308   }
309 
310   mState = State::Disconnecting;
311   CheckResult(SendFlushData());
312   return IPC_OK();
313 }
314 
// IPC handler for FlushedData. Expected only while Disconnecting (checked by a
// debug assertion); schedules the asynchronous close and then finishes the
// disconnect.
IPCResult StreamFilterParent::RecvFlushedData() {
  AssertIsActorThread();

  MOZ_ASSERT(mState == State::Disconnecting);

  Destroy();

  FinishDisconnect();
  return IPC_OK();
}
325 
// Completes a disconnect in three steps, each queued on a different target:
//   1. IO thread: flush any data still held in the buffer.
//   2. Main thread: if a stop request was received but not yet forwarded, emit
//      it now; otherwise remove this request from the load group (when there
//      is one and the filter was not already disconnected). Then mark the
//      filter as disconnected.
//   3. Actor thread: move to the Disconnected state unless already Closed.
// Steps 2 and 3 are both scheduled from inside step 1.
void StreamFilterParent::FinishDisconnect() {
  // `self` keeps this object alive for the duration of the queued lambdas.
  RefPtr<StreamFilterParent> self(this);
  RunOnIOThread(FUNC, [=] {
    self->FlushBufferedData();

    RunOnMainThread(FUNC, [=] {
      if (self->mReceivedStop && !self->mSentStop) {
        nsresult rv = self->EmitStopRequest(NS_OK);
        Unused << NS_WARN_IF(NS_FAILED(rv));
      } else if (self->mLoadGroup && !self->mDisconnected) {
        Unused << self->mLoadGroup->RemoveRequest(self, nullptr, NS_OK);
      }
      self->mDisconnected = true;
    });

    RunOnActorThread(FUNC, [=] {
      if (self->mState != State::Closed) {
        self->mState = State::Disconnected;
      }
    });
  });
}
348 
349 /*****************************************************************************
350  * Data output
351  *****************************************************************************/
352 
RecvWrite(Data && aData)353 IPCResult StreamFilterParent::RecvWrite(Data&& aData) {
354   AssertIsActorThread();
355 
356   RunOnIOThread(NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove", this,
357                                           &StreamFilterParent::WriteMove,
358                                           std::move(aData)));
359   return IPC_OK();
360 }
361 
// Rvalue-taking adapter around Write(), used as the runnable method queued by
// RecvWrite(). A failed write is only warned about.
void StreamFilterParent::WriteMove(Data&& aData) {
  nsresult rv = Write(aData);
  Unused << NS_WARN_IF(NS_FAILED(rv));
}
366 
Write(Data & aData)367 nsresult StreamFilterParent::Write(Data& aData) {
368   AssertIsIOThread();
369 
370   nsCOMPtr<nsIInputStream> stream;
371   nsresult rv = NS_NewByteInputStream(
372       getter_AddRefs(stream),
373       Span(reinterpret_cast<char*>(aData.Elements()), aData.Length()),
374       NS_ASSIGNMENT_DEPEND);
375   NS_ENSURE_SUCCESS(rv, rv);
376 
377   rv =
378       mOrigListener->OnDataAvailable(mChannel, stream, mOffset, aData.Length());
379   NS_ENSURE_SUCCESS(rv, rv);
380 
381   mOffset += aData.Length();
382   return NS_OK;
383 }
384 
385 /*****************************************************************************
386  * nsIRequest
387  *****************************************************************************/
388 
389 NS_IMETHODIMP
GetName(nsACString & aName)390 StreamFilterParent::GetName(nsACString& aName) {
391   AssertIsMainThread();
392   MOZ_ASSERT(mChannel);
393   return mChannel->GetName(aName);
394 }
395 
396 NS_IMETHODIMP
GetStatus(nsresult * aStatus)397 StreamFilterParent::GetStatus(nsresult* aStatus) {
398   AssertIsMainThread();
399   MOZ_ASSERT(mChannel);
400   return mChannel->GetStatus(aStatus);
401 }
402 
403 NS_IMETHODIMP
IsPending(bool * aIsPending)404 StreamFilterParent::IsPending(bool* aIsPending) {
405   switch (mState) {
406     case State::Initialized:
407     case State::TransferringData:
408     case State::Suspended:
409       *aIsPending = true;
410       break;
411     default:
412       *aIsPending = false;
413   }
414   return NS_OK;
415 }
416 
417 NS_IMETHODIMP
Cancel(nsresult aResult)418 StreamFilterParent::Cancel(nsresult aResult) {
419   AssertIsMainThread();
420   MOZ_ASSERT(mChannel);
421   return mChannel->Cancel(aResult);
422 }
423 
424 NS_IMETHODIMP
Suspend()425 StreamFilterParent::Suspend() {
426   AssertIsMainThread();
427   MOZ_ASSERT(mChannel);
428   return mChannel->Suspend();
429 }
430 
431 NS_IMETHODIMP
Resume()432 StreamFilterParent::Resume() {
433   AssertIsMainThread();
434   MOZ_ASSERT(mChannel);
435   return mChannel->Resume();
436 }
437 
438 NS_IMETHODIMP
GetLoadGroup(nsILoadGroup ** aLoadGroup)439 StreamFilterParent::GetLoadGroup(nsILoadGroup** aLoadGroup) {
440   *aLoadGroup = mLoadGroup;
441   return NS_OK;
442 }
443 
// nsIRequest::SetLoadGroup — not supported; always returns
// NS_ERROR_NOT_IMPLEMENTED and ignores aLoadGroup.
NS_IMETHODIMP
StreamFilterParent::SetLoadGroup(nsILoadGroup* aLoadGroup) {
  return NS_ERROR_NOT_IMPLEMENTED;
}
448 
449 NS_IMETHODIMP
GetLoadFlags(nsLoadFlags * aLoadFlags)450 StreamFilterParent::GetLoadFlags(nsLoadFlags* aLoadFlags) {
451   AssertIsMainThread();
452   MOZ_ASSERT(mChannel);
453   MOZ_TRY(mChannel->GetLoadFlags(aLoadFlags));
454   *aLoadFlags &= ~nsIChannel::LOAD_DOCUMENT_URI;
455   return NS_OK;
456 }
457 
458 NS_IMETHODIMP
SetLoadFlags(nsLoadFlags aLoadFlags)459 StreamFilterParent::SetLoadFlags(nsLoadFlags aLoadFlags) {
460   AssertIsMainThread();
461   MOZ_ASSERT(mChannel);
462   return mChannel->SetLoadFlags(aLoadFlags);
463 }
464 
465 NS_IMETHODIMP
GetTRRMode(nsIRequest::TRRMode * aTRRMode)466 StreamFilterParent::GetTRRMode(nsIRequest::TRRMode* aTRRMode) {
467   return GetTRRModeImpl(aTRRMode);
468 }
469 
470 NS_IMETHODIMP
SetTRRMode(nsIRequest::TRRMode aTRRMode)471 StreamFilterParent::SetTRRMode(nsIRequest::TRRMode aTRRMode) {
472   return SetTRRModeImpl(aTRRMode);
473 }
474 
475 /*****************************************************************************
476  * nsIStreamListener
477  *****************************************************************************/
478 
479 NS_IMETHODIMP
OnStartRequest(nsIRequest * aRequest)480 StreamFilterParent::OnStartRequest(nsIRequest* aRequest) {
481   AssertIsMainThread();
482 
483   // Always reset mChannel if aRequest is different.  Various calls in
484   // StreamFilterParent will use mChannel, but aRequest is *always* the
485   // right channel to use at this point.
486   //
487   // For ALL redirections, we will disconnect this listener.  Extensions
488   // will create a new filter if they need it.
489   if (aRequest != mChannel) {
490     nsCOMPtr<nsIChannel> channel = do_QueryInterface(aRequest);
491     nsCOMPtr<nsILoadInfo> loadInfo = channel ? channel->LoadInfo() : nullptr;
492     mChannel = channel;
493 
494     if (!(loadInfo &&
495           loadInfo->RedirectChainIncludingInternalRedirects().IsEmpty())) {
496       mDisconnected = true;
497       mDisconnectedByOnStartRequest = true;
498 
499       RefPtr<StreamFilterParent> self(this);
500       RunOnActorThread(FUNC, [=] {
501         if (self->IPCActive()) {
502           self->mState = State::Disconnected;
503           CheckResult(self->SendError("Channel redirected"_ns));
504         }
505       });
506     }
507   }
508 
509   // Check if alterate cached data is being sent, if so we receive un-decoded
510   // data and we must disconnect the filter and send an error to the extension.
511   if (!mDisconnected) {
512     RefPtr<net::HttpBaseChannel> chan = do_QueryObject(aRequest);
513     if (chan && chan->IsDeliveringAltData()) {
514       mDisconnected = true;
515       mDisconnectedByOnStartRequest = true;
516 
517       RefPtr<StreamFilterParent> self(this);
518       RunOnActorThread(FUNC, [=] {
519         if (self->IPCActive()) {
520           self->mState = State::Disconnected;
521           CheckResult(
522               self->SendError("Channel is delivering cached alt-data"_ns));
523         }
524       });
525     }
526   }
527 
528   if (!mDisconnected) {
529     Unused << mChannel->GetLoadGroup(getter_AddRefs(mLoadGroup));
530     if (mLoadGroup) {
531       Unused << mLoadGroup->AddRequest(this, nullptr);
532     }
533   }
534 
535   nsresult rv = mOrigListener->OnStartRequest(aRequest);
536 
537   // Important: Do this only *after* running the next listener in the chain, so
538   // that we get the final delivery target after any retargeting that it may do.
539   if (nsCOMPtr<nsIThreadRetargetableRequest> req =
540           do_QueryInterface(aRequest)) {
541     nsCOMPtr<nsIEventTarget> thread;
542     Unused << req->GetDeliveryTarget(getter_AddRefs(thread));
543     if (thread) {
544       mIOThread = std::move(thread);
545     }
546   }
547 
548   // Important: Do this *after* we have set the thread delivery target, or it is
549   // possible in rare circumstances for an extension to attempt to write data
550   // before the thread has been set up, even though there are several layers of
551   // asynchrony involved.
552   if (!mDisconnected) {
553     RefPtr<StreamFilterParent> self(this);
554     RunOnActorThread(FUNC, [=] {
555       if (self->IPCActive()) {
556         self->mState = State::TransferringData;
557         self->CheckResult(self->SendStartRequest());
558       }
559     });
560   }
561 
562   return rv;
563 }
564 
565 NS_IMETHODIMP
OnStopRequest(nsIRequest * aRequest,nsresult aStatusCode)566 StreamFilterParent::OnStopRequest(nsIRequest* aRequest, nsresult aStatusCode) {
567   AssertIsMainThread();
568   MOZ_ASSERT(aRequest == mChannel);
569 
570   mReceivedStop = true;
571   if (mDisconnected) {
572     return EmitStopRequest(aStatusCode);
573   }
574 
575   RefPtr<StreamFilterParent> self(this);
576   RunOnActorThread(FUNC, [=] {
577     if (self->IPCActive()) {
578       self->CheckResult(self->SendStopRequest(aStatusCode));
579     } else if (self->mState != State::Disconnecting) {
580       // If we're currently disconnecting, then we'll emit a stop
581       // request at the end of that process. Otherwise we need to
582       // manually emit one here, since we won't be getting a response
583       // from the child.
584       RunOnMainThread(FUNC, [=] {
585         if (!self->mSentStop) {
586           self->EmitStopRequest(aStatusCode);
587         }
588       });
589     }
590   });
591   return NS_OK;
592 }
593 
EmitStopRequest(nsresult aStatusCode)594 nsresult StreamFilterParent::EmitStopRequest(nsresult aStatusCode) {
595   AssertIsMainThread();
596   MOZ_ASSERT(!mSentStop);
597 
598   mSentStop = true;
599   nsresult rv = mOrigListener->OnStopRequest(mChannel, aStatusCode);
600 
601   if (mLoadGroup && !mDisconnected) {
602     Unused << mLoadGroup->RemoveRequest(this, nullptr, aStatusCode);
603   }
604 
605   return rv;
606 }
607 
608 /*****************************************************************************
609  * Incoming data handling
610  *****************************************************************************/
611 
DoSendData(Data && aData)612 void StreamFilterParent::DoSendData(Data&& aData) {
613   AssertIsActorThread();
614 
615   if (mState == State::TransferringData) {
616     CheckResult(SendData(aData));
617   }
618 }
619 
// nsIStreamListener::OnDataAvailable, called on the IO thread.
//
// Once disconnected, incoming data is passed straight through to the original
// listener (after flushing anything still buffered), using this object's own
// running offset rather than aOffset. Otherwise the aCount bytes are read out
// of aInputStream and then either buffered (Disconnecting), rejected with
// NS_ERROR_FAILURE (Closed), or dispatched to the actor thread to be sent to
// the child.
NS_IMETHODIMP
StreamFilterParent::OnDataAvailable(nsIRequest* aRequest,
                                    nsIInputStream* aInputStream,
                                    uint64_t aOffset, uint32_t aCount) {
  AssertIsIOThread();

  if (mDisconnectedByOnStartRequest || mState == State::Disconnected) {
    // If we're offloading data in a thread pool, it's possible that we'll
    // have buffered some additional data while waiting for the buffer to
    // flush. So, if there's any buffered data left, flush that before we
    // flush this incoming data.
    //
    // Note: When in the Disconnected state, the buffer list is guaranteed
    // never to be accessed by another thread during an OnDataAvailable call.
    if (!mBufferedData.isEmpty()) {
      FlushBufferedData();
    }

    // Advance the running offset first; the listener is given the offset from
    // before this chunk.
    mOffset += aCount;
    return mOrigListener->OnDataAvailable(aRequest, aInputStream,
                                          mOffset - aCount, aCount);
  }

  Data data;
  data.SetLength(aCount);

  // A short read is treated as an error.
  uint32_t count;
  nsresult rv = aInputStream->Read(reinterpret_cast<char*>(data.Elements()),
                                   aCount, &count);
  NS_ENSURE_SUCCESS(rv, rv);
  NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED);

  if (mState == State::Disconnecting) {
    // The buffer is shared with FlushBufferedData, hence the lock.
    MutexAutoLock al(mBufferMutex);
    BufferData(std::move(data));
  } else if (mState == State::Closed) {
    return NS_ERROR_FAILURE;
  } else {
    ActorThread()->Dispatch(
        NewRunnableMethod<Data&&>("StreamFilterParent::DoSendData", this,
                                  &StreamFilterParent::DoSendData,
                                  std::move(data)),
        NS_DISPATCH_NORMAL);
  }
  return NS_OK;
}
666 
FlushBufferedData()667 nsresult StreamFilterParent::FlushBufferedData() {
668   AssertIsIOThread();
669 
670   // When offloading data to a thread pool, OnDataAvailable isn't guaranteed
671   // to always run in the same thread, so it's possible for this function to
672   // run in parallel with OnDataAvailable.
673   MutexAutoLock al(mBufferMutex);
674 
675   while (!mBufferedData.isEmpty()) {
676     UniquePtr<BufferedData> data(mBufferedData.popFirst());
677 
678     nsresult rv = Write(data->mData);
679     NS_ENSURE_SUCCESS(rv, rv);
680   }
681 
682   return NS_OK;
683 }
684 
685 /*****************************************************************************
686  * Thread helpers
687  *****************************************************************************/
688 
ActorThread()689 nsIEventTarget* StreamFilterParent::ActorThread() {
690   return net::gSocketTransportService;
691 }
692 
IsActorThread()693 bool StreamFilterParent::IsActorThread() {
694   return ActorThread()->IsOnCurrentThread();
695 }
696 
// Debug-only check that the caller is on the actor thread.
void StreamFilterParent::AssertIsActorThread() { MOZ_ASSERT(IsActorThread()); }
698 
IOThread()699 nsIEventTarget* StreamFilterParent::IOThread() { return mIOThread; }
700 
IsIOThread()701 bool StreamFilterParent::IsIOThread() { return mIOThread->IsOnCurrentThread(); }
702 
// Debug-only check that the caller is on the IO thread.
void StreamFilterParent::AssertIsIOThread() { MOZ_ASSERT(IsIOThread()); }
704 
705 template <typename Function>
RunOnMainThread(const char * aName,Function && aFunc)706 void StreamFilterParent::RunOnMainThread(const char* aName, Function&& aFunc) {
707   mQueue->RunOrEnqueue(new ChannelEventFunction(mMainThread, std::move(aFunc)));
708 }
709 
RunOnMainThread(already_AddRefed<Runnable> aRunnable)710 void StreamFilterParent::RunOnMainThread(already_AddRefed<Runnable> aRunnable) {
711   mQueue->RunOrEnqueue(
712       new ChannelEventRunnable(mMainThread, std::move(aRunnable)));
713 }
714 
715 template <typename Function>
RunOnIOThread(const char * aName,Function && aFunc)716 void StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc) {
717   mQueue->RunOrEnqueue(new ChannelEventFunction(mIOThread, std::move(aFunc)));
718 }
719 
RunOnIOThread(already_AddRefed<Runnable> aRunnable)720 void StreamFilterParent::RunOnIOThread(already_AddRefed<Runnable> aRunnable) {
721   mQueue->RunOrEnqueue(
722       new ChannelEventRunnable(mIOThread, std::move(aRunnable)));
723 }
724 
725 template <typename Function>
RunOnActorThread(const char * aName,Function && aFunc)726 void StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc) {
727   // We don't use mQueue for dispatch to the actor thread.
728   //
729   // The main thread and IO thread are used for dispatching events to the
730   // wrapped stream listener, and those events need to be processed
731   // consistently, in the order they were dispatched. An event dispatched to the
732   // main thread can't be run before events that were dispatched to the IO
733   // thread before it.
734   //
735   // Additionally, the IO thread is likely to be a thread pool, which means that
736   // without thread-safe queuing, it's possible for multiple events dispatched
737   // to it to be processed in parallel, or out of order.
738   //
739   // The actor thread, however, is always a serial event target. Its events are
740   // always processed in order, and events dispatched to the actor thread are
741   // independent of the events in the output event queue.
742   if (IsActorThread()) {
743     aFunc();
744   } else {
745     ActorThread()->Dispatch(std::move(NS_NewRunnableFunction(aName, aFunc)),
746                             NS_DISPATCH_NORMAL);
747   }
748 }
749 
750 /*****************************************************************************
751  * Glue
752  *****************************************************************************/
753 
ActorDestroy(ActorDestroyReason aWhy)754 void StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy) {
755   AssertIsActorThread();
756 
757   if (mState != State::Disconnected && mState != State::Closed) {
758     Broken();
759   }
760 }
761 
ActorDealloc()762 void StreamFilterParent::ActorDealloc() {
763   RefPtr<StreamFilterParent> self = dont_AddRef(this);
764 }
765 
// XPCOM glue: QueryInterface answers for the four interfaces listed below,
// with nsISupports resolved through the nsIStreamListener base. AddRef and
// Release come from the standard implementation macros.
NS_INTERFACE_MAP_BEGIN(StreamFilterParent)
  NS_INTERFACE_MAP_ENTRY(nsIStreamListener)
  NS_INTERFACE_MAP_ENTRY(nsIRequestObserver)
  NS_INTERFACE_MAP_ENTRY(nsIRequest)
  NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableStreamListener)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamListener)
NS_INTERFACE_MAP_END

NS_IMPL_ADDREF(StreamFilterParent)
NS_IMPL_RELEASE(StreamFilterParent)
776 
777 }  // namespace extensions
778 }  // namespace mozilla
779