1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
4  * You can obtain one at http://mozilla.org/MPL/2.0/. */
5 
6 // Original author: ekr@rtfm.com
7 
8 #include "MediaPipeline.h"
9 
10 #include <inttypes.h>
11 #include <math.h>
12 
13 #include "AudioSegment.h"
14 #include "AudioConverter.h"
15 #include "DOMMediaStream.h"
16 #include "ImageContainer.h"
17 #include "ImageTypes.h"
18 #include "Layers.h"
19 #include "LayersLogging.h"
20 #include "MediaEngine.h"
21 #include "MediaSegment.h"
22 #include "MediaTrackGraphImpl.h"
23 #include "MediaTrackListener.h"
24 #include "MediaStreamTrack.h"
25 #include "RemoteTrackSource.h"
26 #include "RtpLogger.h"
27 #include "VideoFrameConverter.h"
28 #include "VideoSegment.h"
29 #include "VideoStreamTrack.h"
30 #include "VideoUtils.h"
31 #include "mozilla/Logging.h"
32 #include "mozilla/NullPrincipal.h"
33 #include "mozilla/PeerIdentity.h"
34 #include "mozilla/Preferences.h"
35 #include "mozilla/SharedThreadPool.h"
36 #include "mozilla/Sprintf.h"
37 #include "mozilla/StaticPrefs_media.h"
38 #include "mozilla/TaskQueue.h"
39 #include "mozilla/UniquePtr.h"
40 #include "mozilla/UniquePtrExtensions.h"
41 #include "mozilla/dom/RTCStatsReportBinding.h"
42 #include "mozilla/gfx/Point.h"
43 #include "mozilla/gfx/Types.h"
44 #include "nsError.h"
45 #include "nsThreadUtils.h"
46 #include "runnable_utils.h"
47 #include "signaling/src/peerconnection/MediaTransportHandler.h"
48 #include "Tracing.h"
49 #include "WebrtcImageBuffer.h"
50 #include "webrtc/common_video/include/video_frame_buffer.h"
51 #include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h"
52 
53 // Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at
54 // 48KHz)
55 #define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
56 static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
57                   AUDIO_SAMPLE_BUFFER_MAX_BYTES,
58               "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");
59 
60 using namespace mozilla;
61 using namespace mozilla::dom;
62 using namespace mozilla::gfx;
63 using namespace mozilla::layers;
64 
65 mozilla::LazyLogModule gMediaPipelineLog("MediaPipeline");
66 
67 namespace mozilla {
68 
69 // An async inserter for audio data, to avoid running audio codec encoders
70 // on the MTG/input audio thread.  Basically just bounces all the audio
71 // data to a single audio processing/input queue.  We could if we wanted to
72 // use multiple threads and a TaskQueue.
73 class AudioProxyThread {
74  public:
75   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)
76 
AudioProxyThread(RefPtr<AudioSessionConduit> aConduit)77   explicit AudioProxyThread(RefPtr<AudioSessionConduit> aConduit)
78       : mConduit(std::move(aConduit)),
79         mTaskQueue(new TaskQueue(
80             GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER), "AudioProxy")),
81         mAudioConverter(nullptr) {
82     MOZ_ASSERT(mConduit);
83     MOZ_COUNT_CTOR(AudioProxyThread);
84   }
85 
86   // This function is the identity if aInputRate is supported.
87   // Else, it returns a rate that is supported, that ensure no loss in audio
88   // quality: the sampling rate returned is always greater to the inputed
89   // sampling-rate, if they differ..
AppropriateSendingRateForInputRate(uint32_t aInputRate)90   uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
91     AudioSessionConduit* conduit =
92         static_cast<AudioSessionConduit*>(mConduit.get());
93     if (conduit->IsSamplingFreqSupported(aInputRate)) {
94       return aInputRate;
95     }
96     if (aInputRate < 16000) {
97       return 16000;
98     }
99     if (aInputRate < 32000) {
100       return 32000;
101     }
102     if (aInputRate < 44100) {
103       return 44100;
104     }
105     return 48000;
106   }
107 
108   // From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
109   // something the conduit can work with (or send silence if the track is not
110   // enabled), and send the audio in 10ms chunks to the conduit.
InternalProcessAudioChunk(TrackRate aRate,const AudioChunk & aChunk,bool aEnabled)111   void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
112                                  bool aEnabled) {
113     MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());
114 
115     // Convert to interleaved 16-bits integer audio, with a maximum of two
116     // channels (since the WebRTC.org code below makes the assumption that the
117     // input audio is either mono or stereo), with a sample-rate rate that is
118     // 16, 32, 44.1, or 48kHz.
119     uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
120     int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);
121 
122     // We take advantage of the fact that the common case (microphone directly
123     // to PeerConnection, that is, a normal call), the samples are already
124     // 16-bits mono, so the representation in interleaved and planar is the
125     // same, and we can just use that.
126     if (aEnabled && outputChannels == 1 &&
127         aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
128       const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
129       PacketizeAndSend(samples, transmissionRate, outputChannels,
130                        aChunk.mDuration);
131       return;
132     }
133 
134     uint32_t sampleCount = aChunk.mDuration * outputChannels;
135     if (mInterleavedAudio.Length() < sampleCount) {
136       mInterleavedAudio.SetLength(sampleCount);
137     }
138 
139     if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
140       PodZero(mInterleavedAudio.Elements(), sampleCount);
141     } else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
142       DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
143                            aChunk.mVolume, outputChannels,
144                            mInterleavedAudio.Elements());
145     } else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
146       DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
147                            aChunk.mVolume, outputChannels,
148                            mInterleavedAudio.Elements());
149     }
150     int16_t* inputAudio = mInterleavedAudio.Elements();
151     size_t inputAudioFrameCount = aChunk.mDuration;
152 
153     AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
154                             AudioConfig::FORMAT_S16);
155     AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
156                              transmissionRate, AudioConfig::FORMAT_S16);
157     // Resample to an acceptable sample-rate for the sending side
158     if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
159         mAudioConverter->OutputConfig() != outputConfig) {
160       mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
161     }
162 
163     int16_t* processedAudio = nullptr;
164     size_t framesProcessed =
165         mAudioConverter->Process(inputAudio, inputAudioFrameCount);
166 
167     if (framesProcessed == 0) {
168       // In place conversion not possible, use a buffer.
169       framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
170                                                  inputAudioFrameCount);
171       processedAudio = mOutputAudio.Data();
172     } else {
173       processedAudio = inputAudio;
174     }
175 
176     PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
177                      framesProcessed);
178   }
179 
180   // This packetizes aAudioData in 10ms chunks and sends it.
181   // aAudioData is interleaved audio data at a rate and with a channel count
182   // that is appropriate to send with the conduit.
PacketizeAndSend(const int16_t * aAudioData,uint32_t aRate,uint32_t aChannels,uint32_t aFrameCount)183   void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
184                         uint32_t aChannels, uint32_t aFrameCount) {
185     MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
186     MOZ_ASSERT(aChannels == 1 || aChannels == 2);
187     MOZ_ASSERT(aAudioData);
188 
189     uint32_t audio_10ms = aRate / 100;
190 
191     if (!mPacketizer || mPacketizer->PacketSize() != audio_10ms ||
192         mPacketizer->Channels() != aChannels) {
193       // It's the right thing to drop the bit of audio still in the packetizer:
194       // we don't want to send to the conduit audio that has two different
195       // rates while telling it that it has a constante rate.
196       mPacketizer =
197           MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
198       mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
199     }
200 
201     mPacketizer->Input(aAudioData, aFrameCount);
202 
203     while (mPacketizer->PacketsAvailable()) {
204       mPacketizer->Output(mPacket.get());
205       mConduit->SendAudioFrame(mPacket.get(), mPacketizer->PacketSize(), aRate,
206                                mPacketizer->Channels(), 0);
207     }
208   }
209 
QueueAudioChunk(TrackRate aRate,const AudioChunk & aChunk,bool aEnabled)210   void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
211                        bool aEnabled) {
212     RefPtr<AudioProxyThread> self = this;
213     nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
214         "AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
215           self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
216         }));
217     MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
218     Unused << rv;
219   }
220 
221  protected:
~AudioProxyThread()222   virtual ~AudioProxyThread() {
223     // Conduits must be released on MainThread, and we might have the last
224     // reference We don't need to worry about runnables still trying to access
225     // the conduit, since the runnables hold a ref to AudioProxyThread.
226     NS_ReleaseOnMainThread("AudioProxyThread::mConduit", mConduit.forget());
227     MOZ_COUNT_DTOR(AudioProxyThread);
228   }
229 
230   RefPtr<AudioSessionConduit> mConduit;
231   const RefPtr<TaskQueue> mTaskQueue;
232   // Only accessed on mTaskQueue
233   UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
234   // A buffer to hold a single packet of audio.
235   UniquePtr<int16_t[]> mPacket;
236   nsTArray<int16_t> mInterleavedAudio;
237   AlignedShortBuffer mOutputAudio;
238   UniquePtr<AudioConverter> mAudioConverter;
239 };
240 
MediaPipeline(const std::string & aPc,RefPtr<MediaTransportHandler> aTransportHandler,DirectionType aDirection,RefPtr<nsISerialEventTarget> aMainThread,RefPtr<nsISerialEventTarget> aStsThread,RefPtr<MediaSessionConduit> aConduit)241 MediaPipeline::MediaPipeline(const std::string& aPc,
242                              RefPtr<MediaTransportHandler> aTransportHandler,
243                              DirectionType aDirection,
244                              RefPtr<nsISerialEventTarget> aMainThread,
245                              RefPtr<nsISerialEventTarget> aStsThread,
246                              RefPtr<MediaSessionConduit> aConduit)
247     : mDirection(aDirection),
248       mLevel(0),
249       mTransportHandler(std::move(aTransportHandler)),
250       mConduit(std::move(aConduit)),
251       mMainThread(std::move(aMainThread)),
252       mStsThread(aStsThread),
253       mTransport(new PipelineTransport(std::move(aStsThread))),
254       mRtpPacketsSent(0),
255       mRtcpPacketsSent(0),
256       mRtpPacketsReceived(0),
257       mRtcpPacketsReceived(0),
258       mRtpBytesSent(0),
259       mRtpBytesReceived(0),
260       mPc(aPc),
261       mFilter(),
262       mRtpParser(webrtc::RtpHeaderParser::Create()),
263       mPacketDumper(new PacketDumper(mPc)) {
264   if (mDirection == DirectionType::RECEIVE) {
265     mConduit->SetReceiverTransport(mTransport);
266   } else {
267     mConduit->SetTransmitterTransport(mTransport);
268   }
269 }
270 
~MediaPipeline()271 MediaPipeline::~MediaPipeline() {
272   MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
273           ("Destroying MediaPipeline: %s", mDescription.c_str()));
274   NS_ReleaseOnMainThread("MediaPipeline::mConduit", mConduit.forget());
275 }
276 
Shutdown_m()277 void MediaPipeline::Shutdown_m() {
278   Stop();
279   DetachMedia();
280 
281   RUN_ON_THREAD(mStsThread,
282                 WrapRunnable(RefPtr<MediaPipeline>(this),
283                              &MediaPipeline::DetachTransport_s),
284                 NS_DISPATCH_NORMAL);
285 }
286 
DetachTransport_s()287 void MediaPipeline::DetachTransport_s() {
288   ASSERT_ON_THREAD(mStsThread);
289 
290   MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
291           ("%s in %s", mDescription.c_str(), __FUNCTION__));
292 
293   disconnect_all();
294   mRtpState = TransportLayer::TS_NONE;
295   mRtcpState = TransportLayer::TS_NONE;
296   mTransportId.clear();
297   mTransport->Detach();
298 
299   // Make sure any cycles are broken
300   mPacketDumper = nullptr;
301 }
302 
UpdateTransport_m(const std::string & aTransportId,UniquePtr<MediaPipelineFilter> && aFilter)303 void MediaPipeline::UpdateTransport_m(
304     const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
305   mStsThread->Dispatch(NS_NewRunnableFunction(
306       __func__, [aTransportId, filter = std::move(aFilter),
307                  self = RefPtr<MediaPipeline>(this)]() mutable {
308         self->UpdateTransport_s(aTransportId, std::move(filter));
309       }));
310 }
311 
UpdateTransport_s(const std::string & aTransportId,UniquePtr<MediaPipelineFilter> && aFilter)312 void MediaPipeline::UpdateTransport_s(
313     const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
314   ASSERT_ON_THREAD(mStsThread);
315   if (!mSignalsConnected) {
316     mTransportHandler->SignalStateChange.connect(
317         this, &MediaPipeline::RtpStateChange);
318     mTransportHandler->SignalRtcpStateChange.connect(
319         this, &MediaPipeline::RtcpStateChange);
320     mTransportHandler->SignalEncryptedSending.connect(
321         this, &MediaPipeline::EncryptedPacketSending);
322     mTransportHandler->SignalPacketReceived.connect(
323         this, &MediaPipeline::PacketReceived);
324     mTransportHandler->SignalAlpnNegotiated.connect(
325         this, &MediaPipeline::AlpnNegotiated);
326     mSignalsConnected = true;
327   }
328 
329   if (aTransportId != mTransportId) {
330     mTransportId = aTransportId;
331     mRtpState = mTransportHandler->GetState(mTransportId, false);
332     mRtcpState = mTransportHandler->GetState(mTransportId, true);
333     CheckTransportStates();
334   }
335 
336   if (mFilter) {
337     for (const auto& extension : mFilter->GetExtmap()) {
338       mRtpParser->DeregisterRtpHeaderExtension(
339           webrtc::StringToRtpExtensionType(extension.uri));
340     }
341   }
342   if (mFilter && aFilter) {
343     // Use the new filter, but don't forget any remote SSRCs that we've learned
344     // by receiving traffic.
345     mFilter->Update(*aFilter);
346   } else {
347     mFilter = std::move(aFilter);
348   }
349   if (mFilter) {
350     for (const auto& extension : mFilter->GetExtmap()) {
351       mRtpParser->RegisterRtpHeaderExtension(
352           webrtc::StringToRtpExtensionType(extension.uri), extension.id);
353     }
354   }
355 }
356 
AddRIDExtension_m(size_t aExtensionId)357 void MediaPipeline::AddRIDExtension_m(size_t aExtensionId) {
358   RUN_ON_THREAD(mStsThread,
359                 WrapRunnable(RefPtr<MediaPipeline>(this),
360                              &MediaPipeline::AddRIDExtension_s, aExtensionId),
361                 NS_DISPATCH_NORMAL);
362 }
363 
AddRIDExtension_s(size_t aExtensionId)364 void MediaPipeline::AddRIDExtension_s(size_t aExtensionId) {
365   mRtpParser->RegisterRtpHeaderExtension(webrtc::kRtpExtensionRtpStreamId,
366                                          aExtensionId);
367 }
368 
AddRIDFilter_m(const std::string & aRid)369 void MediaPipeline::AddRIDFilter_m(const std::string& aRid) {
370   RUN_ON_THREAD(mStsThread,
371                 WrapRunnable(RefPtr<MediaPipeline>(this),
372                              &MediaPipeline::AddRIDFilter_s, aRid),
373                 NS_DISPATCH_NORMAL);
374 }
375 
AddRIDFilter_s(const std::string & aRid)376 void MediaPipeline::AddRIDFilter_s(const std::string& aRid) {
377   // Running a simulcast test, ignore other filtering
378   mFilter = MakeUnique<MediaPipelineFilter>();
379   mFilter->AddRemoteRtpStreamId(aRid);
380 }
381 
GetContributingSourceStats(const nsString & aInboundRtpStreamId,FallibleTArray<dom::RTCRTPContributingSourceStats> & aArr) const382 void MediaPipeline::GetContributingSourceStats(
383     const nsString& aInboundRtpStreamId,
384     FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
385   // Get the expiry from now
386   DOMHighResTimeStamp expiry = RtpCSRCStats::GetExpiryFromTime(GetNow());
387   for (auto info : mCsrcStats) {
388     if (!info.second.Expired(expiry)) {
389       RTCRTPContributingSourceStats stats;
390       info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
391       if (!aArr.AppendElement(stats, fallible)) {
392         mozalloc_handle_oom(0);
393       }
394     }
395   }
396 }
397 
RtpStateChange(const std::string & aTransportId,TransportLayer::State aState)398 void MediaPipeline::RtpStateChange(const std::string& aTransportId,
399                                    TransportLayer::State aState) {
400   if (mTransportId != aTransportId) {
401     return;
402   }
403   mRtpState = aState;
404   CheckTransportStates();
405 }
406 
RtcpStateChange(const std::string & aTransportId,TransportLayer::State aState)407 void MediaPipeline::RtcpStateChange(const std::string& aTransportId,
408                                     TransportLayer::State aState) {
409   if (mTransportId != aTransportId) {
410     return;
411   }
412   mRtcpState = aState;
413   CheckTransportStates();
414 }
415 
CheckTransportStates()416 void MediaPipeline::CheckTransportStates() {
417   ASSERT_ON_THREAD(mStsThread);
418 
419   if (mRtpState == TransportLayer::TS_CLOSED ||
420       mRtpState == TransportLayer::TS_ERROR ||
421       mRtcpState == TransportLayer::TS_CLOSED ||
422       mRtcpState == TransportLayer::TS_ERROR) {
423     MOZ_LOG(gMediaPipelineLog, LogLevel::Warning,
424             ("RTP Transport failed for pipeline %p flow %s", this,
425              mDescription.c_str()));
426 
427     NS_WARNING(
428         "MediaPipeline Transport failed. This is not properly cleaned up yet");
429     // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
430     // connection was good and now it is bad.
431     // TODO(ekr@rtfm.com): Report up so that the PC knows we
432     // have experienced an error.
433     mTransport->Detach();
434     return;
435   }
436 
437   if (mRtpState == TransportLayer::TS_OPEN) {
438     MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
439             ("RTP Transport ready for pipeline %p flow %s", this,
440              mDescription.c_str()));
441   }
442 
443   if (mRtcpState == TransportLayer::TS_OPEN) {
444     MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
445             ("RTCP Transport ready for pipeline %p flow %s", this,
446              mDescription.c_str()));
447   }
448 
449   if (mRtpState == TransportLayer::TS_OPEN && mRtcpState == mRtpState) {
450     mTransport->Attach(this);
451     TransportReady_s();
452   }
453 }
454 
SendPacket(MediaPacket && packet)455 void MediaPipeline::SendPacket(MediaPacket&& packet) {
456   ASSERT_ON_THREAD(mStsThread);
457   MOZ_ASSERT(mRtpState == TransportLayer::TS_OPEN);
458   MOZ_ASSERT(!mTransportId.empty());
459   mTransportHandler->SendPacket(mTransportId, std::move(packet));
460 }
461 
IncrementRtpPacketsSent(int32_t aBytes)462 void MediaPipeline::IncrementRtpPacketsSent(int32_t aBytes) {
463   ++mRtpPacketsSent;
464   mRtpBytesSent += aBytes;
465 
466   if (!(mRtpPacketsSent % 100)) {
467     MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
468             ("RTP sent packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
469              mDescription.c_str(), this, mRtpPacketsSent, mRtpBytesSent));
470   }
471 }
472 
IncrementRtcpPacketsSent()473 void MediaPipeline::IncrementRtcpPacketsSent() {
474   ++mRtcpPacketsSent;
475   if (!(mRtcpPacketsSent % 100)) {
476     MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
477             ("RTCP sent packet count for %s Pipeline %p: %u",
478              mDescription.c_str(), this, mRtcpPacketsSent));
479   }
480 }
481 
IncrementRtpPacketsReceived(int32_t aBytes)482 void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
483   ++mRtpPacketsReceived;
484   mRtpBytesReceived += aBytes;
485   if (!(mRtpPacketsReceived % 100)) {
486     MOZ_LOG(
487         gMediaPipelineLog, LogLevel::Info,
488         ("RTP received packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
489          mDescription.c_str(), this, mRtpPacketsReceived, mRtpBytesReceived));
490   }
491 }
492 
IncrementRtcpPacketsReceived()493 void MediaPipeline::IncrementRtcpPacketsReceived() {
494   ++mRtcpPacketsReceived;
495   if (!(mRtcpPacketsReceived % 100)) {
496     MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
497             ("RTCP received packet count for %s Pipeline %p: %u",
498              mDescription.c_str(), this, mRtcpPacketsReceived));
499   }
500 }
501 
RtpPacketReceived(const MediaPacket & packet)502 void MediaPipeline::RtpPacketReceived(const MediaPacket& packet) {
503   if (mDirection == DirectionType::TRANSMIT) {
504     return;
505   }
506 
507   if (!mTransport->Pipeline()) {
508     MOZ_LOG(gMediaPipelineLog, LogLevel::Error,
509             ("Discarding incoming packet; transport disconnected"));
510     return;
511   }
512 
513   if (!mConduit) {
514     MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
515             ("Discarding incoming packet; media disconnected"));
516     return;
517   }
518 
519   if (!packet.len()) {
520     return;
521   }
522 
523   webrtc::RTPHeader header;
524   if (!mRtpParser->Parse(packet.data(), packet.len(), &header, true)) {
525     return;
526   }
527 
528   if (mFilter && !mFilter->Filter(header)) {
529     return;
530   }
531 
532   // Make sure to only get the time once, and only if we need it by
533   // using getTimestamp() for access
534   DOMHighResTimeStamp now = 0.0;
535   bool hasTime = false;
536 
537   // Remove expired RtpCSRCStats
538   if (!mCsrcStats.empty()) {
539     if (!hasTime) {
540       now = GetNow();
541       hasTime = true;
542     }
543     auto expiry = RtpCSRCStats::GetExpiryFromTime(now);
544     for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
545       if (p->second.Expired(expiry)) {
546         p = mCsrcStats.erase(p);
547         continue;
548       }
549       p++;
550     }
551   }
552 
553   // Add new RtpCSRCStats
554   if (header.numCSRCs) {
555     for (auto i = 0; i < header.numCSRCs; i++) {
556       if (!hasTime) {
557         now = GetNow();
558         hasTime = true;
559       }
560       auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
561       if (csrcInfo == mCsrcStats.end()) {
562         mCsrcStats.insert(std::make_pair(
563             header.arrOfCSRCs[i], RtpCSRCStats(header.arrOfCSRCs[i], now)));
564       } else {
565         csrcInfo->second.SetTimestamp(now);
566       }
567     }
568   }
569 
570   MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
571           ("%s received RTP packet.", mDescription.c_str()));
572   IncrementRtpPacketsReceived(packet.len());
573   OnRtpPacketReceived();
574 
575   RtpLogger::LogPacket(packet, true, mDescription);
576 
577   // Might be nice to pass ownership of the buffer in this case, but it is a
578   // small optimization in a rare case.
579   mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false,
580                       packet.encrypted_data(), packet.encrypted_len());
581 
582   mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(),
583                       packet.len());
584 
585   (void)mConduit->ReceivedRTPPacket(packet.data(), packet.len(),
586                                     header);  // Ignore error codes
587 }
588 
RtcpPacketReceived(const MediaPacket & packet)589 void MediaPipeline::RtcpPacketReceived(const MediaPacket& packet) {
590   if (!mTransport->Pipeline()) {
591     MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
592             ("Discarding incoming packet; transport disconnected"));
593     return;
594   }
595 
596   if (!mConduit) {
597     MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
598             ("Discarding incoming packet; media disconnected"));
599     return;
600   }
601 
602   if (!packet.len()) {
603     return;
604   }
605 
606   // We do not filter RTCP. This is because a compound RTCP packet can contain
607   // any collection of RTCP packets, and webrtc.org already knows how to filter
608   // out what it is interested in, and what it is not. Maybe someday we should
609   // have a TransportLayer that breaks up compound RTCP so we can filter them
610   // individually, but I doubt that will matter much.
611 
612   MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
613           ("%s received RTCP packet.", mDescription.c_str()));
614   IncrementRtcpPacketsReceived();
615 
616   RtpLogger::LogPacket(packet, true, mDescription);
617 
618   // Might be nice to pass ownership of the buffer in this case, but it is a
619   // small optimization in a rare case.
620   mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtcp, false,
621                       packet.encrypted_data(), packet.encrypted_len());
622 
623   mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtcp, false,
624                       packet.data(), packet.len());
625 
626   if (StaticPrefs::media_webrtc_net_force_disable_rtcp_reception()) {
627     MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
628             ("%s RTCP packet forced to be dropped", mDescription.c_str()));
629     return;
630   }
631 
632   (void)mConduit->ReceivedRTCPPacket(packet.data(),
633                                      packet.len());  // Ignore error codes
634 }
635 
PacketReceived(const std::string & aTransportId,const MediaPacket & packet)636 void MediaPipeline::PacketReceived(const std::string& aTransportId,
637                                    const MediaPacket& packet) {
638   if (mTransportId != aTransportId) {
639     return;
640   }
641 
642   if (!mTransport->Pipeline()) {
643     MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
644             ("Discarding incoming packet; transport disconnected"));
645     return;
646   }
647 
648   switch (packet.type()) {
649     case MediaPacket::RTP:
650       RtpPacketReceived(packet);
651       break;
652     case MediaPacket::RTCP:
653       RtcpPacketReceived(packet);
654       break;
655     default:;
656   }
657 }
658 
AlpnNegotiated(const std::string & aAlpn,bool aPrivacyRequested)659 void MediaPipeline::AlpnNegotiated(const std::string& aAlpn,
660                                    bool aPrivacyRequested) {
661   if (aPrivacyRequested) {
662     MakePrincipalPrivate_s();
663   }
664 }
665 
EncryptedPacketSending(const std::string & aTransportId,const MediaPacket & aPacket)666 void MediaPipeline::EncryptedPacketSending(const std::string& aTransportId,
667                                            const MediaPacket& aPacket) {
668   if (mTransportId == aTransportId) {
669     dom::mozPacketDumpType type;
670     if (aPacket.type() == MediaPacket::SRTP) {
671       type = dom::mozPacketDumpType::Srtp;
672     } else if (aPacket.type() == MediaPacket::SRTCP) {
673       type = dom::mozPacketDumpType::Srtcp;
674     } else if (aPacket.type() == MediaPacket::DTLS) {
675       // TODO(bug 1497936): Implement packet dump for DTLS
676       return;
677     } else {
678       MOZ_ASSERT(false);
679       return;
680     }
681     mPacketDumper->Dump(Level(), type, true, aPacket.data(), aPacket.len());
682   }
683 }
684 
685 class MediaPipelineTransmit::PipelineListener
686     : public DirectMediaTrackListener {
687   friend class MediaPipelineTransmit;
688 
689  public:
PipelineListener(RefPtr<MediaSessionConduit> aConduit)690   explicit PipelineListener(RefPtr<MediaSessionConduit> aConduit)
691       : mConduit(std::move(aConduit)),
692         mActive(false),
693         mEnabled(false),
694         mDirectConnect(false) {}
695 
~PipelineListener()696   ~PipelineListener() {
697     NS_ReleaseOnMainThread("MediaPipeline::mConduit", mConduit.forget());
698     if (mConverter) {
699       mConverter->Shutdown();
700     }
701   }
702 
SetActive(bool aActive)703   void SetActive(bool aActive) {
704     mActive = aActive;
705     if (mConverter) {
706       mConverter->SetActive(aActive);
707     }
708   }
SetEnabled(bool aEnabled)709   void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }
710 
711   // These are needed since nested classes don't have access to any particular
712   // instance of the parent
SetAudioProxy(RefPtr<AudioProxyThread> aProxy)713   void SetAudioProxy(RefPtr<AudioProxyThread> aProxy) {
714     mAudioProcessing = std::move(aProxy);
715   }
716 
SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter)717   void SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter) {
718     mConverter = std::move(aConverter);
719   }
720 
OnVideoFrameConverted(const webrtc::VideoFrame & aVideoFrame)721   void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) {
722     MOZ_RELEASE_ASSERT(mConduit->type() == MediaSessionConduit::VIDEO);
723     static_cast<VideoSessionConduit*>(mConduit.get())
724         ->SendVideoFrame(aVideoFrame);
725   }
726 
727   void SetTrackEnabled(MediaStreamTrack* aTrack, bool aEnabled);
728 
729   // Implement MediaTrackListener
730   void NotifyQueuedChanges(MediaTrackGraph* aGraph, TrackTime aOffset,
731                            const MediaSegment& aQueuedMedia) override;
732 
733   // Implement DirectMediaTrackListener
734   void NotifyRealtimeTrackData(MediaTrackGraph* aGraph, TrackTime aOffset,
735                                const MediaSegment& aMedia) override;
736   void NotifyDirectListenerInstalled(InstallationResult aResult) override;
737   void NotifyDirectListenerUninstalled() override;
738 
739  private:
740   void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);
741 
742   RefPtr<MediaSessionConduit> mConduit;
743   RefPtr<AudioProxyThread> mAudioProcessing;
744   RefPtr<VideoFrameConverter> mConverter;
745 
746   // active is true if there is a transport to send on
747   mozilla::Atomic<bool> mActive;
748   // enabled is true if the media access control permits sending
749   // actual content; when false you get black/silence
750   mozilla::Atomic<bool> mEnabled;
751 
752   // Written and read on the MediaTrackGraph thread
753   bool mDirectConnect;
754 };
755 
756 // MediaStreamTrackConsumer inherits from SupportsWeakPtr, which is
757 // main-thread-only.
758 class MediaPipelineTransmit::PipelineListenerTrackConsumer
759     : public MediaStreamTrackConsumer {
~PipelineListenerTrackConsumer()760   virtual ~PipelineListenerTrackConsumer() { MOZ_ASSERT(NS_IsMainThread()); }
761 
762   const RefPtr<PipelineListener> mListener;
763 
764  public:
765   NS_INLINE_DECL_REFCOUNTING(PipelineListenerTrackConsumer)
766 
PipelineListenerTrackConsumer(RefPtr<PipelineListener> aListener)767   explicit PipelineListenerTrackConsumer(RefPtr<PipelineListener> aListener)
768       : mListener(std::move(aListener)) {
769     MOZ_ASSERT(NS_IsMainThread());
770   }
771 
772   // Implement MediaStreamTrackConsumer
NotifyEnabledChanged(MediaStreamTrack * aTrack,bool aEnabled)773   void NotifyEnabledChanged(MediaStreamTrack* aTrack, bool aEnabled) override {
774     MOZ_ASSERT(NS_IsMainThread());
775     mListener->SetTrackEnabled(aTrack, aEnabled);
776   }
777 };
778 
779 // Implements VideoConverterListener for MediaPipeline.
780 //
781 // We pass converted frames on to MediaPipelineTransmit::PipelineListener
782 // where they are further forwarded to VideoConduit.
783 // MediaPipelineTransmit calls Detach() during shutdown to ensure there is
784 // no cyclic dependencies between us and PipelineListener.
785 class MediaPipelineTransmit::VideoFrameFeeder : public VideoConverterListener {
786  public:
VideoFrameFeeder(RefPtr<PipelineListener> aListener)787   explicit VideoFrameFeeder(RefPtr<PipelineListener> aListener)
788       : mMutex("VideoFrameFeeder"), mListener(std::move(aListener)) {
789     MOZ_COUNT_CTOR(VideoFrameFeeder);
790   }
791 
Detach()792   void Detach() {
793     MutexAutoLock lock(mMutex);
794 
795     mListener = nullptr;
796   }
797 
OnVideoFrameConverted(const webrtc::VideoFrame & aVideoFrame)798   void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) override {
799     MutexAutoLock lock(mMutex);
800 
801     if (!mListener) {
802       return;
803     }
804 
805     mListener->OnVideoFrameConverted(aVideoFrame);
806   }
807 
808  protected:
809   MOZ_COUNTED_DTOR_OVERRIDE(VideoFrameFeeder)
810 
811   Mutex mMutex;  // Protects the member below.
812   RefPtr<PipelineListener> mListener;
813 };
814 
MediaPipelineTransmit(const std::string & aPc,RefPtr<MediaTransportHandler> aTransportHandler,RefPtr<nsISerialEventTarget> aMainThread,RefPtr<nsISerialEventTarget> aStsThread,bool aIsVideo,RefPtr<MediaSessionConduit> aConduit)815 MediaPipelineTransmit::MediaPipelineTransmit(
816     const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
817     RefPtr<nsISerialEventTarget> aMainThread,
818     RefPtr<nsISerialEventTarget> aStsThread, bool aIsVideo,
819     RefPtr<MediaSessionConduit> aConduit)
820     : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::TRANSMIT,
821                     std::move(aMainThread), std::move(aStsThread),
822                     std::move(aConduit)),
823       mIsVideo(aIsVideo),
824       mListener(new PipelineListener(mConduit)),
825       mTrackConsumer(
826           MakeAndAddRef<nsMainThreadPtrHolder<PipelineListenerTrackConsumer>>(
827               "MediaPipelineTransmit::mTrackConsumer",
828               MakeAndAddRef<PipelineListenerTrackConsumer>(mListener))),
829       mFeeder(aIsVideo ? MakeAndAddRef<VideoFrameFeeder>(mListener)
830                        : nullptr),  // For video we send frames to an
831                                     // async VideoFrameConverter that
832                                     // calls back to a VideoFrameFeeder
833                                     // that feeds I420 frames to
834                                     // VideoConduit.
835       mTransmitting(false) {
836   if (!IsVideo()) {
837     mAudioProcessing = MakeAndAddRef<AudioProxyThread>(
838         static_cast<AudioSessionConduit*>(Conduit()));
839     mListener->SetAudioProxy(mAudioProcessing);
840   } else {  // Video
841     mConverter = MakeAndAddRef<VideoFrameConverter>();
842     mConverter->AddListener(mFeeder);
843     mListener->SetVideoFrameConverter(mConverter);
844   }
845 }
846 
~MediaPipelineTransmit()847 MediaPipelineTransmit::~MediaPipelineTransmit() {
848   if (mFeeder) {
849     mFeeder->Detach();
850   }
851 
852   MOZ_ASSERT(!mDomTrack);
853 }
854 
SetDescription_s(const std::string & description)855 void MediaPipeline::SetDescription_s(const std::string& description) {
856   mDescription = description;
857 }
858 
SetDescription()859 void MediaPipelineTransmit::SetDescription() {
860   std::string description;
861   description = mPc + "| ";
862   description += mConduit->type() == MediaSessionConduit::AUDIO
863                      ? "Transmit audio["
864                      : "Transmit video[";
865 
866   if (!mDomTrack) {
867     description += "no track]";
868   } else {
869     nsString nsTrackId;
870     mDomTrack->GetId(nsTrackId);
871     std::string trackId(NS_ConvertUTF16toUTF8(nsTrackId).get());
872     description += trackId;
873     description += "]";
874   }
875 
876   RUN_ON_THREAD(
877       mStsThread,
878       WrapRunnable(RefPtr<MediaPipeline>(this),
879                    &MediaPipelineTransmit::SetDescription_s, description),
880       NS_DISPATCH_NORMAL);
881 }
882 
Stop()883 RefPtr<GenericPromise> MediaPipelineTransmit::Stop() {
884   ASSERT_ON_THREAD(mMainThread);
885 
886   // Since we are stopping Start is not needed.
887   mAsyncStartRequested = false;
888 
889   if (!mTransmitting) {
890     return GenericPromise::CreateAndResolve(true, __func__);
891   }
892 
893   if (!mSendTrack) {
894     return GenericPromise::CreateAndResolve(true, __func__);
895   }
896 
897   mTransmitting = false;
898   mConduit->StopTransmitting();
899 
900   mSendTrack->Suspend();
901   if (mSendTrack->mType == MediaSegment::VIDEO) {
902     mSendTrack->RemoveDirectListener(mListener);
903   }
904   return mSendTrack->RemoveListener(mListener);
905 }
906 
Transmitting() const907 bool MediaPipelineTransmit::Transmitting() const {
908   ASSERT_ON_THREAD(mMainThread);
909 
910   return mTransmitting;
911 }
912 
Start()913 void MediaPipelineTransmit::Start() {
914   ASSERT_ON_THREAD(mMainThread);
915 
916   // Since start arrived reset the flag.
917   mAsyncStartRequested = false;
918 
919   if (mTransmitting) {
920     return;
921   }
922 
923   if (!mSendTrack) {
924     return;
925   }
926 
927   mTransmitting = true;
928   mConduit->StartTransmitting();
929 
930   // TODO(ekr@rtfm.com): Check for errors
931   MOZ_LOG(
932       gMediaPipelineLog, LogLevel::Debug,
933       ("Attaching pipeline to track %p conduit type=%s", this,
934        (mConduit->type() == MediaSessionConduit::AUDIO ? "audio" : "video")));
935 
936   mSendTrack->Resume();
937 
938   if (mSendTrack->mType == MediaSegment::VIDEO) {
939     mSendTrack->AddDirectListener(mListener);
940   }
941   mSendTrack->AddListener(mListener);
942 }
943 
IsVideo() const944 bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; }
945 
UpdateSinkIdentity_m(const MediaStreamTrack * aTrack,nsIPrincipal * aPrincipal,const PeerIdentity * aSinkIdentity)946 void MediaPipelineTransmit::UpdateSinkIdentity_m(
947     const MediaStreamTrack* aTrack, nsIPrincipal* aPrincipal,
948     const PeerIdentity* aSinkIdentity) {
949   ASSERT_ON_THREAD(mMainThread);
950 
951   if (aTrack != nullptr && aTrack != mDomTrack) {
952     // If a track is specified, then it might not be for this pipeline,
953     // since we receive notifications for all tracks on the PC.
954     // nullptr means that the PeerIdentity has changed and shall be applied
955     // to all tracks of the PC.
956     return;
957   }
958 
959   if (!mDomTrack) {
960     // Nothing to do here
961     return;
962   }
963 
964   bool enableTrack = aPrincipal->Subsumes(mDomTrack->GetPrincipal());
965   if (!enableTrack) {
966     // first try didn't work, but there's a chance that this is still available
967     // if our track is bound to a peerIdentity, and the peer connection (our
968     // sink) is bound to the same identity, then we can enable the track.
969     const PeerIdentity* trackIdentity = mDomTrack->GetPeerIdentity();
970     if (aSinkIdentity && trackIdentity) {
971       enableTrack = (*aSinkIdentity == *trackIdentity);
972     }
973   }
974 
975   mListener->SetEnabled(enableTrack);
976 }
977 
DetachMedia()978 void MediaPipelineTransmit::DetachMedia() {
979   ASSERT_ON_THREAD(mMainThread);
980   MOZ_ASSERT(!mTransmitting);
981   if (mDomTrack) {
982     mDomTrack->RemoveConsumer(mTrackConsumer);
983     mDomTrack = nullptr;
984   }
985   if (mSendPort) {
986     mSendPort->Destroy();
987     mSendPort = nullptr;
988   }
989   if (mSendTrack) {
990     mSendTrack->Destroy();
991     mSendTrack = nullptr;
992   }
993   // Let the listener be destroyed with the pipeline (or later).
994 }
995 
TransportReady_s()996 void MediaPipelineTransmit::TransportReady_s() {
997   ASSERT_ON_THREAD(mStsThread);
998   // Call base ready function.
999   MediaPipeline::TransportReady_s();
1000   mListener->SetActive(true);
1001 }
1002 
AsyncStart(const RefPtr<GenericPromise> & aPromise)1003 void MediaPipelineTransmit::AsyncStart(const RefPtr<GenericPromise>& aPromise) {
1004   MOZ_ASSERT(NS_IsMainThread());
1005 
1006   // Start has already been scheduled.
1007   if (mAsyncStartRequested) {
1008     return;
1009   }
1010 
1011   mAsyncStartRequested = true;
1012   RefPtr<MediaPipelineTransmit> self = this;
1013   aPromise->Then(
1014       GetMainThreadSerialEventTarget(), __func__,
1015       [self](bool) {
1016         // In the meantime start or stop took place, do nothing.
1017         if (!self->mAsyncStartRequested) {
1018           return;
1019         }
1020         self->Start();
1021       },
1022       [](nsresult aRv) { MOZ_CRASH("Never get here!"); });
1023 }
1024 
SetTrack(RefPtr<MediaStreamTrack> aDomTrack)1025 nsresult MediaPipelineTransmit::SetTrack(RefPtr<MediaStreamTrack> aDomTrack) {
1026   MOZ_ASSERT(NS_IsMainThread());
1027 
1028   if (aDomTrack) {
1029     nsString nsTrackId;
1030     aDomTrack->GetId(nsTrackId);
1031     std::string track_id(NS_ConvertUTF16toUTF8(nsTrackId).get());
1032     MOZ_LOG(
1033         gMediaPipelineLog, LogLevel::Debug,
1034         ("Reattaching pipeline to track %p track %s conduit type: %s",
1035          &aDomTrack, track_id.c_str(),
1036          (mConduit->type() == MediaSessionConduit::AUDIO ? "audio" : "video")));
1037   }
1038 
1039   if (mDomTrack) {
1040     mDomTrack->RemoveConsumer(mTrackConsumer);
1041   }
1042   if (mSendPort) {
1043     mSendPort->Destroy();
1044     mSendPort = nullptr;
1045   }
1046 
1047   if (aDomTrack && !aDomTrack->Ended() && mSendTrack &&
1048       aDomTrack->Graph() != mSendTrack->Graph()) {
1049     // Recreate the send track if the new stream resides in a different MTG.
1050     // Stopping and re-starting will result in removing and re-adding the
1051     // listener BUT in different threads, since tracks belong to different MTGs.
1052     // This can create thread races so we wait here for the stop to happen
1053     // before re-starting. Please note that start should happen at the end of
1054     // the method after the mSendTrack replace bellow. However, since the
1055     // result of the promise is dispatched in another event in the same thread,
1056     // it is guaranteed that the start will be executed after the end of that
1057     // method.
1058     if (mTransmitting) {
1059       RefPtr<GenericPromise> p = Stop();
1060       AsyncStart(p);
1061     }
1062     mSendTrack->Destroy();
1063     mSendTrack = nullptr;
1064   }
1065 
1066   mDomTrack = std::move(aDomTrack);
1067   SetDescription();
1068 
1069   if (mDomTrack) {
1070     if (!mDomTrack->Ended()) {
1071       if (!mSendTrack) {
1072         // Create the send track when the first live track is set or when the
1073         // new track resides in different MTG.
1074         SetSendTrack(mDomTrack->Graph()->CreateForwardedInputTrack(
1075             mDomTrack->GetTrack()->mType));
1076       }
1077       mSendPort = mSendTrack->AllocateInputPort(mDomTrack->GetTrack());
1078     }
1079     mDomTrack->AddConsumer(mTrackConsumer);
1080     if (mConverter) {
1081       mConverter->SetTrackEnabled(mDomTrack->Enabled());
1082     }
1083   }
1084 
1085   return NS_OK;
1086 }
1087 
SetSendTrack(RefPtr<ProcessedMediaTrack> aSendTrack)1088 void MediaPipelineTransmit::SetSendTrack(
1089     RefPtr<ProcessedMediaTrack> aSendTrack) {
1090   MOZ_ASSERT(NS_IsMainThread());
1091   MOZ_ASSERT(!mTransmitting);
1092   MOZ_ASSERT(!mSendTrack);
1093   mSendTrack = std::move(aSendTrack);
1094   mSendTrack->QueueSetAutoend(false);
1095   mSendTrack->Suspend();  // Suspended while not transmitting.
1096 }
1097 
SendRtpPacket(const uint8_t * aData,size_t aLen)1098 nsresult MediaPipeline::PipelineTransport::SendRtpPacket(const uint8_t* aData,
1099                                                          size_t aLen) {
1100   MediaPacket packet;
1101   packet.Copy(aData, aLen, aLen + SRTP_MAX_EXPANSION);
1102   packet.SetType(MediaPacket::RTP);
1103 
1104   mStsThread->Dispatch(NS_NewRunnableFunction(
1105       __func__,
1106       [packet = std::move(packet),
1107        self = RefPtr<MediaPipeline::PipelineTransport>(this)]() mutable {
1108         self->SendRtpRtcpPacket_s(std::move(packet));
1109       }));
1110 
1111   return NS_OK;
1112 }
1113 
SendRtpRtcpPacket_s(MediaPacket && aPacket)1114 void MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s(
1115     MediaPacket&& aPacket) {
1116   bool isRtp = aPacket.type() == MediaPacket::RTP;
1117 
1118   ASSERT_ON_THREAD(mStsThread);
1119   if (!mPipeline) {
1120     return;  // Detached
1121   }
1122 
1123   if (isRtp && mPipeline->mRtpState != TransportLayer::TS_OPEN) {
1124     return;
1125   }
1126 
1127   if (!isRtp && mPipeline->mRtcpState != TransportLayer::TS_OPEN) {
1128     return;
1129   }
1130 
1131   aPacket.sdp_level() = Some(mPipeline->Level());
1132 
1133   if (RtpLogger::IsPacketLoggingOn()) {
1134     RtpLogger::LogPacket(aPacket, false, mPipeline->mDescription);
1135   }
1136 
1137   if (isRtp) {
1138     mPipeline->mPacketDumper->Dump(mPipeline->Level(),
1139                                    dom::mozPacketDumpType::Rtp, true,
1140                                    aPacket.data(), aPacket.len());
1141     mPipeline->IncrementRtpPacketsSent(aPacket.len());
1142   } else {
1143     mPipeline->mPacketDumper->Dump(mPipeline->Level(),
1144                                    dom::mozPacketDumpType::Rtcp, true,
1145                                    aPacket.data(), aPacket.len());
1146     mPipeline->IncrementRtcpPacketsSent();
1147   }
1148 
1149   MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
1150           ("%s sending %s packet", mPipeline->mDescription.c_str(),
1151            (isRtp ? "RTP" : "RTCP")));
1152 
1153   mPipeline->SendPacket(std::move(aPacket));
1154 }
1155 
SendRtcpPacket(const uint8_t * aData,size_t aLen)1156 nsresult MediaPipeline::PipelineTransport::SendRtcpPacket(const uint8_t* aData,
1157                                                           size_t aLen) {
1158   MediaPacket packet;
1159   packet.Copy(aData, aLen, aLen + SRTP_MAX_EXPANSION);
1160   packet.SetType(MediaPacket::RTCP);
1161 
1162   mStsThread->Dispatch(NS_NewRunnableFunction(
1163       __func__,
1164       [packet = std::move(packet),
1165        self = RefPtr<MediaPipeline::PipelineTransport>(this)]() mutable {
1166         self->SendRtpRtcpPacket_s(std::move(packet));
1167       }));
1168 
1169   return NS_OK;
1170 }
1171 
1172 // Called if we're attached with AddDirectListener()
NotifyRealtimeTrackData(MediaTrackGraph * aGraph,TrackTime aOffset,const MediaSegment & aMedia)1173 void MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData(
1174     MediaTrackGraph* aGraph, TrackTime aOffset, const MediaSegment& aMedia) {
1175   MOZ_LOG(
1176       gMediaPipelineLog, LogLevel::Debug,
1177       ("MediaPipeline::NotifyRealtimeTrackData() listener=%p, offset=%" PRId64
1178        ", duration=%" PRId64,
1179        this, aOffset, aMedia.GetDuration()));
1180   TRACE_COMMENT("%s",
1181                 aMedia.GetType() == MediaSegment::VIDEO ? "Video" : "Audio");
1182   NewData(aMedia, aGraph->GraphRate());
1183 }
1184 
NotifyQueuedChanges(MediaTrackGraph * aGraph,TrackTime aOffset,const MediaSegment & aQueuedMedia)1185 void MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges(
1186     MediaTrackGraph* aGraph, TrackTime aOffset,
1187     const MediaSegment& aQueuedMedia) {
1188   MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
1189           ("MediaPipeline::NotifyQueuedChanges()"));
1190 
1191   if (aQueuedMedia.GetType() == MediaSegment::VIDEO) {
1192     // We always get video from the direct listener.
1193     return;
1194   }
1195 
1196   TRACE_COMMENT("Audio");
1197 
1198   if (mDirectConnect) {
1199     // ignore non-direct data if we're also getting direct data
1200     return;
1201   }
1202 
1203   size_t rate;
1204   if (aGraph) {
1205     rate = aGraph->GraphRate();
1206   } else {
1207     // When running tests, graph may be null. In that case use a default.
1208     rate = 16000;
1209   }
1210   NewData(aQueuedMedia, rate);
1211 }
1212 
SetTrackEnabled(MediaStreamTrack * aTrack,bool aEnabled)1213 void MediaPipelineTransmit::PipelineListener::SetTrackEnabled(
1214     MediaStreamTrack* aTrack, bool aEnabled) {
1215   MOZ_ASSERT(NS_IsMainThread());
1216   if (mConduit->type() != MediaSessionConduit::VIDEO) {
1217     return;
1218   }
1219 
1220   MOZ_ASSERT(mConverter);
1221   mConverter->SetTrackEnabled(aEnabled);
1222 }
1223 
NotifyDirectListenerInstalled(InstallationResult aResult)1224 void MediaPipelineTransmit::PipelineListener::NotifyDirectListenerInstalled(
1225     InstallationResult aResult) {
1226   MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
1227           ("MediaPipeline::NotifyDirectListenerInstalled() listener=%p,"
1228            " result=%d",
1229            this, static_cast<int32_t>(aResult)));
1230 
1231   mDirectConnect = InstallationResult::SUCCESS == aResult;
1232 }
1233 
1234 void MediaPipelineTransmit::PipelineListener::
NotifyDirectListenerUninstalled()1235     NotifyDirectListenerUninstalled() {
1236   MOZ_LOG(
1237       gMediaPipelineLog, LogLevel::Info,
1238       ("MediaPipeline::NotifyDirectListenerUninstalled() listener=%p", this));
1239 
1240   if (mConduit->type() == MediaSessionConduit::VIDEO) {
1241     // Reset the converter's track-enabled state. If re-added to a new track
1242     // later and that track is disabled, we will be signaled explicitly.
1243     MOZ_ASSERT(mConverter);
1244     mConverter->SetTrackEnabled(true);
1245   }
1246 
1247   mDirectConnect = false;
1248 }
1249 
NewData(const MediaSegment & aMedia,TrackRate aRate)1250 void MediaPipelineTransmit::PipelineListener::NewData(
1251     const MediaSegment& aMedia, TrackRate aRate /* = 0 */) {
1252   if (mConduit->type() != (aMedia.GetType() == MediaSegment::AUDIO
1253                                ? MediaSessionConduit::AUDIO
1254                                : MediaSessionConduit::VIDEO)) {
1255     MOZ_ASSERT(false,
1256                "The media type should always be correct since the "
1257                "listener is locked to a specific track");
1258     return;
1259   }
1260 
1261   // TODO(ekr@rtfm.com): For now assume that we have only one
1262   // track type and it's destined for us
1263   // See bug 784517
1264   if (aMedia.GetType() == MediaSegment::AUDIO) {
1265     MOZ_RELEASE_ASSERT(aRate > 0);
1266 
1267     if (!mActive) {
1268       MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
1269               ("Discarding audio packets because transport not ready"));
1270       return;
1271     }
1272 
1273     const AudioSegment* audio = static_cast<const AudioSegment*>(&aMedia);
1274     for (AudioSegment::ConstChunkIterator iter(*audio); !iter.IsEnded();
1275          iter.Next()) {
1276       mAudioProcessing->QueueAudioChunk(aRate, *iter, mEnabled);
1277     }
1278   } else {
1279     const VideoSegment* video = static_cast<const VideoSegment*>(&aMedia);
1280 
1281     for (VideoSegment::ConstChunkIterator iter(*video); !iter.IsEnded();
1282          iter.Next()) {
1283       mConverter->QueueVideoChunk(*iter, !mEnabled);
1284     }
1285   }
1286 }
1287 
1288 class GenericReceiveListener : public MediaTrackListener {
1289  public:
GenericReceiveListener(RefPtr<nsISerialEventTarget> aMainThread,const RefPtr<dom::MediaStreamTrack> & aTrack)1290   explicit GenericReceiveListener(RefPtr<nsISerialEventTarget> aMainThread,
1291                                   const RefPtr<dom::MediaStreamTrack>& aTrack)
1292       : mMainThread(std::move(aMainThread)),
1293         mTrackSource(new nsMainThreadPtrHolder<RemoteTrackSource>(
1294             "GenericReceiveListener::mTrackSource",
1295             &static_cast<RemoteTrackSource&>(aTrack->GetSource()))),
1296         mSource(mTrackSource->mStream),
1297         mIsAudio(aTrack->AsAudioStreamTrack()),
1298         mListening(false),
1299         mMaybeTrackNeedsUnmute(true) {
1300     MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread());
1301     MOZ_DIAGNOSTIC_ASSERT(mSource, "Must be used with a SourceMediaTrack");
1302   }
1303 
1304   virtual ~GenericReceiveListener() = default;
1305 
AddSelf()1306   void AddSelf() {
1307     if (mListening) {
1308       return;
1309     }
1310     mListening = true;
1311     mMaybeTrackNeedsUnmute = true;
1312     if (mIsAudio && !mSource->IsDestroyed()) {
1313       mSource->SetPullingEnabled(true);
1314     }
1315   }
1316 
RemoveSelf()1317   void RemoveSelf() {
1318     if (!mListening) {
1319       return;
1320     }
1321     mListening = false;
1322     if (mIsAudio && !mSource->IsDestroyed()) {
1323       mSource->SetPullingEnabled(false);
1324     }
1325   }
1326 
OnRtpReceived()1327   void OnRtpReceived() {
1328     if (mMaybeTrackNeedsUnmute) {
1329       mMaybeTrackNeedsUnmute = false;
1330       mMainThread->Dispatch(
1331           NewRunnableMethod("GenericReceiveListener::OnRtpReceived_m", this,
1332                             &GenericReceiveListener::OnRtpReceived_m));
1333     }
1334   }
1335 
OnRtpReceived_m()1336   void OnRtpReceived_m() {
1337     if (mListening) {
1338       mTrackSource->SetMuted(false);
1339     }
1340   }
1341 
EndTrack()1342   void EndTrack() {
1343     MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
1344             ("GenericReceiveListener ending track"));
1345 
1346     if (!mSource->IsDestroyed()) {
1347       // This breaks the cycle with the SourceMediaTrack
1348       mSource->RemoveListener(this);
1349       mSource->End();
1350       mSource->Destroy();
1351     }
1352 
1353     mMainThread->Dispatch(NewRunnableMethod("RemoteTrackSource::ForceEnded",
1354                                             mTrackSource.get(),
1355                                             &RemoteTrackSource::ForceEnded));
1356   }
1357 
1358  protected:
1359   const RefPtr<nsISerialEventTarget> mMainThread;
1360   const nsMainThreadPtrHandle<RemoteTrackSource> mTrackSource;
1361   const RefPtr<SourceMediaTrack> mSource;
1362   const bool mIsAudio;
1363   // Main thread only.
1364   bool mListening;
1365   // Any thread.
1366   Atomic<bool> mMaybeTrackNeedsUnmute;
1367 };
1368 
MediaPipelineReceive(const std::string & aPc,RefPtr<MediaTransportHandler> aTransportHandler,RefPtr<nsISerialEventTarget> aMainThread,RefPtr<nsISerialEventTarget> aStsThread,RefPtr<MediaSessionConduit> aConduit)1369 MediaPipelineReceive::MediaPipelineReceive(
1370     const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
1371     RefPtr<nsISerialEventTarget> aMainThread,
1372     RefPtr<nsISerialEventTarget> aStsThread,
1373     RefPtr<MediaSessionConduit> aConduit)
1374     : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::RECEIVE,
1375                     std::move(aMainThread), std::move(aStsThread),
1376                     std::move(aConduit)) {}
1377 
1378 MediaPipelineReceive::~MediaPipelineReceive() = default;
1379 
1380 class MediaPipelineReceiveAudio::PipelineListener
1381     : public GenericReceiveListener {
1382  public:
PipelineListener(RefPtr<nsISerialEventTarget> aMainThread,const RefPtr<dom::MediaStreamTrack> & aTrack,RefPtr<MediaSessionConduit> aConduit,const PrincipalHandle & aPrincipalHandle)1383   PipelineListener(RefPtr<nsISerialEventTarget> aMainThread,
1384                    const RefPtr<dom::MediaStreamTrack>& aTrack,
1385                    RefPtr<MediaSessionConduit> aConduit,
1386                    const PrincipalHandle& aPrincipalHandle)
1387       : GenericReceiveListener(std::move(aMainThread), aTrack),
1388         mConduit(std::move(aConduit)),
1389         // AudioSession conduit only supports 16, 32, 44.1 and 48kHz
1390         // This is an artificial limitation, it would however require more
1391         // changes to support any rates. If the sampling rate is not-supported,
1392         // we will use 48kHz instead.
1393         mRate(static_cast<AudioSessionConduit*>(mConduit.get())
1394                       ->IsSamplingFreqSupported(mSource->Graph()->GraphRate())
1395                   ? mSource->Graph()->GraphRate()
1396                   : WEBRTC_MAX_SAMPLE_RATE),
1397         mTaskQueue(
1398             new TaskQueue(GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER),
1399                           "AudioPipelineListener")),
1400         mPlayedTicks(0),
1401         mPrincipalHandle(aPrincipalHandle),
1402         mForceSilence(false) {}
1403 
Init()1404   void Init() {
1405     mSource->SetAppendDataSourceRate(mRate);
1406     mSource->AddListener(this);
1407   }
1408 
1409   // Implement MediaTrackListener
NotifyPull(MediaTrackGraph * aGraph,TrackTime aEndOfAppendedData,TrackTime aDesiredTime)1410   void NotifyPull(MediaTrackGraph* aGraph, TrackTime aEndOfAppendedData,
1411                   TrackTime aDesiredTime) override {
1412     NotifyPullImpl(aDesiredTime);
1413   }
1414 
MakePrincipalPrivate_s()1415   void MakePrincipalPrivate_s() {
1416     mForceSilence = true;
1417 
1418     mMainThread->Dispatch(NS_NewRunnableFunction(
1419         "MediaPipelineReceiveAudio::PipelineListener::MakePrincipalPrivate_s",
1420         [self = RefPtr<PipelineListener>(this), this] {
1421           class Message : public ControlMessage {
1422            public:
1423             Message(RefPtr<PipelineListener> aListener,
1424                     const PrincipalHandle& aPrivatePrincipal)
1425                 : ControlMessage(nullptr),
1426                   mListener(std::move(aListener)),
1427                   mPrivatePrincipal(aPrivatePrincipal) {}
1428 
1429             void Run() override {
1430               mListener->mPrincipalHandle = mPrivatePrincipal;
1431               mListener->mForceSilence = false;
1432             }
1433 
1434             const RefPtr<PipelineListener> mListener;
1435             PrincipalHandle mPrivatePrincipal;
1436           };
1437 
1438           RefPtr<nsIPrincipal> privatePrincipal =
1439               NullPrincipal::CreateWithInheritedAttributes(
1440                   mTrackSource->GetPrincipal());
1441           mTrackSource->SetPrincipal(privatePrincipal);
1442 
1443           if (mSource->IsDestroyed()) {
1444             return;
1445           }
1446 
1447           mSource->GraphImpl()->AppendMessage(
1448               MakeUnique<Message>(this, MakePrincipalHandle(privatePrincipal)));
1449         }));
1450   }
1451 
1452  private:
~PipelineListener()1453   ~PipelineListener() {
1454     NS_ReleaseOnMainThread("MediaPipeline::mConduit", mConduit.forget());
1455   }
1456 
NotifyPullImpl(TrackTime aDesiredTime)1457   void NotifyPullImpl(TrackTime aDesiredTime) {
1458     TRACE_COMMENT("Listener %p", this);
1459     uint32_t samplesPer10ms = mRate / 100;
1460 
1461     // mSource's rate is not necessarily the same as the graph rate, since there
1462     // are sample-rate constraints on the inbound audio: only 16, 32, 44.1 and
1463     // 48kHz are supported. The audio frames we get here is going to be
1464     // resampled when inserted into the graph. aDesiredTime and mPlayedTicks are
1465     // in the graph rate.
1466 
1467     while (mPlayedTicks < aDesiredTime) {
1468       constexpr size_t scratchBufferLength =
1469           AUDIO_SAMPLE_BUFFER_MAX_BYTES / sizeof(int16_t);
1470       int16_t scratchBuffer[scratchBufferLength];
1471 
1472       size_t channelCount = 0;
1473       size_t samplesLength = scratchBufferLength;
1474 
1475       // This fetches 10ms of data, either mono or stereo
1476       MediaConduitErrorCode err =
1477           static_cast<AudioSessionConduit*>(mConduit.get())
1478               ->GetAudioFrame(scratchBuffer, mRate,
1479                               0,  // TODO(ekr@rtfm.com): better estimate of
1480                                   // "capture" (really playout) delay
1481                               channelCount, samplesLength);
1482 
1483       if (err != kMediaConduitNoError) {
1484         // Insert silence on conduit/GIPS failure (extremely unlikely)
1485         MOZ_LOG(gMediaPipelineLog, LogLevel::Error,
1486                 ("Audio conduit failed (%d) to return data @ %" PRId64
1487                  " (desired %" PRId64 " -> %f)",
1488                  err, mPlayedTicks, aDesiredTime,
1489                  mSource->TrackTimeToSeconds(aDesiredTime)));
1490         channelCount = 1;
1491         // if this is not enough we'll loop and provide more
1492         samplesLength = samplesPer10ms;
1493         PodArrayZero(scratchBuffer);
1494       }
1495 
1496       MOZ_RELEASE_ASSERT(samplesLength <= scratchBufferLength);
1497 
1498       MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
1499               ("Audio conduit returned buffer of length %zu", samplesLength));
1500 
1501       CheckedInt<size_t> bufferSize(sizeof(uint16_t));
1502       bufferSize *= samplesLength;
1503       RefPtr<SharedBuffer> samples = SharedBuffer::Create(bufferSize);
1504       int16_t* samplesData = static_cast<int16_t*>(samples->Data());
1505       AudioSegment segment;
1506       size_t frames = samplesLength / channelCount;
1507       if (mForceSilence) {
1508         segment.AppendNullData(frames);
1509       } else {
1510         AutoTArray<int16_t*, 2> channels;
1511         AutoTArray<const int16_t*, 2> outputChannels;
1512 
1513         channels.SetLength(channelCount);
1514 
1515         size_t offset = 0;
1516         for (size_t i = 0; i < channelCount; i++) {
1517           channels[i] = samplesData + offset;
1518           offset += frames;
1519         }
1520 
1521         DeinterleaveAndConvertBuffer(scratchBuffer, frames, channelCount,
1522                                      channels.Elements());
1523 
1524         outputChannels.AppendElements(channels);
1525 
1526         segment.AppendFrames(samples.forget(), outputChannels, frames,
1527                              mPrincipalHandle);
1528       }
1529 
1530       // Handle track not actually added yet or removed/finished
1531       if (TrackTime appended = mSource->AppendData(&segment)) {
1532         mPlayedTicks += appended;
1533       } else {
1534         MOZ_LOG(gMediaPipelineLog, LogLevel::Error, ("AppendData failed"));
1535         // we can't un-read the data, but that's ok since we don't want to
1536         // buffer - but don't i-loop!
1537         break;
1538       }
1539     }
1540   }
1541 
1542   RefPtr<MediaSessionConduit> mConduit;
1543   // This conduit's sampling rate. This is either 16, 32, 44.1 or 48kHz, and
1544   // tries to be the same as the graph rate. If the graph rate is higher than
1545   // 48kHz, mRate is capped to 48kHz. If mRate does not match the graph rate,
1546   // audio is resampled to the graph rate.
1547   const TrackRate mRate;
1548   const RefPtr<TaskQueue> mTaskQueue;
1549   // Number of frames of data that has been added to the SourceMediaTrack in
1550   // the graph's rate. Graph thread only.
1551   TrackTicks mPlayedTicks;
1552   // Principal handle used when appending data to the SourceMediaTrack. Graph
1553   // thread only.
1554   PrincipalHandle mPrincipalHandle;
1555   // Set to true on the sts thread if privacy is requested when ALPN was
1556   // negotiated. Set to false again when mPrincipalHandle is private.
1557   Atomic<bool> mForceSilence;
1558 };
1559 
MediaPipelineReceiveAudio(const std::string & aPc,RefPtr<MediaTransportHandler> aTransportHandler,RefPtr<nsISerialEventTarget> aMainThread,RefPtr<nsISerialEventTarget> aStsThread,RefPtr<AudioSessionConduit> aConduit,const RefPtr<dom::MediaStreamTrack> & aTrack,const PrincipalHandle & aPrincipalHandle)1560 MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
1561     const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
1562     RefPtr<nsISerialEventTarget> aMainThread,
1563     RefPtr<nsISerialEventTarget> aStsThread,
1564     RefPtr<AudioSessionConduit> aConduit,
1565     const RefPtr<dom::MediaStreamTrack>& aTrack,
1566     const PrincipalHandle& aPrincipalHandle)
1567     : MediaPipelineReceive(aPc, std::move(aTransportHandler), aMainThread,
1568                            std::move(aStsThread), std::move(aConduit)),
1569       mListener(aTrack ? new PipelineListener(std::move(aMainThread), aTrack,
1570                                               mConduit, aPrincipalHandle)
1571                        : nullptr) {
1572   mDescription = mPc + "| Receive audio";
1573   if (mListener) {
1574     mListener->Init();
1575   }
1576 }
1577 
DetachMedia()1578 void MediaPipelineReceiveAudio::DetachMedia() {
1579   ASSERT_ON_THREAD(mMainThread);
1580   if (mListener) {
1581     mListener->EndTrack();
1582   }
1583 }
1584 
MakePrincipalPrivate_s()1585 void MediaPipelineReceiveAudio::MakePrincipalPrivate_s() {
1586   if (mListener) {
1587     mListener->MakePrincipalPrivate_s();
1588   }
1589 }
1590 
Start()1591 void MediaPipelineReceiveAudio::Start() {
1592   mConduit->StartReceiving();
1593   if (mListener) {
1594     mListener->AddSelf();
1595   }
1596 }
1597 
Stop()1598 RefPtr<GenericPromise> MediaPipelineReceiveAudio::Stop() {
1599   if (mListener) {
1600     mListener->RemoveSelf();
1601   }
1602   mConduit->StopReceiving();
1603   return GenericPromise::CreateAndResolve(true, __func__);
1604 }
1605 
OnRtpPacketReceived()1606 void MediaPipelineReceiveAudio::OnRtpPacketReceived() {
1607   if (mListener) {
1608     mListener->OnRtpReceived();
1609   }
1610 }
1611 
1612 class MediaPipelineReceiveVideo::PipelineListener
1613     : public GenericReceiveListener {
1614  public:
PipelineListener(RefPtr<nsISerialEventTarget> aMainThread,const RefPtr<dom::MediaStreamTrack> & aTrack,const PrincipalHandle & aPrincipalHandle)1615   PipelineListener(RefPtr<nsISerialEventTarget> aMainThread,
1616                    const RefPtr<dom::MediaStreamTrack>& aTrack,
1617                    const PrincipalHandle& aPrincipalHandle)
1618       : GenericReceiveListener(std::move(aMainThread), aTrack),
1619         mImageContainer(
1620             LayerManager::CreateImageContainer(ImageContainer::ASYNCHRONOUS)),
1621         mMutex("MediaPipelineReceiveVideo::PipelineListener::mMutex"),
1622         mPrincipalHandle(aPrincipalHandle) {}
1623 
Init()1624   void Init() { mSource->AddListener(this); }
1625 
MakePrincipalPrivate_s()1626   void MakePrincipalPrivate_s() {
1627     {
1628       MutexAutoLock lock(mMutex);
1629       mForceDropFrames = true;
1630     }
1631 
1632     mMainThread->Dispatch(NS_NewRunnableFunction(
1633         __func__, [self = RefPtr<PipelineListener>(this), this] {
1634           RefPtr<nsIPrincipal> privatePrincipal =
1635               NullPrincipal::CreateWithInheritedAttributes(
1636                   mTrackSource->GetPrincipal());
1637           mTrackSource->SetPrincipal(privatePrincipal);
1638 
1639           MutexAutoLock lock(mMutex);
1640           mPrincipalHandle = MakePrincipalHandle(privatePrincipal);
1641           mForceDropFrames = false;
1642         }));
1643   }
1644 
RenderVideoFrame(const webrtc::VideoFrameBuffer & aBuffer,uint32_t aTimeStamp,int64_t aRenderTime)1645   void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer,
1646                         uint32_t aTimeStamp, int64_t aRenderTime) {
1647     PrincipalHandle principal;
1648     {
1649       MutexAutoLock lock(mMutex);
1650       if (mForceDropFrames) {
1651         return;
1652       }
1653       principal = mPrincipalHandle;
1654     }
1655     RefPtr<Image> image;
1656     if (aBuffer.type() == webrtc::VideoFrameBuffer::Type::kNative) {
1657       // We assume that only native handles are used with the
1658       // WebrtcMediaDataCodec decoder.
1659       const ImageBuffer* imageBuffer =
1660           static_cast<const ImageBuffer*>(&aBuffer);
1661       image = imageBuffer->GetNativeImage();
1662     } else {
1663       MOZ_ASSERT(aBuffer.type() == webrtc::VideoFrameBuffer::Type::kI420);
1664       rtc::scoped_refptr<const webrtc::I420BufferInterface> i420 =
1665           aBuffer.GetI420();
1666 
1667       MOZ_ASSERT(i420->DataY());
1668       // Create a video frame using |buffer|.
1669       RefPtr<PlanarYCbCrImage> yuvImage =
1670           mImageContainer->CreatePlanarYCbCrImage();
1671 
1672       PlanarYCbCrData yuvData;
1673       yuvData.mYChannel = const_cast<uint8_t*>(i420->DataY());
1674       yuvData.mYSize = IntSize(i420->width(), i420->height());
1675       yuvData.mYStride = i420->StrideY();
1676       MOZ_ASSERT(i420->StrideU() == i420->StrideV());
1677       yuvData.mCbCrStride = i420->StrideU();
1678       yuvData.mCbChannel = const_cast<uint8_t*>(i420->DataU());
1679       yuvData.mCrChannel = const_cast<uint8_t*>(i420->DataV());
1680       yuvData.mCbCrSize =
1681           IntSize((i420->width() + 1) >> 1, (i420->height() + 1) >> 1);
1682       yuvData.mPicX = 0;
1683       yuvData.mPicY = 0;
1684       yuvData.mPicSize = IntSize(i420->width(), i420->height());
1685       yuvData.mStereoMode = StereoMode::MONO;
1686       // This isn't the best default.
1687       yuvData.mYUVColorSpace = gfx::YUVColorSpace::BT601;
1688 
1689       if (!yuvImage->CopyData(yuvData)) {
1690         MOZ_ASSERT(false);
1691         return;
1692       }
1693 
1694       image = std::move(yuvImage);
1695     }
1696 
1697     VideoSegment segment;
1698     auto size = image->GetSize();
1699     segment.AppendFrame(image.forget(), size, principal);
1700     mSource->AppendData(&segment);
1701   }
1702 
1703  private:
1704   RefPtr<layers::ImageContainer> mImageContainer;
1705   Mutex mMutex;  // Protects the below members.
1706   PrincipalHandle mPrincipalHandle;
1707   // Set to true on the sts thread if privacy is requested when ALPN was
1708   // negotiated. Set to false again when mPrincipalHandle is private.
1709   bool mForceDropFrames = false;
1710 };
1711 
1712 class MediaPipelineReceiveVideo::PipelineRenderer
1713     : public mozilla::VideoRenderer {
1714  public:
PipelineRenderer(MediaPipelineReceiveVideo * aPipeline)1715   explicit PipelineRenderer(MediaPipelineReceiveVideo* aPipeline)
1716       : mPipeline(aPipeline) {}
1717 
Detach()1718   void Detach() { mPipeline = nullptr; }
1719 
1720   // Implement VideoRenderer
FrameSizeChange(unsigned int aWidth,unsigned int aHeight)1721   void FrameSizeChange(unsigned int aWidth, unsigned int aHeight) override {}
RenderVideoFrame(const webrtc::VideoFrameBuffer & aBuffer,uint32_t aTimeStamp,int64_t aRenderTime)1722   void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer,
1723                         uint32_t aTimeStamp, int64_t aRenderTime) override {
1724     mPipeline->mListener->RenderVideoFrame(aBuffer, aTimeStamp, aRenderTime);
1725   }
1726 
1727  private:
1728   MediaPipelineReceiveVideo* mPipeline;  // Raw pointer to avoid cycles
1729 };
1730 
MediaPipelineReceiveVideo(const std::string & aPc,RefPtr<MediaTransportHandler> aTransportHandler,RefPtr<nsISerialEventTarget> aMainThread,RefPtr<nsISerialEventTarget> aStsThread,RefPtr<VideoSessionConduit> aConduit,const RefPtr<dom::MediaStreamTrack> & aTrack,const PrincipalHandle & aPrincipalHandle)1731 MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
1732     const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
1733     RefPtr<nsISerialEventTarget> aMainThread,
1734     RefPtr<nsISerialEventTarget> aStsThread,
1735     RefPtr<VideoSessionConduit> aConduit,
1736     const RefPtr<dom::MediaStreamTrack>& aTrack,
1737     const PrincipalHandle& aPrincipalHandle)
1738     : MediaPipelineReceive(aPc, std::move(aTransportHandler), aMainThread,
1739                            std::move(aStsThread), std::move(aConduit)),
1740       mRenderer(new PipelineRenderer(this)),
1741       mListener(aTrack ? new PipelineListener(std::move(aMainThread), aTrack,
1742                                               aPrincipalHandle)
1743                        : nullptr) {
1744   mDescription = mPc + "| Receive video";
1745   if (mListener) {
1746     mListener->Init();
1747   }
1748   static_cast<VideoSessionConduit*>(mConduit.get())->AttachRenderer(mRenderer);
1749 }
1750 
DetachMedia()1751 void MediaPipelineReceiveVideo::DetachMedia() {
1752   ASSERT_ON_THREAD(mMainThread);
1753 
1754   // stop generating video and thus stop invoking the PipelineRenderer
1755   // and PipelineListener - the renderer has a raw ptr to the Pipeline to
1756   // avoid cycles, and the render callbacks are invoked from a different
1757   // thread so simple null-checks would cause TSAN bugs without locks.
1758   static_cast<VideoSessionConduit*>(mConduit.get())->DetachRenderer();
1759   if (mListener) {
1760     mListener->EndTrack();
1761   }
1762 }
1763 
MakePrincipalPrivate_s()1764 void MediaPipelineReceiveVideo::MakePrincipalPrivate_s() {
1765   if (mListener) {
1766     mListener->MakePrincipalPrivate_s();
1767   }
1768 }
1769 
Start()1770 void MediaPipelineReceiveVideo::Start() {
1771   mConduit->StartReceiving();
1772   if (mListener) {
1773     mListener->AddSelf();
1774   }
1775 }
1776 
Stop()1777 RefPtr<GenericPromise> MediaPipelineReceiveVideo::Stop() {
1778   if (mListener) {
1779     mListener->RemoveSelf();
1780   }
1781   mConduit->StopReceiving();
1782   return GenericPromise::CreateAndResolve(true, __func__);
1783 }
1784 
OnRtpPacketReceived()1785 void MediaPipelineReceiveVideo::OnRtpPacketReceived() {
1786   if (mListener) {
1787     mListener->OnRtpReceived();
1788   }
1789 }
1790 
GetNow() const1791 DOMHighResTimeStamp MediaPipeline::GetNow() const {
1792   return Conduit()->GetNow();
1793 }
1794 
GetExpiryFromTime(const DOMHighResTimeStamp aTime)1795 DOMHighResTimeStamp MediaPipeline::RtpCSRCStats::GetExpiryFromTime(
1796     const DOMHighResTimeStamp aTime) {
1797   // DOMHighResTimeStamp is a unit measured in ms
1798   return aTime + EXPIRY_TIME_MILLISECONDS;
1799 }
1800 
RtpCSRCStats(const uint32_t aCsrc,const DOMHighResTimeStamp aTime)1801 MediaPipeline::RtpCSRCStats::RtpCSRCStats(const uint32_t aCsrc,
1802                                           const DOMHighResTimeStamp aTime)
1803     : mCsrc(aCsrc), mTimestamp(aTime) {}
1804 
GetWebidlInstance(dom::RTCRTPContributingSourceStats & aWebidlObj,const nsString & aInboundRtpStreamId) const1805 void MediaPipeline::RtpCSRCStats::GetWebidlInstance(
1806     dom::RTCRTPContributingSourceStats& aWebidlObj,
1807     const nsString& aInboundRtpStreamId) const {
1808   nsString statId = NS_LITERAL_STRING("csrc_") + aInboundRtpStreamId;
1809   statId.AppendLiteral("_");
1810   statId.AppendInt(mCsrc);
1811   aWebidlObj.mId.Construct(statId);
1812   aWebidlObj.mType.Construct(RTCStatsType::Csrc);
1813   aWebidlObj.mTimestamp.Construct(mTimestamp);
1814   aWebidlObj.mContributorSsrc.Construct(mCsrc);
1815   aWebidlObj.mInboundRtpStreamId.Construct(aInboundRtpStreamId);
1816 }
1817 
1818 }  // namespace mozilla
1819