1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "StreamFilterParent.h"
8 
9 #include "mozilla/ScopeExit.h"
10 #include "mozilla/Unused.h"
11 #include "mozilla/dom/ContentParent.h"
12 #include "mozilla/net/ChannelEventQueue.h"
13 #include "nsHttpChannel.h"
14 #include "nsIChannel.h"
15 #include "nsIInputStream.h"
16 #include "nsITraceableChannel.h"
17 #include "nsProxyRelease.h"
18 #include "nsQueryObject.h"
19 #include "nsSocketTransportService2.h"
20 #include "nsStringStream.h"
21 #include "mozilla/net/DocumentChannelChild.h"
22 #include "nsIViewSourceChannel.h"
23 
24 namespace mozilla {
25 namespace extensions {
26 
27 /*****************************************************************************
28  * Event queueing helpers
29  *****************************************************************************/
30 
31 using net::ChannelEvent;
32 using net::ChannelEventQueue;
33 
34 namespace {
35 
36 // Define some simple ChannelEvent sub-classes that store the appropriate
37 // EventTarget and delegate their Run methods to a wrapped Runnable or lambda
38 // function.
39 
40 class ChannelEventWrapper : public ChannelEvent {
41  public:
ChannelEventWrapper(nsIEventTarget * aTarget)42   ChannelEventWrapper(nsIEventTarget* aTarget) : mTarget(aTarget) {}
43 
GetEventTarget()44   already_AddRefed<nsIEventTarget> GetEventTarget() override {
45     return do_AddRef(mTarget);
46   }
47 
48  protected:
49   ~ChannelEventWrapper() override = default;
50 
51  private:
52   nsCOMPtr<nsIEventTarget> mTarget;
53 };
54 
55 class ChannelEventFunction final : public ChannelEventWrapper {
56  public:
ChannelEventFunction(nsIEventTarget * aTarget,std::function<void ()> && aFunc)57   ChannelEventFunction(nsIEventTarget* aTarget, std::function<void()>&& aFunc)
58       : ChannelEventWrapper(aTarget), mFunc(std::move(aFunc)) {}
59 
Run()60   void Run() override { mFunc(); }
61 
62  protected:
63   ~ChannelEventFunction() override = default;
64 
65  private:
66   std::function<void()> mFunc;
67 };
68 
69 class ChannelEventRunnable final : public ChannelEventWrapper {
70  public:
ChannelEventRunnable(nsIEventTarget * aTarget,already_AddRefed<Runnable> aRunnable)71   ChannelEventRunnable(nsIEventTarget* aTarget,
72                        already_AddRefed<Runnable> aRunnable)
73       : ChannelEventWrapper(aTarget), mRunnable(aRunnable) {}
74 
Run()75   void Run() override {
76     nsresult rv = mRunnable->Run();
77     Unused << NS_WARN_IF(NS_FAILED(rv));
78   }
79 
80  protected:
81   ~ChannelEventRunnable() override = default;
82 
83  private:
84   RefPtr<Runnable> mRunnable;
85 };
86 
87 }  // anonymous namespace
88 
89 /*****************************************************************************
90  * Initialization
91  *****************************************************************************/
92 
// Constructs a filter in the Uninitialized state. The event target of the
// constructing thread is recorded as mMainThread and is also used as the
// initial IO target (OnStartRequest replaces mIOThread when the request
// reports a delivery target). The output event queue is created with this
// object, viewed as an nsIStreamListener, as its owner argument.
StreamFilterParent::StreamFilterParent()
    : mMainThread(GetCurrentThreadEventTarget()),
      mIOThread(mMainThread),
      mQueue(new ChannelEventQueue(static_cast<nsIStreamListener*>(this))),
      mBufferMutex("StreamFilter buffer mutex"),
      mReceivedStop(false),
      mSentStop(false),
      mContext(nullptr),
      mOffset(0),
      mState(State::Uninitialized) {}
103 
// Hands every remaining strong reference to NS_ReleaseOnMainThread rather
// than letting the member destructors drop them here.
// NOTE(review): presumably because the destructor can run off the main
// thread while these objects must be released there -- confirm.
StreamFilterParent::~StreamFilterParent() {
  NS_ReleaseOnMainThread("StreamFilterParent::mChannel", mChannel.forget());
  NS_ReleaseOnMainThread("StreamFilterParent::mLoadGroup", mLoadGroup.forget());
  NS_ReleaseOnMainThread("StreamFilterParent::mOrigListener",
                         mOrigListener.forget());
  NS_ReleaseOnMainThread("StreamFilterParent::mContext", mContext.forget());
}
111 
Create(dom::ContentParent * aContentParent,uint64_t aChannelId,const nsAString & aAddonId)112 auto StreamFilterParent::Create(dom::ContentParent* aContentParent,
113                                 uint64_t aChannelId, const nsAString& aAddonId)
114     -> RefPtr<ChildEndpointPromise> {
115   AssertIsMainThread();
116 
117   auto& webreq = WebRequestService::GetSingleton();
118 
119   RefPtr<nsAtom> addonId = NS_Atomize(aAddonId);
120   nsCOMPtr<nsITraceableChannel> channel =
121       webreq.GetTraceableChannel(aChannelId, addonId, aContentParent);
122 
123   RefPtr<mozilla::net::nsHttpChannel> chan = do_QueryObject(channel);
124   if (!chan) {
125     return ChildEndpointPromise::CreateAndReject(false, __func__);
126   }
127 
128   // Disable alt-data for extension stream listeners.
129   nsCOMPtr<nsIHttpChannelInternal> internal(do_QueryObject(channel));
130   internal->DisableAltDataCache();
131 
132   return chan->AttachStreamFilter(aContentParent ? aContentParent->OtherPid()
133                                                  : base::GetCurrentProcId());
134 }
135 
136 /* static */
Attach(nsIChannel * aChannel,ParentEndpoint && aEndpoint)137 void StreamFilterParent::Attach(nsIChannel* aChannel,
138                                 ParentEndpoint&& aEndpoint) {
139   auto self = MakeRefPtr<StreamFilterParent>();
140 
141   self->ActorThread()->Dispatch(
142       NewRunnableMethod<ParentEndpoint&&>("StreamFilterParent::Bind", self,
143                                           &StreamFilterParent::Bind,
144                                           std::move(aEndpoint)),
145       NS_DISPATCH_NORMAL);
146 
147   self->Init(aChannel);
148 
149   // IPC owns this reference now.
150   Unused << self.forget();
151 }
152 
// Binds the given IPC endpoint to this actor. Attach() dispatches this to the
// actor thread.
void StreamFilterParent::Bind(ParentEndpoint&& aEndpoint) {
  aEndpoint.Bind(this);
}
156 
Init(nsIChannel * aChannel)157 void StreamFilterParent::Init(nsIChannel* aChannel) {
158   mChannel = aChannel;
159 
160   nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(aChannel);
161   if (MOZ_UNLIKELY(!traceable)) {
162     // nsViewSourceChannel is not nsITraceableChannel, but wraps one. Unwrap it.
163     nsCOMPtr<nsIViewSourceChannel> vsc = do_QueryInterface(aChannel);
164     if (vsc) {
165       traceable = do_QueryObject(vsc->GetInnerChannel());
166       // OnStartRequest etc. is passed the unwrapped channel, so update mChannel
167       // to prevent OnStartRequest from mistaking it for a redirect, which would
168       // close the filter.
169       mChannel = do_QueryObject(traceable);
170     }
171     // TODO bug 1683403: Replace assertion; Close StreamFilter instead.
172     MOZ_RELEASE_ASSERT(traceable);
173   }
174 
175   nsresult rv = traceable->SetNewListener(this, getter_AddRefs(mOrigListener));
176   MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv));
177 }
178 
179 /*****************************************************************************
180  * nsIThreadRetargetableStreamListener
181  *****************************************************************************/
182 
183 NS_IMETHODIMP
CheckListenerChain()184 StreamFilterParent::CheckListenerChain() {
185   AssertIsMainThread();
186 
187   nsCOMPtr<nsIThreadRetargetableStreamListener> trsl =
188       do_QueryInterface(mOrigListener);
189   if (trsl) {
190     return trsl->CheckListenerChain();
191   }
192   return NS_ERROR_FAILURE;
193 }
194 
195 /*****************************************************************************
196  * Error handling
197  *****************************************************************************/
198 
Broken()199 void StreamFilterParent::Broken() {
200   AssertIsActorThread();
201 
202   switch (mState) {
203     case State::Initialized:
204     case State::TransferringData:
205     case State::Suspended: {
206       mState = State::Disconnecting;
207       RefPtr<StreamFilterParent> self(this);
208       RunOnMainThread(FUNC, [=] {
209         if (self->mChannel) {
210           self->mChannel->Cancel(NS_ERROR_FAILURE);
211         }
212       });
213 
214       FinishDisconnect();
215     } break;
216 
217     default:
218       break;
219   }
220 }
221 
222 /*****************************************************************************
223  * State change requests
224  *****************************************************************************/
225 
RecvClose()226 IPCResult StreamFilterParent::RecvClose() {
227   AssertIsActorThread();
228 
229   mState = State::Closed;
230 
231   if (!mSentStop) {
232     RefPtr<StreamFilterParent> self(this);
233     RunOnMainThread(FUNC, [=] {
234       nsresult rv = self->EmitStopRequest(NS_OK);
235       Unused << NS_WARN_IF(NS_FAILED(rv));
236     });
237   }
238 
239   Unused << SendClosed();
240   Destroy();
241   return IPC_OK();
242 }
243 
Destroy()244 void StreamFilterParent::Destroy() {
245   // Close the channel asynchronously so the actor is never destroyed before
246   // this message is fully processed.
247   ActorThread()->Dispatch(NewRunnableMethod("StreamFilterParent::Close", this,
248                                             &StreamFilterParent::Close),
249                           NS_DISPATCH_NORMAL);
250 }
251 
// Handles the child's Destroy message on the actor thread by scheduling the
// asynchronous close (see Destroy()).
IPCResult StreamFilterParent::RecvDestroy() {
  AssertIsActorThread();
  Destroy();
  return IPC_OK();
}
257 
RecvSuspend()258 IPCResult StreamFilterParent::RecvSuspend() {
259   AssertIsActorThread();
260 
261   if (mState == State::TransferringData) {
262     RefPtr<StreamFilterParent> self(this);
263     RunOnMainThread(FUNC, [=] {
264       self->mChannel->Suspend();
265 
266       RunOnActorThread(FUNC, [=] {
267         if (self->IPCActive()) {
268           self->mState = State::Suspended;
269           self->CheckResult(self->SendSuspended());
270         }
271       });
272     });
273   }
274   return IPC_OK();
275 }
276 
RecvResume()277 IPCResult StreamFilterParent::RecvResume() {
278   AssertIsActorThread();
279 
280   if (mState == State::Suspended) {
281     // Change state before resuming so incoming data is handled correctly
282     // immediately after resuming.
283     mState = State::TransferringData;
284 
285     RefPtr<StreamFilterParent> self(this);
286     RunOnMainThread(FUNC, [=] {
287       self->mChannel->Resume();
288 
289       RunOnActorThread(FUNC, [=] {
290         if (self->IPCActive()) {
291           self->CheckResult(self->SendResumed());
292         }
293       });
294     });
295   }
296   return IPC_OK();
297 }
RecvDisconnect()298 IPCResult StreamFilterParent::RecvDisconnect() {
299   AssertIsActorThread();
300 
301   if (mState == State::Suspended) {
302     RefPtr<StreamFilterParent> self(this);
303     RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); });
304   } else if (mState != State::TransferringData) {
305     return IPC_OK();
306   }
307 
308   mState = State::Disconnecting;
309   CheckResult(SendFlushData());
310   return IPC_OK();
311 }
312 
// Handles the child's FlushedData message on the actor thread (the reply to
// the SendFlushData() issued by RecvDisconnect). Expects the filter to be in
// the Disconnecting state; schedules the actor close and then runs the rest
// of the disconnect sequence.
IPCResult StreamFilterParent::RecvFlushedData() {
  AssertIsActorThread();

  MOZ_ASSERT(mState == State::Disconnecting);

  Destroy();

  FinishDisconnect();
  return IPC_OK();
}
323 
// Completes a disconnect. All steps are queued through the helper methods so
// they run on the appropriate targets:
//  1. on the IO target, any buffered data is flushed to the original
//     listener;
//  2. on the main thread, this request is removed from its load group (unless
//     already disconnected) and mDisconnected is set;
//  3. on the actor thread, the state becomes Disconnected unless the filter
//     has been Closed in the meantime.
void StreamFilterParent::FinishDisconnect() {
  // Keeps this object alive until the queued work has run.
  RefPtr<StreamFilterParent> self(this);
  RunOnIOThread(FUNC, [=] {
    self->FlushBufferedData();

    RunOnMainThread(FUNC, [=] {
      if (self->mLoadGroup && !self->mDisconnected) {
        Unused << self->mLoadGroup->RemoveRequest(self, nullptr, NS_OK);
      }
      self->mDisconnected = true;
    });

    RunOnActorThread(FUNC, [=] {
      if (self->mState != State::Closed) {
        self->mState = State::Disconnected;
      }
    });
  });
}
343 
344 /*****************************************************************************
345  * Data output
346  *****************************************************************************/
347 
RecvWrite(Data && aData)348 IPCResult StreamFilterParent::RecvWrite(Data&& aData) {
349   AssertIsActorThread();
350 
351   RunOnIOThread(NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove", this,
352                                           &StreamFilterParent::WriteMove,
353                                           std::move(aData)));
354   return IPC_OK();
355 }
356 
// Runnable entry point used by RecvWrite(): forwards to Write() and only
// warns if the write fails.
void StreamFilterParent::WriteMove(Data&& aData) {
  nsresult rv = Write(aData);
  Unused << NS_WARN_IF(NS_FAILED(rv));
}
361 
Write(Data & aData)362 nsresult StreamFilterParent::Write(Data& aData) {
363   AssertIsIOThread();
364 
365   nsCOMPtr<nsIInputStream> stream;
366   nsresult rv = NS_NewByteInputStream(
367       getter_AddRefs(stream),
368       MakeSpan(reinterpret_cast<char*>(aData.Elements()), aData.Length()),
369       NS_ASSIGNMENT_DEPEND);
370   NS_ENSURE_SUCCESS(rv, rv);
371 
372   rv =
373       mOrigListener->OnDataAvailable(mChannel, stream, mOffset, aData.Length());
374   NS_ENSURE_SUCCESS(rv, rv);
375 
376   mOffset += aData.Length();
377   return NS_OK;
378 }
379 
380 /*****************************************************************************
381  * nsIRequest
382  *****************************************************************************/
383 
// nsIRequest::GetName: forwarded to the wrapped channel. Main thread only.
NS_IMETHODIMP
StreamFilterParent::GetName(nsACString& aName) {
  AssertIsMainThread();
  MOZ_ASSERT(mChannel);
  return mChannel->GetName(aName);
}
390 
// nsIRequest::GetStatus: forwarded to the wrapped channel. Main thread only.
NS_IMETHODIMP
StreamFilterParent::GetStatus(nsresult* aStatus) {
  AssertIsMainThread();
  MOZ_ASSERT(mChannel);
  return mChannel->GetStatus(aStatus);
}
397 
398 NS_IMETHODIMP
IsPending(bool * aIsPending)399 StreamFilterParent::IsPending(bool* aIsPending) {
400   switch (mState) {
401     case State::Initialized:
402     case State::TransferringData:
403     case State::Suspended:
404       *aIsPending = true;
405       break;
406     default:
407       *aIsPending = false;
408   }
409   return NS_OK;
410 }
411 
// nsIRequest::Cancel: forwarded to the wrapped channel. Main thread only.
NS_IMETHODIMP
StreamFilterParent::Cancel(nsresult aResult) {
  AssertIsMainThread();
  MOZ_ASSERT(mChannel);
  return mChannel->Cancel(aResult);
}
418 
// nsIRequest::Suspend: forwarded to the wrapped channel. Main thread only.
NS_IMETHODIMP
StreamFilterParent::Suspend() {
  AssertIsMainThread();
  MOZ_ASSERT(mChannel);
  return mChannel->Suspend();
}
425 
// nsIRequest::Resume: forwarded to the wrapped channel. Main thread only.
NS_IMETHODIMP
StreamFilterParent::Resume() {
  AssertIsMainThread();
  MOZ_ASSERT(mChannel);
  return mChannel->Resume();
}
432 
433 NS_IMETHODIMP
GetLoadGroup(nsILoadGroup ** aLoadGroup)434 StreamFilterParent::GetLoadGroup(nsILoadGroup** aLoadGroup) {
435   *aLoadGroup = mLoadGroup;
436   return NS_OK;
437 }
438 
// nsIRequest::SetLoadGroup: not supported by the filter; always returns
// NS_ERROR_NOT_IMPLEMENTED and ignores aLoadGroup.
NS_IMETHODIMP
StreamFilterParent::SetLoadGroup(nsILoadGroup* aLoadGroup) {
  return NS_ERROR_NOT_IMPLEMENTED;
}
443 
444 NS_IMETHODIMP
GetLoadFlags(nsLoadFlags * aLoadFlags)445 StreamFilterParent::GetLoadFlags(nsLoadFlags* aLoadFlags) {
446   AssertIsMainThread();
447   MOZ_ASSERT(mChannel);
448   MOZ_TRY(mChannel->GetLoadFlags(aLoadFlags));
449   *aLoadFlags &= ~nsIChannel::LOAD_DOCUMENT_URI;
450   return NS_OK;
451 }
452 
// nsIRequest::SetLoadFlags: forwarded to the wrapped channel. Main thread
// only.
NS_IMETHODIMP
StreamFilterParent::SetLoadFlags(nsLoadFlags aLoadFlags) {
  AssertIsMainThread();
  MOZ_ASSERT(mChannel);
  return mChannel->SetLoadFlags(aLoadFlags);
}
459 
// nsIRequest::GetTRRMode: delegates to GetTRRModeImpl.
NS_IMETHODIMP
StreamFilterParent::GetTRRMode(nsIRequest::TRRMode* aTRRMode) {
  return GetTRRModeImpl(aTRRMode);
}
464 
// nsIRequest::SetTRRMode: delegates to SetTRRModeImpl.
NS_IMETHODIMP
StreamFilterParent::SetTRRMode(nsIRequest::TRRMode aTRRMode) {
  return SetTRRModeImpl(aTRRMode);
}
469 
470 /*****************************************************************************
471  * nsIStreamListener
472  *****************************************************************************/
473 
474 NS_IMETHODIMP
OnStartRequest(nsIRequest * aRequest)475 StreamFilterParent::OnStartRequest(nsIRequest* aRequest) {
476   AssertIsMainThread();
477 
478   // Always reset mChannel if aRequest is different.  Various calls in
479   // StreamFilterParent will use mChannel, but aRequest is *always* the
480   // right channel to use at this point.
481   //
482   // For ALL redirections, we will disconnect this listener.  Extensions
483   // will create a new filter if they need it.
484   if (aRequest != mChannel) {
485     nsCOMPtr<nsIChannel> channel = do_QueryInterface(aRequest);
486     nsCOMPtr<nsILoadInfo> loadInfo = channel ? channel->LoadInfo() : nullptr;
487     mChannel = channel;
488 
489     if (!(loadInfo &&
490           loadInfo->RedirectChainIncludingInternalRedirects().IsEmpty())) {
491       mDisconnected = true;
492 
493       RefPtr<StreamFilterParent> self(this);
494       RunOnActorThread(FUNC, [=] {
495         if (self->IPCActive()) {
496           self->mState = State::Disconnected;
497           CheckResult(
498               self->SendError(NS_LITERAL_CSTRING("Channel redirected")));
499         }
500       });
501     }
502   }
503 
504   // Check if alterate cached data is being sent, if so we receive un-decoded
505   // data and we must disconnect the filter and send an error to the extension.
506   if (!mDisconnected) {
507     RefPtr<net::HttpBaseChannel> chan = do_QueryObject(aRequest);
508     if (chan && chan->IsDeliveringAltData()) {
509       mDisconnected = true;
510 
511       RefPtr<StreamFilterParent> self(this);
512       RunOnActorThread(FUNC, [=] {
513         if (self->IPCActive()) {
514           self->mState = State::Disconnected;
515           CheckResult(self->SendError(
516               NS_LITERAL_CSTRING("Channel is delivering cached alt-data")));
517         }
518       });
519     }
520   }
521 
522   if (!mDisconnected) {
523     Unused << mChannel->GetLoadGroup(getter_AddRefs(mLoadGroup));
524     if (mLoadGroup) {
525       Unused << mLoadGroup->AddRequest(this, nullptr);
526     }
527   }
528 
529   nsresult rv = mOrigListener->OnStartRequest(aRequest);
530 
531   // Important: Do this only *after* running the next listener in the chain, so
532   // that we get the final delivery target after any retargeting that it may do.
533   if (nsCOMPtr<nsIThreadRetargetableRequest> req =
534           do_QueryInterface(aRequest)) {
535     nsCOMPtr<nsIEventTarget> thread;
536     Unused << req->GetDeliveryTarget(getter_AddRefs(thread));
537     if (thread) {
538       mIOThread = std::move(thread);
539     }
540   }
541 
542   // Important: Do this *after* we have set the thread delivery target, or it is
543   // possible in rare circumstances for an extension to attempt to write data
544   // before the thread has been set up, even though there are several layers of
545   // asynchrony involved.
546   if (!mDisconnected) {
547     RefPtr<StreamFilterParent> self(this);
548     RunOnActorThread(FUNC, [=] {
549       if (self->IPCActive()) {
550         self->mState = State::TransferringData;
551         self->CheckResult(self->SendStartRequest());
552       }
553     });
554   }
555 
556   return rv;
557 }
558 
// nsIRequestObserver::OnStopRequest. Main thread only.
//
// Records that the channel has finished. If the filter is already
// disconnected, the stop request is passed straight to the original listener
// and that result is returned. Otherwise the stop is relayed to the child over
// IPC (when active), and NS_OK is returned.
NS_IMETHODIMP
StreamFilterParent::OnStopRequest(nsIRequest* aRequest, nsresult aStatusCode) {
  AssertIsMainThread();
  MOZ_ASSERT(aRequest == mChannel);

  mReceivedStop = true;
  if (mDisconnected) {
    return EmitStopRequest(aStatusCode);
  }

  RefPtr<StreamFilterParent> self(this);
  RunOnActorThread(FUNC, [=] {
    if (self->IPCActive()) {
      self->CheckResult(self->SendStopRequest(aStatusCode));
    } else if (self->mState != State::Disconnecting) {
      // If we're currently disconnecting, then we'll emit a stop
      // request at the end of that process. Otherwise we need to
      // manually emit one here, since we won't be getting a response
      // from the child.
      RunOnMainThread(FUNC, [=] {
        // Re-check on the main thread: another path may have emitted the stop
        // request in the meantime.
        if (!self->mSentStop) {
          self->EmitStopRequest(aStatusCode);
        }
      });
    }
  });
  return NS_OK;
}
587 
// Delivers OnStopRequest(aStatusCode) to the original listener and, unless
// the filter is disconnected, removes this request from its load group.
// Main thread only; must be called at most once (asserted via mSentStop).
// Returns the original listener's result.
nsresult StreamFilterParent::EmitStopRequest(nsresult aStatusCode) {
  AssertIsMainThread();
  MOZ_ASSERT(!mSentStop);

  // Set before calling out so callers that check mSentStop see it.
  mSentStop = true;
  nsresult rv = mOrigListener->OnStopRequest(mChannel, aStatusCode);

  if (mLoadGroup && !mDisconnected) {
    Unused << mLoadGroup->RemoveRequest(this, nullptr, aStatusCode);
  }

  return rv;
}
601 
602 /*****************************************************************************
603  * Incoming data handling
604  *****************************************************************************/
605 
DoSendData(Data && aData)606 void StreamFilterParent::DoSendData(Data&& aData) {
607   AssertIsActorThread();
608 
609   if (mState == State::TransferringData) {
610     CheckResult(SendData(aData));
611   }
612 }
613 
// nsIStreamListener::OnDataAvailable. IO thread only.
//
// When disconnected, the data is passed straight through to the original
// listener (after flushing anything still buffered), using this filter's own
// running offset. Otherwise aCount bytes are read from aInputStream and, by
// state: buffered while Disconnecting, rejected with NS_ERROR_FAILURE when
// Closed, or dispatched to the actor thread to be sent to the child.
//
// Returns the read error if reading fails, NS_ERROR_UNEXPECTED on a short
// read, the original listener's result in the disconnected case, and NS_OK
// otherwise.
NS_IMETHODIMP
StreamFilterParent::OnDataAvailable(nsIRequest* aRequest,
                                    nsIInputStream* aInputStream,
                                    uint64_t aOffset, uint32_t aCount) {
  AssertIsIOThread();

  if (mDisconnected) {
    // If we're offloading data in a thread pool, it's possible that we'll
    // have buffered some additional data while waiting for the buffer to
    // flush. So, if there's any buffered data left, flush that before we
    // flush this incoming data.
    //
    // Note: Once disconnected, the buffer list is guaranteed never to be
    // accessed by another thread during an OnDataAvailable call.
    if (!mBufferedData.isEmpty()) {
      FlushBufferedData();
    }

    // aOffset is ignored; the original listener sees offsets that account for
    // everything this filter has written so far.
    mOffset += aCount;
    return mOrigListener->OnDataAvailable(aRequest, aInputStream,
                                          mOffset - aCount, aCount);
  }

  Data data;
  data.SetLength(aCount);

  uint32_t count;
  nsresult rv = aInputStream->Read(reinterpret_cast<char*>(data.Elements()),
                                   aCount, &count);
  NS_ENSURE_SUCCESS(rv, rv);
  // A short read is treated as an error rather than retried.
  NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED);

  if (mState == State::Disconnecting) {
    // Hold the data until FlushBufferedData() writes it out.
    MutexAutoLock al(mBufferMutex);
    BufferData(std::move(data));
  } else if (mState == State::Closed) {
    return NS_ERROR_FAILURE;
  } else {
    ActorThread()->Dispatch(
        NewRunnableMethod<Data&&>("StreamFilterParent::DoSendData", this,
                                  &StreamFilterParent::DoSendData,
                                  std::move(data)),
        NS_DISPATCH_NORMAL);
  }
  return NS_OK;
}
660 
FlushBufferedData()661 nsresult StreamFilterParent::FlushBufferedData() {
662   AssertIsIOThread();
663 
664   // When offloading data to a thread pool, OnDataAvailable isn't guaranteed
665   // to always run in the same thread, so it's possible for this function to
666   // run in parallel with OnDataAvailable.
667   MutexAutoLock al(mBufferMutex);
668 
669   while (!mBufferedData.isEmpty()) {
670     UniquePtr<BufferedData> data(mBufferedData.popFirst());
671 
672     nsresult rv = Write(data->mData);
673     NS_ENSURE_SUCCESS(rv, rv);
674   }
675 
676   if (mReceivedStop && !mSentStop) {
677     RefPtr<StreamFilterParent> self(this);
678     RunOnMainThread(FUNC, [=] {
679       if (!mSentStop) {
680         nsresult rv = self->EmitStopRequest(NS_OK);
681         Unused << NS_WARN_IF(NS_FAILED(rv));
682       }
683     });
684   }
685 
686   return NS_OK;
687 }
688 
689 /*****************************************************************************
690  * Thread helpers
691  *****************************************************************************/
692 
// The event target used for IPC actor work: the socket transport service.
nsIEventTarget* StreamFilterParent::ActorThread() {
  return net::gSocketTransportService;
}
696 
// True when called on the actor thread's event target.
bool StreamFilterParent::IsActorThread() {
  return ActorThread()->IsOnCurrentThread();
}
700 
// Assertion (MOZ_ASSERT) that the caller is on the actor thread.
void StreamFilterParent::AssertIsActorThread() { MOZ_ASSERT(IsActorThread()); }
702 
// The event target currently used for data delivery (see mIOThread).
nsIEventTarget* StreamFilterParent::IOThread() { return mIOThread; }
704 
// True when called on the current IO event target.
bool StreamFilterParent::IsIOThread() { return mIOThread->IsOnCurrentThread(); }
706 
// Assertion (MOZ_ASSERT) that the caller is on the IO event target.
void StreamFilterParent::AssertIsIOThread() { MOZ_ASSERT(IsIOThread()); }
708 
709 template <typename Function>
RunOnMainThread(const char * aName,Function && aFunc)710 void StreamFilterParent::RunOnMainThread(const char* aName, Function&& aFunc) {
711   mQueue->RunOrEnqueue(new ChannelEventFunction(mMainThread, std::move(aFunc)));
712 }
713 
// Runs aRunnable on the main thread through the output event queue, which
// either runs it immediately or enqueues it.
void StreamFilterParent::RunOnMainThread(already_AddRefed<Runnable> aRunnable) {
  mQueue->RunOrEnqueue(
      new ChannelEventRunnable(mMainThread, std::move(aRunnable)));
}
718 
719 template <typename Function>
RunOnIOThread(const char * aName,Function && aFunc)720 void StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc) {
721   mQueue->RunOrEnqueue(new ChannelEventFunction(mIOThread, std::move(aFunc)));
722 }
723 
// Runs aRunnable on the IO event target through the output event queue,
// which either runs it immediately or enqueues it.
void StreamFilterParent::RunOnIOThread(already_AddRefed<Runnable> aRunnable) {
  mQueue->RunOrEnqueue(
      new ChannelEventRunnable(mIOThread, std::move(aRunnable)));
}
728 
729 template <typename Function>
RunOnActorThread(const char * aName,Function && aFunc)730 void StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc) {
731   // We don't use mQueue for dispatch to the actor thread.
732   //
733   // The main thread and IO thread are used for dispatching events to the
734   // wrapped stream listener, and those events need to be processed
735   // consistently, in the order they were dispatched. An event dispatched to the
736   // main thread can't be run before events that were dispatched to the IO
737   // thread before it.
738   //
739   // Additionally, the IO thread is likely to be a thread pool, which means that
740   // without thread-safe queuing, it's possible for multiple events dispatched
741   // to it to be processed in parallel, or out of order.
742   //
743   // The actor thread, however, is always a serial event target. Its events are
744   // always processed in order, and events dispatched to the actor thread are
745   // independent of the events in the output event queue.
746   if (IsActorThread()) {
747     aFunc();
748   } else {
749     ActorThread()->Dispatch(std::move(NS_NewRunnableFunction(aName, aFunc)),
750                             NS_DISPATCH_NORMAL);
751   }
752 }
753 
754 /*****************************************************************************
755  * Glue
756  *****************************************************************************/
757 
ActorDestroy(ActorDestroyReason aWhy)758 void StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy) {
759   AssertIsActorThread();
760 
761   if (mState != State::Disconnected && mState != State::Closed) {
762     Broken();
763   }
764 }
765 
// Drops the reference that Attach() handed over to IPC: adopting `this`
// without an AddRef makes the RefPtr release it when it goes out of scope.
void StreamFilterParent::ActorDealloc() {
  RefPtr<StreamFilterParent> self = dont_AddRef(this);
}
769 
// QueryInterface table: the filter answers for nsIStreamListener,
// nsIRequestObserver, nsIRequest and nsIThreadRetargetableStreamListener.
// nsISupports is resolved through the nsIStreamListener base to avoid
// ambiguity.
NS_INTERFACE_MAP_BEGIN(StreamFilterParent)
  NS_INTERFACE_MAP_ENTRY(nsIStreamListener)
  NS_INTERFACE_MAP_ENTRY(nsIRequestObserver)
  NS_INTERFACE_MAP_ENTRY(nsIRequest)
  NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableStreamListener)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamListener)
NS_INTERFACE_MAP_END
777 
// Standard macro-generated AddRef/Release implementations for the class.
NS_IMPL_ADDREF(StreamFilterParent)
NS_IMPL_RELEASE(StreamFilterParent)
780 
781 }  // namespace extensions
782 }  // namespace mozilla
783