/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

// Original author: ekr@rtfm.com

#include "MediaPipeline.h"

#include <inttypes.h>
#include <math.h>

#include "AudioSegment.h"
#include "AudioConverter.h"
#include "AutoTaskQueue.h"
#include "CSFLog.h"
#include "DOMMediaStream.h"
#include "ImageContainer.h"
#include "ImageTypes.h"
#include "Layers.h"
#include "LayersLogging.h"
#include "MediaEngine.h"
#include "MediaPipelineFilter.h"
#include "MediaSegment.h"
#include "MediaStreamGraphImpl.h"
#include "MediaStreamListener.h"
#include "MediaStreamTrack.h"
#include "MediaStreamVideoSink.h"
#include "RtpLogger.h"
#include "VideoSegment.h"
#include "VideoStreamTrack.h"
#include "VideoUtils.h"
#include "databuffer.h"
#include "libyuv/convert.h"
#include "mozilla/PeerIdentity.h"
#include "mozilla/Preferences.h"
#include "mozilla/SharedThreadPool.h"
#include "mozilla/Sprintf.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/UniquePtrExtensions.h"
#include "mozilla/dom/RTCStatsReportBinding.h"
#include "mozilla/gfx/Point.h"
#include "mozilla/gfx/Types.h"
#include "nsError.h"
#include "nsThreadUtils.h"
#include "nspr.h"
#include "runnable_utils.h"
#include "srtp.h"
#include "transportflow.h"
#include "transportlayer.h"
#include "transportlayerdtls.h"
#include "transportlayerice.h"

#include "webrtc/base/bind.h"
#include "webrtc/base/keep_ref_until_done.h"
#include "webrtc/common_types.h"
#include "webrtc/common_video/include/i420_buffer_pool.h"
#include "webrtc/common_video/include/video_frame_buffer.h"
#include "webrtc/common_video/libyuv/include/webrtc_libyuv.h"

// Max size given stereo is 480*2*2 = 1920 (10ms of 16-bit stereo audio at
// 48 kHz)
#define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
                  AUDIO_SAMPLE_BUFFER_MAX_BYTES,
              "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");

// The number of frame buffers VideoFrameConverter may create before returning
// errors.
// Sometimes these are released synchronously but they can be forwarded all the
// way to the encoder for asynchronous encoding. With a pool size of 5,
// we allow 1 buffer for the current conversion, and 4 buffers to be queued at
// the encoder.
#define CONVERTER_BUFFER_POOL_SIZE 5

using namespace mozilla;
using namespace mozilla::dom;
using namespace mozilla::gfx;
using namespace mozilla::layers;

static const char* mpLogTag = "MediaPipeline";
#ifdef LOGTAG
#undef LOGTAG
#endif
#define LOGTAG mpLogTag

namespace mozilla {
extern mozilla::LogModule* AudioLogModule();

class VideoConverterListener {
 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(VideoConverterListener)

  virtual void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) = 0;

 protected:
  virtual ~VideoConverterListener() {}
};

// An async video frame format converter.
//
// Input is typically a MediaStream(Track)Listener driven by MediaStreamGraph.
//
// We keep track of the size of the TaskQueue so we can drop frames if
// conversion is taking too long.
//
// Output is passed through to all added VideoConverterListeners on a TaskQueue
// thread whenever a frame is converted.
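//
// Illustrative usage (a sketch only; the listener and call sites here are
// assumptions, not taken from a particular caller):
//
//   RefPtr<VideoFrameConverter> converter = new VideoFrameConverter();
//   converter->AddListener(feeder);  // feeder: some VideoConverterListener
//   // On the queueing thread (normally MSG), for each chunk:
//   converter->QueueVideoChunk(chunk, /* aForceBlack = */ false);
//   // During shutdown:
//   converter->Shutdown();  // drops all listeners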
class VideoFrameConverter {
 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(VideoFrameConverter)

  VideoFrameConverter()
      : mLength(0),
        mTaskQueue(new AutoTaskQueue(
            GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER),
            "VideoFrameConverter")),
        mBufferPool(false, CONVERTER_BUFFER_POOL_SIZE),
        mLastImage(
            -1)  // -1 is not a guaranteed invalid serial. See bug 1262134.
#ifdef DEBUG
        ,
        mThrottleCount(0),
        mThrottleRecord(0)
#endif
        ,
        mMutex("VideoFrameConverter") {
    MOZ_COUNT_CTOR(VideoFrameConverter);
  }

  void QueueVideoChunk(const VideoChunk& aChunk, bool aForceBlack) {
    IntSize size = aChunk.mFrame.GetIntrinsicSize();
    if (size.width == 0 || size.height == 0) {
      return;
    }

    if (aChunk.IsNull()) {
      aForceBlack = true;
    } else {
      aForceBlack = aChunk.mFrame.GetForceBlack();
    }

    int32_t serial;
    if (aForceBlack) {
      // Reset the last-img check.
      // -1 is not a guaranteed invalid serial. See bug 1262134.
      serial = -1;
    } else {
      serial = aChunk.mFrame.GetImage()->GetSerial();
    }

    const double duplicateMinFps = 1.0;
    TimeStamp t = aChunk.mTimeStamp;
    MOZ_ASSERT(!t.IsNull());
    if (!t.IsNull() && serial == mLastImage && !mLastFrameSent.IsNull() &&
        (t - mLastFrameSent).ToSeconds() < (1.0 / duplicateMinFps)) {
      // We get passed duplicate frames every ~10ms even with no frame change.

      // After disabling, or when the source is not producing many frames,
      // we still want *some* frames to flow to the other side.
      // It could happen that we drop the packet that carried the first disabled
      // frame, for instance. Note that this still requires the application to
      // send a frame, or it doesn't trigger at all.
      return;
    }
    mLastFrameSent = t;
    mLastImage = serial;

    // A throttling limit of 1 allows us to convert 2 frames concurrently.
    // It's short enough to not build up too significant a delay, while
    // giving us a margin to not cause some machines to drop every other frame.
    const int32_t queueThrottlingLimit = 1;
    if (mLength > queueThrottlingLimit) {
      CSFLogDebug(LOGTAG,
                  "VideoFrameConverter %p queue is full. Throttling by "
                  "throwing away a frame.",
                  this);
#ifdef DEBUG
      ++mThrottleCount;
      mThrottleRecord = std::max(mThrottleCount, mThrottleRecord);
#endif
      return;
    }

#ifdef DEBUG
    if (mThrottleCount > 0) {
      if (mThrottleCount > 5) {
        // Log at a higher level when we have large drops.
        CSFLogInfo(LOGTAG,
                   "VideoFrameConverter %p stopped throttling after throwing "
                   "away %d frames. Longest throttle so far was %d frames.",
                   this, mThrottleCount, mThrottleRecord);
      } else {
        CSFLogDebug(LOGTAG,
                    "VideoFrameConverter %p stopped throttling after throwing "
                    "away %d frames. Longest throttle so far was %d frames.",
                    this, mThrottleCount, mThrottleRecord);
      }
      mThrottleCount = 0;
    }
#endif

    ++mLength;  // Atomic

    nsCOMPtr<nsIRunnable> runnable =
        NewRunnableMethod<StoreRefPtrPassByPtr<Image>, IntSize, bool>(
            "VideoFrameConverter::ProcessVideoFrame", this,
            &VideoFrameConverter::ProcessVideoFrame, aChunk.mFrame.GetImage(),
            size, aForceBlack);
    nsresult rv = mTaskQueue->Dispatch(runnable.forget());
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
    Unused << rv;
  }

  void AddListener(VideoConverterListener* aListener) {
    MutexAutoLock lock(mMutex);

    MOZ_ASSERT(!mListeners.Contains(aListener));
    mListeners.AppendElement(aListener);
  }

  bool RemoveListener(VideoConverterListener* aListener) {
    MutexAutoLock lock(mMutex);

    return mListeners.RemoveElement(aListener);
  }

  void Shutdown() {
    MutexAutoLock lock(mMutex);
    mListeners.Clear();
  }

 protected:
  virtual ~VideoFrameConverter() { MOZ_COUNT_DTOR(VideoFrameConverter); }

  static void DeleteBuffer(uint8* aData) { delete[] aData; }

  // This takes ownership of the buffer and attaches it to the VideoFrame we
  // send to the listeners.
  void VideoFrameConverted(UniquePtr<uint8[]> aBuffer,
                           unsigned int aVideoFrameLength,
                           unsigned short aWidth, unsigned short aHeight,
                           VideoType aVideoType, uint64_t aCaptureTime) {
    // check for parameter sanity
    if (!aBuffer || aVideoFrameLength == 0 || aWidth == 0 || aHeight == 0) {
      CSFLogError(LOGTAG, "%s Invalid Parameters", __FUNCTION__);
      MOZ_ASSERT(false);
      return;
    }
    MOZ_ASSERT(aVideoType == VideoType::kVideoI420);

    const int stride_y = aWidth;
    const int stride_uv = (aWidth + 1) / 2;

    const uint8_t* buffer_y = aBuffer.get();
    const uint8_t* buffer_u = buffer_y + stride_y * aHeight;
    const uint8_t* buffer_v = buffer_u + stride_uv * ((aHeight + 1) / 2);
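    // Worked example of the I420 layout assumed here: for a 640x480 frame,
    // stride_y = 640 and stride_uv = 320, so the U plane starts at byte
    // 640 * 480 = 307200 and the V plane 320 * 240 = 76800 bytes later;
    // the whole buffer is width * height * 3 / 2 bytes.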
    rtc::scoped_refptr<webrtc::WrappedI420Buffer> video_frame_buffer(
        new rtc::RefCountedObject<webrtc::WrappedI420Buffer>(
            aWidth, aHeight, buffer_y, stride_y, buffer_u, stride_uv, buffer_v,
            stride_uv, rtc::Bind(&DeleteBuffer, aBuffer.release())));

    webrtc::VideoFrame video_frame(video_frame_buffer, aCaptureTime,
                                   aCaptureTime,
                                   webrtc::kVideoRotation_0);  // XXX
    VideoFrameConverted(video_frame);
  }

  void VideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) {
    MutexAutoLock lock(mMutex);

    for (RefPtr<VideoConverterListener>& listener : mListeners) {
      listener->OnVideoFrameConverted(aVideoFrame);
    }
  }

  void ProcessVideoFrame(Image* aImage, IntSize aSize, bool aForceBlack) {
    --mLength;  // Atomic
    MOZ_ASSERT(mLength >= 0);

    if (aForceBlack) {
      // Send a black image.
      rtc::scoped_refptr<webrtc::I420Buffer> buffer =
          mBufferPool.CreateBuffer(aSize.width, aSize.height);
      if (!buffer) {
        MOZ_DIAGNOSTIC_ASSERT(false,
                              "Buffers not leaving scope except for "
                              "reconfig, should never leak");
        CSFLogWarn(LOGTAG, "Creating a buffer for a black video frame failed");
        return;
      }

      CSFLogDebug(LOGTAG, "Sending a black video frame");
      webrtc::I420Buffer::SetBlack(buffer);
      webrtc::VideoFrame frame(buffer, 0, 0,  // not setting timestamps
                               webrtc::kVideoRotation_0);
      VideoFrameConverted(frame);
      return;
    }

    if (!aImage) {
      MOZ_ASSERT_UNREACHABLE("Must have image if not forcing black");
      return;
    }

    ImageFormat format = aImage->GetFormat();
    if (format == ImageFormat::PLANAR_YCBCR) {
      // Cast away constness b/c some of the accessors are non-const
      const PlanarYCbCrData* data =
          static_cast<const PlanarYCbCrImage*>(aImage)->GetData();
      if (data) {
        uint8_t* y = data->mYChannel;
        uint8_t* cb = data->mCbChannel;
        uint8_t* cr = data->mCrChannel;
        int32_t yStride = data->mYStride;
        int32_t cbCrStride = data->mCbCrStride;
        uint32_t width = aImage->GetSize().width;
        uint32_t height = aImage->GetSize().height;

        rtc::scoped_refptr<webrtc::WrappedI420Buffer> video_frame_buffer(
            new rtc::RefCountedObject<webrtc::WrappedI420Buffer>(
                width, height, y, yStride, cb, cbCrStride, cr, cbCrStride,
                rtc::KeepRefUntilDone(aImage)));

        webrtc::VideoFrame i420_frame(video_frame_buffer, 0,
                                      0,  // not setting timestamps
                                      webrtc::kVideoRotation_0);
        CSFLogDebug(LOGTAG, "Sending an I420 video frame");
        VideoFrameConverted(i420_frame);
        return;
      }
    }

    RefPtr<SourceSurface> surf = aImage->GetAsSourceSurface();
    if (!surf) {
      CSFLogError(LOGTAG, "Getting surface from %s image failed",
                  Stringify(format).c_str());
      return;
    }

    RefPtr<DataSourceSurface> data = surf->GetDataSurface();
    if (!data) {
      CSFLogError(
          LOGTAG,
          "Getting data surface from %s image with %s (%s) surface failed",
          Stringify(format).c_str(), Stringify(surf->GetType()).c_str(),
          Stringify(surf->GetFormat()).c_str());
      return;
    }

    if (aImage->GetSize() != aSize) {
      MOZ_DIAGNOSTIC_ASSERT(false, "Unexpected intended size");
      return;
    }

    rtc::scoped_refptr<webrtc::I420Buffer> buffer =
        mBufferPool.CreateBuffer(aSize.width, aSize.height);
    if (!buffer) {
      CSFLogWarn(LOGTAG, "Creating a buffer for a video frame failed");
      return;
    }

    DataSourceSurface::ScopedMap map(data, DataSourceSurface::READ);
    if (!map.IsMapped()) {
      CSFLogError(
          LOGTAG,
          "Reading DataSourceSurface from %s image with %s (%s) surface failed",
          Stringify(format).c_str(), Stringify(surf->GetType()).c_str(),
          Stringify(surf->GetFormat()).c_str());
      return;
    }

    int rv;
    switch (surf->GetFormat()) {
      case SurfaceFormat::B8G8R8A8:
      case SurfaceFormat::B8G8R8X8:
        rv = libyuv::ARGBToI420(static_cast<uint8*>(map.GetData()),
                                map.GetStride(), buffer->MutableDataY(),
                                buffer->StrideY(), buffer->MutableDataU(),
                                buffer->StrideU(), buffer->MutableDataV(),
                                buffer->StrideV(), aSize.width, aSize.height);
        break;
      case SurfaceFormat::R5G6B5_UINT16:
        rv = libyuv::RGB565ToI420(static_cast<uint8*>(map.GetData()),
                                  map.GetStride(), buffer->MutableDataY(),
                                  buffer->StrideY(), buffer->MutableDataU(),
                                  buffer->StrideU(), buffer->MutableDataV(),
                                  buffer->StrideV(), aSize.width, aSize.height);
        break;
      default:
        CSFLogError(LOGTAG, "Unsupported RGB video format %s",
                    Stringify(surf->GetFormat()).c_str());
        MOZ_ASSERT(PR_FALSE);
        return;
    }
    if (rv != 0) {
      CSFLogError(LOGTAG, "%s to I420 conversion failed",
                  Stringify(surf->GetFormat()).c_str());
      return;
    }
    CSFLogDebug(LOGTAG, "Sending an I420 video frame converted from %s",
                Stringify(surf->GetFormat()).c_str());
    webrtc::VideoFrame frame(buffer, 0, 0,  // not setting timestamps
                             webrtc::kVideoRotation_0);
    VideoFrameConverted(frame);
  }

  Atomic<int32_t, Relaxed> mLength;
  const RefPtr<AutoTaskQueue> mTaskQueue;
  webrtc::I420BufferPool mBufferPool;

  // Written and read from the queueing thread (normally MSG).
  int32_t mLastImage;        // serial number of last Image
  TimeStamp mLastFrameSent;  // The time we sent the last frame.
#ifdef DEBUG
  uint32_t mThrottleCount;
  uint32_t mThrottleRecord;
#endif

  // mMutex guards the below variables.
  Mutex mMutex;
  nsTArray<RefPtr<VideoConverterListener>> mListeners;
};

// An async inserter for audio data, to avoid running audio codec encoders
// on the MSG/input audio thread.  Basically just bounces all the audio
// data to a single audio processing/input queue.  We could, if we wanted
// to, use multiple threads and a TaskQueue.
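//
// Illustrative usage (a sketch only; the conduit and call site are
// assumptions):
//
//   RefPtr<AudioProxyThread> proxy = new AudioProxyThread(audioConduit);
//   // On the MSG thread, for each incoming chunk:
//   proxy->QueueAudioChunk(aRate, chunk, /* aEnabled = */ true);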
class AudioProxyThread {
 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)

  explicit AudioProxyThread(AudioSessionConduit* aConduit)
      : mConduit(aConduit),
        mTaskQueue(new AutoTaskQueue(
            GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER), "AudioProxy")),
        mAudioConverter(nullptr) {
    MOZ_ASSERT(mConduit);
    MOZ_COUNT_CTOR(AudioProxyThread);
  }

  // This function is the identity if aInputRate is supported.
  // Otherwise it returns a supported rate that ensures no loss in audio
  // quality: the sampling rate returned is always greater than or equal to
  // the input sampling rate, if they differ.
  uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
    AudioSessionConduit* conduit =
        static_cast<AudioSessionConduit*>(mConduit.get());
    if (conduit->IsSamplingFreqSupported(aInputRate)) {
      return aInputRate;
    }
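    // e.g. 8000 -> 16000, 22050 -> 32000, and 44100 -> 48000 (since 44100
    // was already rejected above); anything at or above 44100 maps to 48000.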
    if (aInputRate < 16000) {
      return 16000;
    } else if (aInputRate < 32000) {
      return 32000;
    } else if (aInputRate < 44100) {
      return 44100;
    } else {
      return 48000;
    }
  }

  // From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
  // something the conduit can work with (or send silence if the track is not
  // enabled), and send the audio in 10ms chunks to the conduit.
  void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
                                 bool aEnabled) {
    MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());

    // Convert to interleaved 16-bit integer audio, with a maximum of two
    // channels (since the WebRTC.org code below makes the assumption that the
    // input audio is either mono or stereo), at a sample rate of
    // 16, 32, 44.1, or 48 kHz.
    uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
    int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);

    // We take advantage of the fact that in the common case (microphone
    // directly to PeerConnection, that is, a normal call) the samples are
    // already 16-bit mono, so the interleaved and planar representations are
    // identical, and we can use the data as-is.
    if (aEnabled && outputChannels == 1 &&
        aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
      const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
      PacketizeAndSend(samples, transmissionRate, outputChannels,
                       aChunk.mDuration);
      return;
    }

    uint32_t sampleCount = aChunk.mDuration * outputChannels;
    if (mInterleavedAudio.Length() < sampleCount) {
      mInterleavedAudio.SetLength(sampleCount);
    }

    if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
      PodZero(mInterleavedAudio.Elements(), sampleCount);
    } else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
      DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
                           aChunk.mVolume, outputChannels,
                           mInterleavedAudio.Elements());
    } else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
      DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
                           aChunk.mVolume, outputChannels,
                           mInterleavedAudio.Elements());
    }
    int16_t* inputAudio = mInterleavedAudio.Elements();
    size_t inputAudioFrameCount = aChunk.mDuration;

    AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
                            AudioConfig::FORMAT_S16);
    AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
                             transmissionRate, AudioConfig::FORMAT_S16);
    // Resample to an acceptable sample-rate for the sending side
    if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
        mAudioConverter->OutputConfig() != outputConfig) {
      mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
    }

    int16_t* processedAudio = nullptr;
    size_t framesProcessed =
        mAudioConverter->Process(inputAudio, inputAudioFrameCount);

    if (framesProcessed == 0) {
      // In place conversion not possible, use a buffer.
      framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
                                                 inputAudioFrameCount);
      processedAudio = mOutputAudio.Data();
    } else {
      processedAudio = inputAudio;
    }

    PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
                     framesProcessed);
  }

  // This packetizes aAudioData in 10ms chunks and sends it.
  // aAudioData is interleaved audio data at a rate and with a channel count
  // that is appropriate to send with the conduit.
  void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
                        uint32_t aChannels, uint32_t aFrameCount) {
    MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
    MOZ_ASSERT(aChannels == 1 || aChannels == 2);
    MOZ_ASSERT(aAudioData);

    uint32_t audio_10ms = aRate / 100;
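    // e.g. at 48000 Hz, audio_10ms is 480 frames, so a stereo packet holds
    // 480 * 2 = 960 int16_t samples (1920 bytes, which is what
    // AUDIO_SAMPLE_BUFFER_MAX_BYTES accounts for).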

    if (!mPacketizer || mPacketizer->PacketSize() != audio_10ms ||
        mPacketizer->Channels() != aChannels) {
      // It's the right thing to drop the bit of audio still in the packetizer:
      // we don't want to send the conduit audio that has two different
      // rates while telling it that it has a constant rate.
      mPacketizer =
          MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
      mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
    }

    mPacketizer->Input(aAudioData, aFrameCount);

    while (mPacketizer->PacketsAvailable()) {
      mPacketizer->Output(mPacket.get());
      mConduit->SendAudioFrame(mPacket.get(), mPacketizer->PacketSize(), aRate,
                               mPacketizer->Channels(), 0);
    }
  }

  void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
                       bool aEnabled) {
    RefPtr<AudioProxyThread> self = this;
    nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
        "AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
          self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
        }));
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
    Unused << rv;
  }

 protected:
  virtual ~AudioProxyThread() {
    // Conduits must be released on MainThread, and we might have the last
    // reference. We don't need to worry about runnables still trying to
    // access the conduit, since the runnables hold a ref to AudioProxyThread.
    NS_ReleaseOnMainThreadSystemGroup("AudioProxyThread::mConduit",
                                      mConduit.forget());
    MOZ_COUNT_DTOR(AudioProxyThread);
  }

  RefPtr<AudioSessionConduit> mConduit;
  const RefPtr<AutoTaskQueue> mTaskQueue;
  // Only accessed on mTaskQueue
  UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
  // A buffer to hold a single packet of audio.
  UniquePtr<int16_t[]> mPacket;
  nsTArray<int16_t> mInterleavedAudio;
  AlignedShortBuffer mOutputAudio;
  UniquePtr<AudioConverter> mAudioConverter;
};

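// Keying material exporter label for DTLS-SRTP, as defined in RFC 5764 S 4.2.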
static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp";

MediaPipeline::MediaPipeline(const std::string& aPc, DirectionType aDirection,
                             nsCOMPtr<nsIEventTarget> aMainThread,
                             nsCOMPtr<nsIEventTarget> aStsThread,
                             RefPtr<MediaSessionConduit> aConduit)
    : mDirection(aDirection),
      mLevel(0),
      mConduit(aConduit),
      mRtp(nullptr, RTP),
      mRtcp(nullptr, RTCP),
      mMainThread(aMainThread),
      mStsThread(aStsThread),
      mTransport(
          new PipelineTransport(this))  // PipelineTransport() will access
                                        // this->mStsThread; moved here
                                        // for safety
      ,
      mRtpPacketsSent(0),
      mRtcpPacketsSent(0),
      mRtpPacketsReceived(0),
      mRtcpPacketsReceived(0),
      mRtpBytesSent(0),
      mRtpBytesReceived(0),
      mPc(aPc),
      mRtpParser(webrtc::RtpHeaderParser::Create()),
      mPacketDumper(new PacketDumper(mPc)) {
  if (mDirection == DirectionType::RECEIVE) {
    mConduit->SetReceiverTransport(mTransport);
  } else {
    mConduit->SetTransmitterTransport(mTransport);
  }
}

MediaPipeline::~MediaPipeline() {
  CSFLogInfo(LOGTAG, "Destroying MediaPipeline: %s", mDescription.c_str());
  NS_ReleaseOnMainThreadSystemGroup("MediaPipeline::mConduit",
                                    mConduit.forget());
}

void MediaPipeline::Shutdown_m() {
  Stop();
  DetachMedia();

  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::DetachTransport_s),
                NS_DISPATCH_NORMAL);
}

void MediaPipeline::DetachTransport_s() {
  ASSERT_ON_THREAD(mStsThread);

  CSFLogInfo(LOGTAG, "%s in %s", mDescription.c_str(), __FUNCTION__);

  disconnect_all();
  mTransport->Detach();
  mRtp.Detach();
  mRtcp.Detach();

  // Make sure any cycles are broken
  mPacketDumper = nullptr;
}

nsresult MediaPipeline::AttachTransport_s() {
  ASSERT_ON_THREAD(mStsThread);
  nsresult res;
  MOZ_ASSERT(mRtp.mTransport);
  MOZ_ASSERT(mRtcp.mTransport);
  res = ConnectTransport_s(mRtp);
  if (NS_FAILED(res)) {
    return res;
  }

  if (mRtcp.mTransport != mRtp.mTransport) {
    res = ConnectTransport_s(mRtcp);
    if (NS_FAILED(res)) {
      return res;
    }
  }

  mTransport->Attach(this);

  return NS_OK;
}

void MediaPipeline::UpdateTransport_m(RefPtr<TransportFlow> aRtpTransport,
                                      RefPtr<TransportFlow> aRtcpTransport,
                                      nsAutoPtr<MediaPipelineFilter> aFilter) {
  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::UpdateTransport_s, aRtpTransport,
                             aRtcpTransport, aFilter),
                NS_DISPATCH_NORMAL);
}

void MediaPipeline::UpdateTransport_s(RefPtr<TransportFlow> aRtpTransport,
                                      RefPtr<TransportFlow> aRtcpTransport,
                                      nsAutoPtr<MediaPipelineFilter> aFilter) {
  bool rtcp_mux = false;
  if (!aRtcpTransport) {
    aRtcpTransport = aRtpTransport;
    rtcp_mux = true;
  }

  if ((aRtpTransport != mRtp.mTransport) ||
      (aRtcpTransport != mRtcp.mTransport)) {
    disconnect_all();
    mTransport->Detach();
    mRtp.Detach();
    mRtcp.Detach();
    if (aRtpTransport && aRtcpTransport) {
      mRtp = TransportInfo(aRtpTransport, rtcp_mux ? MUX : RTP);
      mRtcp = TransportInfo(aRtcpTransport, rtcp_mux ? MUX : RTCP);
      AttachTransport_s();
    }
  }

  if (mFilter && aFilter) {
    // Use the new filter, but don't forget any remote SSRCs that we've learned
    // by receiving traffic.
    mFilter->Update(*aFilter);
  } else {
    mFilter = aFilter;
  }
}

void MediaPipeline::AddRIDExtension_m(size_t aExtensionId) {
  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::AddRIDExtension_s, aExtensionId),
                NS_DISPATCH_NORMAL);
}

void MediaPipeline::AddRIDExtension_s(size_t aExtensionId) {
  mRtpParser->RegisterRtpHeaderExtension(webrtc::kRtpExtensionRtpStreamId,
                                         aExtensionId);
}

void MediaPipeline::AddRIDFilter_m(const std::string& aRid) {
  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::AddRIDFilter_s, aRid),
                NS_DISPATCH_NORMAL);
}

void MediaPipeline::AddRIDFilter_s(const std::string& aRid) {
  mFilter = new MediaPipelineFilter;
  mFilter->AddRemoteRtpStreamId(aRid);
}

void MediaPipeline::GetContributingSourceStats(
    const nsString& aInboundRtpStreamId,
    FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
  // Get the expiry from now
  DOMHighResTimeStamp expiry = RtpCSRCStats::GetExpiryFromTime(GetNow());
  for (auto info : mCsrcStats) {
    if (!info.second.Expired(expiry)) {
      RTCRTPContributingSourceStats stats;
      info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
      aArr.AppendElement(stats, fallible);
    }
  }
}

void MediaPipeline::StateChange(TransportFlow* aFlow,
                                TransportLayer::State aState) {
  TransportInfo* info = GetTransportInfo_s(aFlow);
  MOZ_ASSERT(info);

  if (aState == TransportLayer::TS_OPEN) {
    CSFLogInfo(LOGTAG, "Flow is ready");
    TransportReady_s(*info);
  } else if (aState == TransportLayer::TS_CLOSED ||
             aState == TransportLayer::TS_ERROR) {
    TransportFailed_s(*info);
  }
}

static bool MakeRtpTypeToStringArray(const char** aArray) {
  static const char* RTP_str = "RTP";
  static const char* RTCP_str = "RTCP";
  static const char* MUX_str = "RTP/RTCP mux";
  aArray[MediaPipeline::RTP] = RTP_str;
  aArray[MediaPipeline::RTCP] = RTCP_str;
  aArray[MediaPipeline::MUX] = MUX_str;
  return true;
}

static const char* ToString(MediaPipeline::RtpType type) {
  static const char* array[(int)MediaPipeline::MAX_RTP_TYPE] = {nullptr};
  // Dummy variable to cause init to happen only on first call
  static bool dummy = MakeRtpTypeToStringArray(array);
  (void)dummy;
  return array[type];
}

nsresult MediaPipeline::TransportReady_s(TransportInfo& aInfo) {
  // TODO(ekr@rtfm.com): implement some kind of notification on
  // failure. bug 852665.
  if (aInfo.mState != StateType::MP_CONNECTING) {
    CSFLogError(LOGTAG, "Transport ready for flow in wrong state:%s :%s",
                mDescription.c_str(), ToString(aInfo.mType));
    return NS_ERROR_FAILURE;
  }

  CSFLogInfo(LOGTAG, "Transport ready for pipeline %p flow %s: %s", this,
             mDescription.c_str(), ToString(aInfo.mType));

  // TODO(bcampen@mozilla.com): Should we disconnect from the flow on failure?
  nsresult res;

  // Now instantiate the SRTP objects
  TransportLayerDtls* dtls = static_cast<TransportLayerDtls*>(
      aInfo.mTransport->GetLayer(TransportLayerDtls::ID()));
  MOZ_ASSERT(dtls);  // DTLS is mandatory

  uint16_t cipher_suite;
  res = dtls->GetSrtpCipher(&cipher_suite);
  if (NS_FAILED(res)) {
    CSFLogError(LOGTAG, "Failed to negotiate DTLS-SRTP. This is an error");
    aInfo.mState = StateType::MP_CLOSED;
    UpdateRtcpMuxState(aInfo);
    return res;
  }

  // SRTP Key Exporter as per RFC 5764 S 4.2
  unsigned char srtp_block[SRTP_TOTAL_KEY_LENGTH * 2];
  res = dtls->ExportKeyingMaterial(kDTLSExporterLabel, false, "", srtp_block,
                                   sizeof(srtp_block));
  if (NS_FAILED(res)) {
    CSFLogError(LOGTAG, "Failed to compute DTLS-SRTP keys. This is an error");
    aInfo.mState = StateType::MP_CLOSED;
    UpdateRtcpMuxState(aInfo);
    MOZ_CRASH();  // TODO: Remove once we have enough field experience to
                  // know it doesn't happen. bug 798797. Note that the
                  // code after this never executes.
    return res;
  }

  // Slice and dice as per RFC 5764 S 4.2
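  // The exported block is laid out as (RFC 5764 S 4.2):
  //   | client key | server key | client salt | server salt |
  // Each side's SRTP key material is its master key followed by its master
  // salt, SRTP_TOTAL_KEY_LENGTH bytes in total.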
  unsigned char client_write_key[SRTP_TOTAL_KEY_LENGTH];
  unsigned char server_write_key[SRTP_TOTAL_KEY_LENGTH];
  int offset = 0;
  memcpy(client_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
  offset += SRTP_MASTER_KEY_LENGTH;
  memcpy(server_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
  offset += SRTP_MASTER_KEY_LENGTH;
  memcpy(client_write_key + SRTP_MASTER_KEY_LENGTH, srtp_block + offset,
         SRTP_MASTER_SALT_LENGTH);
  offset += SRTP_MASTER_SALT_LENGTH;
  memcpy(server_write_key + SRTP_MASTER_KEY_LENGTH, srtp_block + offset,
         SRTP_MASTER_SALT_LENGTH);
  offset += SRTP_MASTER_SALT_LENGTH;
  MOZ_ASSERT(offset == sizeof(srtp_block));

  unsigned char* write_key;
  unsigned char* read_key;

  if (dtls->role() == TransportLayerDtls::CLIENT) {
    write_key = client_write_key;
    read_key = server_write_key;
  } else {
    write_key = server_write_key;
    read_key = client_write_key;
  }

  MOZ_ASSERT(!aInfo.mSendSrtp && !aInfo.mRecvSrtp);
  aInfo.mSendSrtp =
      SrtpFlow::Create(cipher_suite, false, write_key, SRTP_TOTAL_KEY_LENGTH);
  aInfo.mRecvSrtp =
      SrtpFlow::Create(cipher_suite, true, read_key, SRTP_TOTAL_KEY_LENGTH);
  if (!aInfo.mSendSrtp || !aInfo.mRecvSrtp) {
    CSFLogError(LOGTAG, "Couldn't create SRTP flow for %s",
                ToString(aInfo.mType));
    aInfo.mState = StateType::MP_CLOSED;
    UpdateRtcpMuxState(aInfo);
    return NS_ERROR_FAILURE;
  }

  if (mDirection == DirectionType::RECEIVE) {
    CSFLogInfo(LOGTAG, "Listening for %s packets received on %p",
               ToString(aInfo.mType), dtls->downward());

    switch (aInfo.mType) {
      case RTP:
        dtls->downward()->SignalPacketReceived.connect(
            this, &MediaPipeline::RtpPacketReceived);
        break;
      case RTCP:
        dtls->downward()->SignalPacketReceived.connect(
            this, &MediaPipeline::RtcpPacketReceived);
        break;
      case MUX:
        dtls->downward()->SignalPacketReceived.connect(
            this, &MediaPipeline::PacketReceived);
        break;
      default:
        MOZ_CRASH();
    }
  }

  aInfo.mState = StateType::MP_OPEN;
  UpdateRtcpMuxState(aInfo);
  return NS_OK;
}

nsresult MediaPipeline::TransportFailed_s(TransportInfo& aInfo) {
  ASSERT_ON_THREAD(mStsThread);

  aInfo.mState = StateType::MP_CLOSED;
  UpdateRtcpMuxState(aInfo);

  CSFLogInfo(LOGTAG, "Transport closed for flow %s", ToString(aInfo.mType));

  NS_WARNING(
      "MediaPipeline Transport failed. This is not properly cleaned up yet");

  // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
  // connection was good and now it is bad.
  // TODO(ekr@rtfm.com): Report up so that the PC knows we
  // have experienced an error.

  return NS_OK;
}

void MediaPipeline::UpdateRtcpMuxState(TransportInfo& aInfo) {
  if (aInfo.mType == MUX) {
    if (aInfo.mTransport == mRtcp.mTransport) {
      mRtcp.mState = aInfo.mState;
      if (!mRtcp.mSendSrtp) {
        mRtcp.mSendSrtp = aInfo.mSendSrtp;
        mRtcp.mRecvSrtp = aInfo.mRecvSrtp;
      }
    }
  }
}

nsresult MediaPipeline::SendPacket(const TransportFlow* aFlow,
                                   const void* aData, int aLen) {
  ASSERT_ON_THREAD(mStsThread);

  // Note that we bypass the DTLS layer here
  TransportLayerDtls* dtls = static_cast<TransportLayerDtls*>(
      aFlow->GetLayer(TransportLayerDtls::ID()));
  MOZ_ASSERT(dtls);

  TransportResult res = dtls->downward()->SendPacket(
      static_cast<const unsigned char*>(aData), aLen);

  if (res != aLen) {
    // Ignore blocking indications
    if (res == TE_WOULDBLOCK) return NS_OK;

    CSFLogError(LOGTAG, "Failed write on stream %s", mDescription.c_str());
    return NS_BASE_STREAM_CLOSED;
  }

  return NS_OK;
}

void MediaPipeline::IncrementRtpPacketsSent(int32_t aBytes) {
  ++mRtpPacketsSent;
  mRtpBytesSent += aBytes;

  if (!(mRtpPacketsSent % 100)) {
    CSFLogInfo(LOGTAG,
               "RTP sent packet count for %s Pipeline %p Flow: %p: %u (%" PRId64
               " bytes)",
               mDescription.c_str(), this, static_cast<void*>(mRtp.mTransport),
               mRtpPacketsSent, mRtpBytesSent);
  }
}

void MediaPipeline::IncrementRtcpPacketsSent() {
  ++mRtcpPacketsSent;
  if (!(mRtcpPacketsSent % 100)) {
    CSFLogInfo(LOGTAG, "RTCP sent packet count for %s Pipeline %p Flow: %p: %u",
               mDescription.c_str(), this, static_cast<void*>(mRtp.mTransport),
               mRtcpPacketsSent);
  }
}

void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
  ++mRtpPacketsReceived;
  mRtpBytesReceived += aBytes;
  if (!(mRtpPacketsReceived % 100)) {
    CSFLogInfo(
        LOGTAG,
        "RTP received packet count for %s Pipeline %p Flow: %p: %u (%" PRId64
        " bytes)",
        mDescription.c_str(), this, static_cast<void*>(mRtp.mTransport),
        mRtpPacketsReceived, mRtpBytesReceived);
  }
}

void MediaPipeline::IncrementRtcpPacketsReceived() {
  ++mRtcpPacketsReceived;
  if (!(mRtcpPacketsReceived % 100)) {
    CSFLogInfo(LOGTAG,
               "RTCP received packet count for %s Pipeline %p Flow: %p: %u",
               mDescription.c_str(), this, static_cast<void*>(mRtp.mTransport),
               mRtcpPacketsReceived);
  }
}

void MediaPipeline::RtpPacketReceived(TransportLayer* aLayer,
                                      const unsigned char* aData, size_t aLen) {
  if (mDirection == DirectionType::TRANSMIT) {
    return;
  }

  if (!mTransport->Pipeline()) {
    CSFLogError(LOGTAG, "Discarding incoming packet; transport disconnected");
    return;
  }

  if (!mConduit) {
    CSFLogDebug(LOGTAG, "Discarding incoming packet; media disconnected");
    return;
  }

  if (mRtp.mState != StateType::MP_OPEN) {
    CSFLogError(LOGTAG, "Discarding incoming packet; pipeline not open");
    return;
  }

  if (mRtp.mTransport->state() != TransportLayer::TS_OPEN) {
    CSFLogError(LOGTAG, "Discarding incoming packet; transport not open");
    return;
  }

  // This should never happen.
  MOZ_ASSERT(mRtp.mRecvSrtp);

  if (!aLen) {
    return;
  }

  // Filter out everything but RTP/RTCP version 2 (first byte 0b10xxxxxx,
  // i.e. in the range [128, 191]).
  if (aData[0] < 128 || aData[0] > 191) {
    return;
  }

  webrtc::RTPHeader header;
  if (!mRtpParser->Parse(aData, aLen, &header, true)) {
    return;
  }

  if (mFilter && !mFilter->Filter(header)) {
    return;
  }

  // Make sure to only get the time once, and only if we need it.
  DOMHighResTimeStamp now = 0.0;
  bool hasTime = false;

  // Remove expired RtpCSRCStats
  if (!mCsrcStats.empty()) {
    if (!hasTime) {
      now = GetNow();
      hasTime = true;
    }
    auto expiry = RtpCSRCStats::GetExpiryFromTime(now);
    for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
      if (p->second.Expired(expiry)) {
        p = mCsrcStats.erase(p);
        continue;
      }
      p++;
    }
  }

  // Add new RtpCSRCStats
  if (header.numCSRCs) {
    for (auto i = 0; i < header.numCSRCs; i++) {
      if (!hasTime) {
        now = GetNow();
        hasTime = true;
      }
      auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
      if (csrcInfo == mCsrcStats.end()) {
        mCsrcStats.insert(std::make_pair(
            header.arrOfCSRCs[i], RtpCSRCStats(header.arrOfCSRCs[i], now)));
      } else {
        csrcInfo->second.SetTimestamp(now);
      }
    }
  }

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false, aData, aLen);

  // Make a copy rather than cast away constness
  auto innerData = MakeUnique<unsigned char[]>(aLen);
  memcpy(innerData.get(), aData, aLen);
  int outLen = 0;
  nsresult res =
      mRtp.mRecvSrtp->UnprotectRtp(innerData.get(), aLen, aLen, &outLen);
  if (!NS_SUCCEEDED(res)) {
    char tmp[16];

    SprintfLiteral(tmp, "%.2x %.2x %.2x %.2x", innerData[0], innerData[1],
                   innerData[2], innerData[3]);

    CSFLogError(LOGTAG, "Error unprotecting RTP in %s len= %zu [%s]",
                mDescription.c_str(), aLen, tmp);
    return;
  }
  CSFLogDebug(LOGTAG, "%s received RTP packet.", mDescription.c_str());
  IncrementRtpPacketsReceived(outLen);
  OnRtpPacketReceived();

  RtpLogger::LogPacket(innerData.get(), outLen, true, true, header.headerLength,
                       mDescription);

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false,
                      innerData.get(), outLen);

  (void)mConduit->ReceivedRTPPacket(innerData.get(), outLen,
                                    header.ssrc);  // Ignore error codes
}

void MediaPipeline::RtcpPacketReceived(TransportLayer* aLayer,
                                       const unsigned char* aData,
                                       size_t aLen) {
  if (!mTransport->Pipeline()) {
    CSFLogDebug(LOGTAG, "Discarding incoming packet; transport disconnected");
    return;
  }

  if (!mConduit) {
    CSFLogDebug(LOGTAG, "Discarding incoming packet; media disconnected");
    return;
  }

  if (mRtcp.mState != StateType::MP_OPEN) {
    CSFLogDebug(LOGTAG, "Discarding incoming packet; pipeline not open");
    return;
  }

  if (mRtcp.mTransport->state() != TransportLayer::TS_OPEN) {
    CSFLogError(LOGTAG, "Discarding incoming packet; transport not open");
    return;
  }

  if (!aLen) {
    return;
  }

  // Filter out everything but RTP/RTCP version 2 (first byte 0b10xxxxxx,
  // i.e. in the range [128, 191]).
  if (aData[0] < 128 || aData[0] > 191) {
    return;
  }

  // We do not filter receiver reports, since the webrtc.org code for
  // senders already has logic to ignore RRs that do not apply.
  // TODO bug 1279153: remove SR check for reduced size RTCP
  if (mFilter && !mFilter->FilterSenderReport(aData, aLen)) {
    CSFLogWarn(LOGTAG, "Dropping incoming RTCP packet; filtered out");
    return;
  }

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtcp, false, aData,
                      aLen);

  // Make a copy rather than cast away constness
  auto innerData = MakeUnique<unsigned char[]>(aLen);
  memcpy(innerData.get(), aData, aLen);
  int outLen;

  nsresult res =
      mRtcp.mRecvSrtp->UnprotectRtcp(innerData.get(), aLen, aLen, &outLen);

  if (!NS_SUCCEEDED(res)) return;

  CSFLogDebug(LOGTAG, "%s received RTCP packet.", mDescription.c_str());
  IncrementRtcpPacketsReceived();

  RtpLogger::LogPacket(innerData.get(), outLen, true, false, 0, mDescription);

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtcp, false,
                      innerData.get(), outLen);

  MOZ_ASSERT(mRtcp.mRecvSrtp);  // This should never happen

  (void)mConduit->ReceivedRTCPPacket(innerData.get(),
                                     outLen);  // Ignore error codes
}

bool MediaPipeline::IsRtp(const unsigned char* aData, size_t aLen) const {
  if (aLen < 2) return false;

  // Check if this is a RTCP packet. Logic based on the types listed in
  // media/webrtc/trunk/src/modules/rtp_rtcp/source/rtp_utility.cc
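  //
  // For example, 0xc8 (200, an RTCP Sender Report) falls in [200, 207] and
  // yields false, while a typical RTP second byte such as 0x6f (payload type
  // 111, marker bit clear) is below 192 and yields true.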

  // Anything outside this range is RTP.
  if ((aData[1] < 192) || (aData[1] > 207)) return true;

  if (aData[1] == 192)  // FIR
    return false;

  if (aData[1] == 193)  // NACK, but could also be RTP. This makes us sad
    return true;        // but it's how webrtc.org behaves.

  if (aData[1] == 194) return true;

  if (aData[1] == 195)  // IJ.
    return false;

  if ((aData[1] > 195) && (aData[1] < 200))  // the > 195 is redundant
    return true;

  if ((aData[1] >= 200) && (aData[1] <= 207))  // SR, RR, SDES, BYE,
    return false;                              // APP, RTPFB, PSFB, XR

  MOZ_ASSERT(false);  // Not reached, belt and suspenders.
  return true;
}

void MediaPipeline::PacketReceived(TransportLayer* aLayer,
                                   const unsigned char* aData, size_t aLen) {
  if (!mTransport->Pipeline()) {
    CSFLogDebug(LOGTAG, "Discarding incoming packet; transport disconnected");
    return;
  }

  if (IsRtp(aData, aLen)) {
    RtpPacketReceived(aLayer, aData, aLen);
  } else {
    RtcpPacketReceived(aLayer, aData, aLen);
  }
}

class MediaPipelineTransmit::PipelineListener : public MediaStreamVideoSink {
  friend class MediaPipelineTransmit;

 public:
  explicit PipelineListener(const RefPtr<MediaSessionConduit>& aConduit)
      : mConduit(aConduit),
        mActive(false),
        mEnabled(false),
        mDirectConnect(false) {}

  ~PipelineListener() {
    NS_ReleaseOnMainThreadSystemGroup("MediaPipeline::mConduit",
                                      mConduit.forget());
    if (mConverter) {
      mConverter->Shutdown();
    }
  }

  void SetActive(bool aActive) { mActive = aActive; }
  void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }

  // These are needed since nested classes don't have access to any particular
  // instance of the parent
  void SetAudioProxy(const RefPtr<AudioProxyThread>& aProxy) {
    mAudioProcessing = aProxy;
  }

  void SetVideoFrameConverter(const RefPtr<VideoFrameConverter>& aConverter) {
    mConverter = aConverter;
  }

  void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) {
    MOZ_RELEASE_ASSERT(mConduit->type() == MediaSessionConduit::VIDEO);
    static_cast<VideoSessionConduit*>(mConduit.get())
        ->SendVideoFrame(aVideoFrame);
  }

  // Implement MediaStreamTrackListener
  void NotifyQueuedChanges(MediaStreamGraph* aGraph, StreamTime aTrackOffset,
                           const MediaSegment& aQueuedMedia) override;

  // Implement DirectMediaStreamTrackListener
  void NotifyRealtimeTrackData(MediaStreamGraph* aGraph,
                               StreamTime aTrackOffset,
                               const MediaSegment& aMedia) override;
  void NotifyDirectListenerInstalled(InstallationResult aResult) override;
  void NotifyDirectListenerUninstalled() override;

  // Implement MediaStreamVideoSink
  void SetCurrentFrames(const VideoSegment& aSegment) override;
  void ClearFrames() override {}

 private:
  void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);

  RefPtr<MediaSessionConduit> mConduit;
  RefPtr<AudioProxyThread> mAudioProcessing;
  RefPtr<VideoFrameConverter> mConverter;

  // active is true if there is a transport to send on
  mozilla::Atomic<bool> mActive;
  // enabled is true if the media access control permits sending
  // actual content; when false you get black/silence
  mozilla::Atomic<bool> mEnabled;

  // Written and read on the MediaStreamGraph thread
  bool mDirectConnect;
};

// Implements VideoConverterListener for MediaPipeline.
//
// We pass converted frames on to MediaPipelineTransmit::PipelineListener
// where they are further forwarded to VideoConduit.
// MediaPipelineTransmit calls Detach() during shutdown to ensure there are
// no cyclic dependencies between us and PipelineListener.
class MediaPipelineTransmit::VideoFrameFeeder : public VideoConverterListener {
 public:
  explicit VideoFrameFeeder(const RefPtr<PipelineListener>& aListener)
      : mMutex("VideoFrameFeeder"), mListener(aListener) {
    MOZ_COUNT_CTOR(VideoFrameFeeder);
  }

  void Detach() {
    MutexAutoLock lock(mMutex);

    mListener = nullptr;
  }

  void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) override {
    MutexAutoLock lock(mMutex);

    if (!mListener) {
      return;
    }

    mListener->OnVideoFrameConverted(aVideoFrame);
  }

 protected:
  virtual ~VideoFrameFeeder() { MOZ_COUNT_DTOR(VideoFrameFeeder); }

  Mutex mMutex;  // Protects the member below.
  RefPtr<PipelineListener> mListener;
};

MediaPipelineTransmit::MediaPipelineTransmit(
    const std::string& aPc, nsCOMPtr<nsIEventTarget> aMainThread,
    nsCOMPtr<nsIEventTarget> aStsThread, bool aIsVideo,
    dom::MediaStreamTrack* aDomTrack, RefPtr<MediaSessionConduit> aConduit)
    : MediaPipeline(aPc, DirectionType::TRANSMIT, aMainThread, aStsThread,
                    aConduit),
      mIsVideo(aIsVideo),
      mListener(new PipelineListener(aConduit)),
      mFeeder(aIsVideo ? MakeAndAddRef<VideoFrameFeeder>(mListener)
                       : nullptr)  // For video we send frames to an
                                   // async VideoFrameConverter that
                                   // calls back to a VideoFrameFeeder
                                   // that feeds I420 frames to
                                   // VideoConduit.
      ,
      mDomTrack(aDomTrack),
      mTransmitting(false) {
  SetDescription();
  if (!IsVideo()) {
    mAudioProcessing = MakeAndAddRef<AudioProxyThread>(
        static_cast<AudioSessionConduit*>(aConduit.get()));
    mListener->SetAudioProxy(mAudioProcessing);
  } else {  // Video
    mConverter = MakeAndAddRef<VideoFrameConverter>();
    mConverter->AddListener(mFeeder);
    mListener->SetVideoFrameConverter(mConverter);
  }
}

MediaPipelineTransmit::~MediaPipelineTransmit() {
  if (mFeeder) {
    mFeeder->Detach();
  }

  MOZ_ASSERT(!mDomTrack);
}


void MediaPipeline::SetDescription_s(const std::string& description) {
  mDescription = description;
}

void MediaPipelineTransmit::SetDescription() {
  std::string description;
  description = mPc + "| ";
  description += mConduit->type() == MediaSessionConduit::AUDIO
                     ? "Transmit audio["
                     : "Transmit video[";

  if (!mDomTrack) {
    description += "no track]";
  } else {
    nsString nsTrackId;
    mDomTrack->GetId(nsTrackId);
    std::string trackId(NS_ConvertUTF16toUTF8(nsTrackId).get());
    description += trackId;
    description += "]";
  }

  // Dispatch unconditionally so mDescription also reflects the no-track case.
  RUN_ON_THREAD(
      mStsThread,
      WrapRunnable(RefPtr<MediaPipeline>(this),
                   &MediaPipelineTransmit::SetDescription_s, description),
      NS_DISPATCH_NORMAL);
}

void MediaPipelineTransmit::Stop() {
  ASSERT_ON_THREAD(mMainThread);

  if (!mDomTrack || !mTransmitting) {
    return;
  }

  mTransmitting = false;

  if (mDomTrack->AsAudioStreamTrack()) {
    mDomTrack->RemoveDirectListener(mListener);
    mDomTrack->RemoveListener(mListener);
  } else if (VideoStreamTrack* video = mDomTrack->AsVideoStreamTrack()) {
    video->RemoveVideoOutput(mListener);
  } else {
    MOZ_ASSERT(false, "Unknown track type");
  }

  mConduit->StopTransmitting();
}

void MediaPipelineTransmit::Start() {
  ASSERT_ON_THREAD(mMainThread);

  if (!mDomTrack || mTransmitting) {
    return;
  }

  mTransmitting = true;

  mConduit->StartTransmitting();

  // TODO(ekr@rtfm.com): Check for errors
  CSFLogDebug(
      LOGTAG, "Attaching pipeline to track %p conduit type=%s", this,
      (mConduit->type() == MediaSessionConduit::AUDIO ? "audio" : "video"));

#if !defined(MOZILLA_EXTERNAL_LINKAGE)
  // With full duplex we don't risk audio arriving late to the MSG, so we
  // don't need a direct listener.
  const bool enableDirectListener =
      !Preferences::GetBool("media.navigator.audio.full_duplex", false);
#else
  const bool enableDirectListener = true;
#endif

  if (mDomTrack->AsAudioStreamTrack()) {
    if (enableDirectListener) {
      // Register the listener directly with the source if we can. We also
      // register it as a non-direct listener so that we fall back to that if
      // installing the direct listener fails. As a direct listener we get
      // access to direct, unqueued (and not resampled) data.
      mDomTrack->AddDirectListener(mListener);
    }
    mDomTrack->AddListener(mListener);
  } else if (VideoStreamTrack* video = mDomTrack->AsVideoStreamTrack()) {
    video->AddVideoOutput(mListener);
  } else {
    MOZ_ASSERT(false, "Unknown track type");
  }
}

bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; }

void MediaPipelineTransmit::UpdateSinkIdentity_m(
    const MediaStreamTrack* aTrack, nsIPrincipal* aPrincipal,
    const PeerIdentity* aSinkIdentity) {
  ASSERT_ON_THREAD(mMainThread);

  if (aTrack != nullptr && aTrack != mDomTrack) {
    // If a track is specified, it might not be for this pipeline, since we
    // receive notifications for all tracks on the PC. A null aTrack means the
    // PeerIdentity has changed and shall be applied to all tracks of the PC.
    return;
  }

  bool enableTrack = aPrincipal->Subsumes(mDomTrack->GetPrincipal());
  if (!enableTrack) {
    // The principal check failed, but there's still a chance: if our track is
    // bound to a peerIdentity, and the peer connection (our sink) is bound to
    // the same identity, then we can enable the track.
    const PeerIdentity* trackIdentity = mDomTrack->GetPeerIdentity();
    if (aSinkIdentity && trackIdentity) {
      enableTrack = (*aSinkIdentity == *trackIdentity);
    }
  }
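  // Illustrative example (not from this file): a track captured with
  // getUserMedia({ audio: true, peerIdentity: "user@example.com" }) carries
  // that peerIdentity, so it stays disabled here unless this connection's
  // sink identity is also "user@example.com".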

  mListener->SetEnabled(enableTrack);
}

void MediaPipelineTransmit::DetachMedia() {
  ASSERT_ON_THREAD(mMainThread);
  mDomTrack = nullptr;
  // Let the listener be destroyed with the pipeline (or later).
}

nsresult MediaPipelineTransmit::TransportReady_s(TransportInfo& aInfo) {
  ASSERT_ON_THREAD(mStsThread);
  // Call base ready function.
  MediaPipeline::TransportReady_s(aInfo);

  // Activate the listener only when the RTP transport becomes ready; RTCP
  // readiness alone does not start transmission.
  if (&aInfo == &mRtp) {
    mListener->SetActive(true);
  }

  return NS_OK;
}

nsresult MediaPipelineTransmit::ReplaceTrack(
    RefPtr<MediaStreamTrack>& aDomTrack) {
  // MainThread, checked in calls we make
  if (aDomTrack) {
    nsString nsTrackId;
    aDomTrack->GetId(nsTrackId);
    std::string trackId(NS_ConvertUTF16toUTF8(nsTrackId).get());
    CSFLogDebug(
        LOGTAG, "Reattaching pipeline %p to track %s conduit type: %s", this,
        trackId.c_str(),
        (mConduit->type() == MediaSessionConduit::AUDIO ? "audio" : "video"));
  }

  RefPtr<dom::MediaStreamTrack> oldTrack = mDomTrack;
  bool wasTransmitting = oldTrack && mTransmitting;
  Stop();
  mDomTrack = aDomTrack;
  SetDescription();

  if (wasTransmitting) {
    Start();
  }
  return NS_OK;
}

nsresult MediaPipeline::ConnectTransport_s(TransportInfo& aInfo) {
  MOZ_ASSERT(aInfo.mTransport);
  ASSERT_ON_THREAD(mStsThread);

  // Look to see if the transport is ready.
  if (aInfo.mTransport->state() == TransportLayer::TS_OPEN) {
    nsresult res = TransportReady_s(aInfo);
    if (NS_FAILED(res)) {
      CSFLogError(LOGTAG, "Error calling TransportReady(); res=%u in %s",
                  static_cast<uint32_t>(res), __FUNCTION__);
      return res;
    }
  } else if (aInfo.mTransport->state() == TransportLayer::TS_ERROR) {
    CSFLogError(LOGTAG, "%s transport is already in error state",
                ToString(aInfo.mType));
    TransportFailed_s(aInfo);
    return NS_ERROR_FAILURE;
  }

  aInfo.mTransport->SignalStateChange.connect(this,
                                              &MediaPipeline::StateChange);

  return NS_OK;
}

MediaPipeline::TransportInfo* MediaPipeline::GetTransportInfo_s(
    TransportFlow* aFlow) {
  ASSERT_ON_THREAD(mStsThread);
  if (aFlow == mRtp.mTransport) {
    return &mRtp;
  }

  if (aFlow == mRtcp.mTransport) {
    return &mRtcp;
  }

  return nullptr;
}

nsresult MediaPipeline::PipelineTransport::SendRtpPacket(const uint8_t* aData,
                                                         size_t aLen) {
  nsAutoPtr<DataBuffer> buf(
      new DataBuffer(aData, aLen, aLen + SRTP_MAX_EXPANSION));

  RUN_ON_THREAD(
      mStsThread,
      WrapRunnable(RefPtr<MediaPipeline::PipelineTransport>(this),
                   &MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s, buf,
                   true),
      NS_DISPATCH_NORMAL);

  return NS_OK;
}

nsresult MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s(
    nsAutoPtr<DataBuffer> aData, bool aIsRtp) {
  ASSERT_ON_THREAD(mStsThread);
  if (!mPipeline) {
    return NS_OK;  // Detached
  }
  TransportInfo& transport = aIsRtp ? mPipeline->mRtp : mPipeline->mRtcp;

  if (!transport.mSendSrtp) {
    CSFLogDebug(LOGTAG, "Couldn't write RTP/RTCP packet; SRTP not set up yet");
    return NS_OK;
  }

  MOZ_ASSERT(transport.mTransport);
  NS_ENSURE_TRUE(transport.mTransport, NS_ERROR_NULL_POINTER);

  // libsrtp enciphers in place, so we need a big enough buffer.
  MOZ_ASSERT(aData->capacity() >= aData->len() + SRTP_MAX_EXPANSION);
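  // Informational note: protection grows the packet in place (e.g. by an
  // authentication tag, plus an SRTCP index/trailer for RTCP), which is why
  // SendRtpPacket/SendRtcpPacket allocated the DataBuffer with
  // aLen + SRTP_MAX_EXPANSION bytes of capacity.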

  if (RtpLogger::IsPacketLoggingOn()) {
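    // Assume the fixed 12-byte RTP header size by default; if the packet
    // parses, use the real header length (it grows with CSRCs/extensions).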
    int headerLen = 12;
    webrtc::RTPHeader header;
    if (mPipeline->mRtpParser &&
        mPipeline->mRtpParser->Parse(aData->data(), aData->len(), &header)) {
      headerLen = header.headerLength;
    }
    RtpLogger::LogPacket(aData->data(), aData->len(), false, aIsRtp, headerLen,
                         mPipeline->mDescription);
  }

  int outLen;
  nsresult res;
  if (aIsRtp) {
    mPipeline->mPacketDumper->Dump(mPipeline->Level(),
                                   dom::mozPacketDumpType::Rtp, true,
                                   aData->data(), aData->len());

    res = transport.mSendSrtp->ProtectRtp(aData->data(), aData->len(),
                                          aData->capacity(), &outLen);
  } else {
    mPipeline->mPacketDumper->Dump(mPipeline->Level(),
                                   dom::mozPacketDumpType::Rtcp, true,
                                   aData->data(), aData->len());

    res = transport.mSendSrtp->ProtectRtcp(aData->data(), aData->len(),
                                           aData->capacity(), &outLen);
  }
  if (NS_FAILED(res)) {
    return res;
  }

  // Paranoia: don't let uninitialized bytes be included in aData->len().
  aData->SetLength(outLen);

  CSFLogDebug(LOGTAG, "%s sending %s packet", mPipeline->mDescription.c_str(),
              (aIsRtp ? "RTP" : "RTCP"));
  if (aIsRtp) {
    mPipeline->mPacketDumper->Dump(mPipeline->Level(),
                                   dom::mozPacketDumpType::Srtp, true,
                                   aData->data(), outLen);

    mPipeline->IncrementRtpPacketsSent(outLen);
  } else {
    mPipeline->mPacketDumper->Dump(mPipeline->Level(),
                                   dom::mozPacketDumpType::Srtcp, true,
                                   aData->data(), outLen);

    mPipeline->IncrementRtcpPacketsSent();
  }
  return mPipeline->SendPacket(transport.mTransport, aData->data(), outLen);
}

nsresult MediaPipeline::PipelineTransport::SendRtcpPacket(const uint8_t* aData,
                                                          size_t aLen) {
  nsAutoPtr<DataBuffer> buf(
      new DataBuffer(aData, aLen, aLen + SRTP_MAX_EXPANSION));

  RUN_ON_THREAD(
      mStsThread,
      WrapRunnable(RefPtr<MediaPipeline::PipelineTransport>(this),
                   &MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s, buf,
                   false),
      NS_DISPATCH_NORMAL);

  return NS_OK;
}

// Called if we're attached with AddDirectListener().
void MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData(
    MediaStreamGraph* aGraph, StreamTime aOffset, const MediaSegment& aMedia) {
  CSFLogDebug(
      LOGTAG,
      "MediaPipeline::NotifyRealtimeTrackData() listener=%p, offset=%" PRId64
      ", duration=%" PRId64,
      this, aOffset, aMedia.GetDuration());

  if (aMedia.GetType() == MediaSegment::VIDEO) {
    // We have to call the upstream NotifyRealtimeTrackData;
    // MediaStreamVideoSink will route the frames to SetCurrentFrames.
    MediaStreamVideoSink::NotifyRealtimeTrackData(aGraph, aOffset, aMedia);
    return;
  }

  NewData(aMedia, aGraph->GraphRate());
}

void MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges(
    MediaStreamGraph* aGraph, StreamTime aOffset,
    const MediaSegment& aQueuedMedia) {
  CSFLogDebug(LOGTAG, "MediaPipeline::NotifyQueuedChanges()");

  if (aQueuedMedia.GetType() == MediaSegment::VIDEO) {
    // We always get video from SetCurrentFrames().
    return;
  }

  if (mDirectConnect) {
    // Ignore non-direct data if we're also getting direct data.
    return;
  }

  size_t rate;
  if (aGraph) {
    rate = aGraph->GraphRate();
  } else {
    // When running tests, the graph may be null. In that case use a default.
    rate = 16000;
  }
  NewData(aQueuedMedia, rate);
}

void MediaPipelineTransmit::PipelineListener::NotifyDirectListenerInstalled(
    InstallationResult aResult) {
  CSFLogInfo(
      LOGTAG,
      "MediaPipeline::NotifyDirectListenerInstalled() listener=%p, result=%d",
      this, static_cast<int32_t>(aResult));

  mDirectConnect = InstallationResult::SUCCESS == aResult;
}

void MediaPipelineTransmit::PipelineListener::
    NotifyDirectListenerUninstalled() {
  CSFLogInfo(LOGTAG,
             "MediaPipeline::NotifyDirectListenerUninstalled() listener=%p",
             this);

  mDirectConnect = false;
}

void MediaPipelineTransmit::PipelineListener::NewData(
    const MediaSegment& aMedia, TrackRate aRate /* = 0 */) {
  if (!mActive) {
    CSFLogDebug(LOGTAG, "Discarding data because transport is not ready");
    return;
  }

  if (mConduit->type() != (aMedia.GetType() == MediaSegment::AUDIO
                               ? MediaSessionConduit::AUDIO
                               : MediaSessionConduit::VIDEO)) {
    MOZ_ASSERT(false,
               "The media type should always be correct since the "
               "listener is locked to a specific track");
    return;
  }

  // TODO(ekr@rtfm.com): For now assume that we have only one
  // track type and it's destined for us
  // See bug 784517
  if (aMedia.GetType() == MediaSegment::AUDIO) {
    MOZ_RELEASE_ASSERT(aRate > 0);

    const AudioSegment* audio = static_cast<const AudioSegment*>(&aMedia);
    for (AudioSegment::ConstChunkIterator iter(*audio); !iter.IsEnded();
         iter.Next()) {
      mAudioProcessing->QueueAudioChunk(aRate, *iter, mEnabled);
    }
  } else {
    const VideoSegment* video = static_cast<const VideoSegment*>(&aMedia);
    for (VideoSegment::ConstChunkIterator iter(*video); !iter.IsEnded();
         iter.Next()) {
      mConverter->QueueVideoChunk(*iter, !mEnabled);
    }
  }
}

void MediaPipelineTransmit::PipelineListener::SetCurrentFrames(
    const VideoSegment& aSegment) {
  NewData(aSegment);
}

class GenericReceiveListener : public MediaStreamListener {
 public:
  explicit GenericReceiveListener(dom::MediaStreamTrack* aTrack)
      : mTrack(aTrack),
        mTrackId(aTrack->GetInputTrackId()),
        mSource(mTrack->GetInputStream()->AsSourceStream()),
        mPlayedTicks(0),
        mPrincipalHandle(PRINCIPAL_HANDLE_NONE),
        mListening(false),
        mMaybeTrackNeedsUnmute(true) {
    MOZ_RELEASE_ASSERT(mSource, "Must be used with a SourceMediaStream");
  }

  virtual ~GenericReceiveListener() {
    NS_ReleaseOnMainThreadSystemGroup("GenericReceiveListener::mTrack",
                                      mTrack.forget());
  }

  void AddTrackToSource(uint32_t aRate = 0) {
    MOZ_ASSERT((aRate != 0 && mTrack->AsAudioStreamTrack()) ||
               mTrack->AsVideoStreamTrack());

    if (mTrack->AsAudioStreamTrack()) {
      mSource->AddAudioTrack(mTrackId, aRate, 0, new AudioSegment());
    } else if (mTrack->AsVideoStreamTrack()) {
      mSource->AddTrack(mTrackId, 0, new VideoSegment());
    }
    CSFLogDebug(LOGTAG,
                "GenericReceiveListener added %s track %d (%p) to stream %p",
                mTrack->AsAudioStreamTrack() ? "audio" : "video", mTrackId,
                mTrack.get(), mSource.get());

    mSource->AdvanceKnownTracksTime(STREAM_TIME_MAX);
    mSource->AddListener(this);
  }

  void AddSelf() {
    if (!mListening) {
      mListening = true;
      mSource->SetPullEnabled(true);
      mMaybeTrackNeedsUnmute = true;
    }
  }

  void RemoveSelf() {
    if (mListening) {
      mListening = false;
      mSource->SetPullEnabled(false);
    }
  }

  void OnRtpReceived() {
    if (mMaybeTrackNeedsUnmute) {
      mMaybeTrackNeedsUnmute = false;
      NS_DispatchToMainThread(
          NewRunnableMethod("GenericReceiveListener::OnRtpReceived_m", this,
                            &GenericReceiveListener::OnRtpReceived_m));
    }
  }

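  // Main-thread continuation of OnRtpReceived(): unmute the track now that
  // media is flowing. mMaybeTrackNeedsUnmute is atomic, so at most one
  // dispatch is queued until AddSelf() re-arms the flag.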
  void OnRtpReceived_m() {
    if (mListening && mTrack->Muted()) {
      mTrack->MutedChanged(false);
    }
  }

  void EndTrack() {
    CSFLogDebug(LOGTAG, "GenericReceiveListener ending track");

    // This breaks the cycle with the SourceMediaStream.
    mSource->RemoveListener(this);
    mSource->EndTrack(mTrackId);
  }

  // Must be called on the main thread.
  void SetPrincipalHandle_m(const PrincipalHandle& aPrincipalHandle) {
    class Message : public ControlMessage {
     public:
      Message(GenericReceiveListener* aListener,
              const PrincipalHandle& aPrincipalHandle)
          : ControlMessage(nullptr),
            mListener(aListener),
            mPrincipalHandle(aPrincipalHandle) {}

      void Run() override {
        mListener->SetPrincipalHandle_msg(mPrincipalHandle);
      }

      const RefPtr<GenericReceiveListener> mListener;
      PrincipalHandle mPrincipalHandle;
    };

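    // AppendMessage runs Message::Run() on the MediaStreamGraph thread, so
    // the new principal takes effect in order with other queued graph
    // changes rather than racing with media appends.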
    mTrack->GraphImpl()->AppendMessage(
        MakeUnique<Message>(this, aPrincipalHandle));
  }

  // Must be called on the MediaStreamGraph thread.
  void SetPrincipalHandle_msg(const PrincipalHandle& aPrincipalHandle) {
    mPrincipalHandle = aPrincipalHandle;
  }

 protected:
  RefPtr<dom::MediaStreamTrack> mTrack;
  const TrackID mTrackId;
  const RefPtr<SourceMediaStream> mSource;
  TrackTicks mPlayedTicks;
  PrincipalHandle mPrincipalHandle;
  bool mListening;
  Atomic<bool> mMaybeTrackNeedsUnmute;
};

MediaPipelineReceive::MediaPipelineReceive(const std::string& aPc,
                                           nsCOMPtr<nsIEventTarget> aMainThread,
                                           nsCOMPtr<nsIEventTarget> aStsThread,
                                           RefPtr<MediaSessionConduit> aConduit)
    : MediaPipeline(aPc, DirectionType::RECEIVE, aMainThread, aStsThread,
                    aConduit) {}

MediaPipelineReceive::~MediaPipelineReceive() {}

class MediaPipelineReceiveAudio::PipelineListener
    : public GenericReceiveListener {
 public:
  PipelineListener(dom::MediaStreamTrack* aTrack,
                   const RefPtr<MediaSessionConduit>& aConduit)
      : GenericReceiveListener(aTrack),
        mConduit(aConduit),
        // The AudioSessionConduit only supports 16, 32, 44.1 and 48kHz. This
        // is an artificial limitation; it would however require more changes
        // to support arbitrary rates. If the graph's sampling rate is
        // unsupported, we fall back to 48kHz (for example, a 44100Hz graph is
        // fed directly, while a 96000Hz graph receives 48kHz audio that the
        // graph then resamples).
        mRate(static_cast<AudioSessionConduit*>(mConduit.get())
                      ->IsSamplingFreqSupported(mSource->GraphRate())
                  ? mSource->GraphRate()
                  : WEBRTC_MAX_SAMPLE_RATE),
        mTaskQueue(new AutoTaskQueue(
            GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER),
            "AudioPipelineListener")),
        mLastLog(0) {
    AddTrackToSource(mRate);
  }

  // Implement MediaStreamListener.
  void NotifyPull(MediaStreamGraph* aGraph, StreamTime aDesiredTime) override {
    NotifyPullImpl(aDesiredTime);
  }

  RefPtr<SourceMediaStream::NotifyPullPromise> AsyncNotifyPull(
      MediaStreamGraph* aGraph, StreamTime aDesiredTime) override {
    RefPtr<PipelineListener> self = this;
    return InvokeAsync(mTaskQueue, __func__, [self, aDesiredTime]() {
      self->NotifyPullImpl(aDesiredTime);
      return SourceMediaStream::NotifyPullPromise::CreateAndResolve(true,
                                                                    __func__);
    });
  }

 private:
  ~PipelineListener() {
    NS_ReleaseOnMainThreadSystemGroup("MediaPipeline::mConduit",
                                      mConduit.forget());
  }

  void NotifyPullImpl(StreamTime aDesiredTime) {
    uint32_t samplesPer10ms = mRate / 100;

    // mRate is not necessarily the same as the graph rate, since there are
    // sample-rate constraints on the inbound audio: only 16, 32, 44.1 and
    // 48kHz are supported. The audio frames we get here are resampled when
    // inserted into the graph.
    TrackTicks desired = mSource->TimeToTicksRoundUp(mRate, aDesiredTime);
    TrackTicks framesNeeded = desired - mPlayedTicks;

    while (framesNeeded >= 0) {
      const int scratchBufferLength =
          AUDIO_SAMPLE_BUFFER_MAX_BYTES / sizeof(int16_t);
      int16_t scratchBuffer[scratchBufferLength];

      int samplesLength = scratchBufferLength;

      // This fetches 10ms of data, either mono or stereo.
      MediaConduitErrorCode err =
          static_cast<AudioSessionConduit*>(mConduit.get())
              ->GetAudioFrame(
                  scratchBuffer, mRate,
                  0,  // TODO(ekr@rtfm.com): better estimate of "capture"
                      // (really playout) delay
                  samplesLength);

      if (err != kMediaConduitNoError) {
        // Insert silence on conduit/GIPS failure (extremely unlikely).
        CSFLogError(LOGTAG,
                    "Audio conduit failed (%d) to return data @ %" PRId64
                    " (desired %" PRId64 " -> %f)",
                    err, mPlayedTicks, aDesiredTime,
                    mSource->StreamTimeToSeconds(aDesiredTime));
        // If this is not enough we'll loop and provide more.
        samplesLength = samplesPer10ms;
        PodArrayZero(scratchBuffer);
      }

      MOZ_RELEASE_ASSERT(samplesLength <= scratchBufferLength);

      CSFLogDebug(LOGTAG, "Audio conduit returned buffer of length %u",
                  samplesLength);

      RefPtr<SharedBuffer> samples =
          SharedBuffer::Create(samplesLength * sizeof(int16_t));
      int16_t* samplesData = static_cast<int16_t*>(samples->Data());
      AudioSegment segment;
      // We derive the number of channels of the stream from the number of
      // samples the AudioConduit gives us, considering it gives us packets of
      // 10ms and we know the rate.
      uint32_t channelCount = samplesLength / samplesPer10ms;
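      // Worked example (informational): at mRate = 48000, samplesPer10ms is
      // 480. A stereo conduit then returns 960 interleaved samples, giving
      // channelCount = 960 / 480 = 2 and frames = 960 / 2 = 480 per channel.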
      AutoTArray<int16_t*, 2> channels;
      AutoTArray<const int16_t*, 2> outputChannels;
      size_t frames = samplesLength / channelCount;

      channels.SetLength(channelCount);

      size_t offset = 0;
      for (size_t i = 0; i < channelCount; i++) {
        channels[i] = samplesData + offset;
        offset += frames;
      }

      DeinterleaveAndConvertBuffer(scratchBuffer, frames, channelCount,
                                   channels.Elements());

      outputChannels.AppendElements(channels);

      segment.AppendFrames(samples.forget(), outputChannels, frames,
                           mPrincipalHandle);

      // Handle the track not actually being added yet, or removed/finished.
      if (mSource->AppendToTrack(mTrackId, &segment)) {
        framesNeeded -= frames;
        mPlayedTicks += frames;
        if (MOZ_LOG_TEST(AudioLogModule(), LogLevel::Debug)) {
          if (mPlayedTicks > mLastLog + mRate) {
            MOZ_LOG(AudioLogModule(), LogLevel::Debug,
                    ("%p: Inserting samples into track %d, total = "
                     "%" PRIu64,
                     (void*)this, mTrackId, mPlayedTicks));
            mLastLog = mPlayedTicks;
          }
        }
      } else {
        CSFLogError(LOGTAG, "AppendToTrack failed");
        // We can't un-read the data, but that's OK since we don't want to
        // buffer anyway. Break out so we don't loop forever.
        break;
      }
    }
  }

  RefPtr<MediaSessionConduit> mConduit;
  // The rate we feed the track at; not necessarily the graph's rate.
  const TrackRate mRate;
  const RefPtr<AutoTaskQueue> mTaskQueue;
  TrackTicks mLastLog;  // mPlayedTicks when we last logged
};


MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
    const std::string& aPc, nsCOMPtr<nsIEventTarget> aMainThread,
    nsCOMPtr<nsIEventTarget> aStsThread, RefPtr<AudioSessionConduit> aConduit,
    dom::MediaStreamTrack* aTrack)
    : MediaPipelineReceive(aPc, aMainThread, aStsThread, aConduit),
      mListener(aTrack ? new PipelineListener(aTrack, mConduit) : nullptr) {
  mDescription = mPc + "| Receive audio";
}

void MediaPipelineReceiveAudio::DetachMedia() {
  ASSERT_ON_THREAD(mMainThread);
  if (mListener) {
    mListener->EndTrack();
    mListener = nullptr;
  }
}

void MediaPipelineReceiveAudio::SetPrincipalHandle_m(
    const PrincipalHandle& aPrincipalHandle) {
  if (mListener) {
    mListener->SetPrincipalHandle_m(aPrincipalHandle);
  }
}

void MediaPipelineReceiveAudio::Start() {
  mConduit->StartReceiving();
  if (mListener) {
    mListener->AddSelf();
  }
}

void MediaPipelineReceiveAudio::Stop() {
  if (mListener) {
    mListener->RemoveSelf();
  }
  mConduit->StopReceiving();
}

void MediaPipelineReceiveAudio::OnRtpPacketReceived() {
  if (mListener) {
    mListener->OnRtpReceived();
  }
}

class MediaPipelineReceiveVideo::PipelineListener
    : public GenericReceiveListener {
 public:
  explicit PipelineListener(dom::MediaStreamTrack* aTrack)
      : GenericReceiveListener(aTrack),
        mImageContainer(
            LayerManager::CreateImageContainer(ImageContainer::ASYNCHRONOUS)),
        mMutex("Video PipelineListener") {
    AddTrackToSource();
  }

  // Implement MediaStreamListener.
  void NotifyPull(MediaStreamGraph* aGraph, StreamTime aDesiredTime) override {
    MutexAutoLock lock(mMutex);

    RefPtr<Image> image = mImage;
    StreamTime delta = aDesiredTime - mPlayedTicks;

    // Don't append if we've already provided a frame that supposedly goes
    // past the current aDesiredTime. Doing so would mean a negative delta,
    // which messes up the graph's handling.
    if (delta > 0) {
      VideoSegment segment;
      IntSize size = image ? image->GetSize() : IntSize(mWidth, mHeight);
      segment.AppendFrame(image.forget(), delta, size, mPrincipalHandle);
      // Handle the track not actually being added yet, or removed/finished.
      if (!mSource->AppendToTrack(mTrackId, &segment)) {
        CSFLogError(LOGTAG, "AppendToTrack failed");
        return;
      }
      mPlayedTicks = aDesiredTime;
    }
  }

  // Accessors for external writes from the renderer.
  void FrameSizeChange(unsigned int aWidth, unsigned int aHeight,
                       unsigned int aNumberOfStreams) {
    MutexAutoLock lock(mMutex);

    mWidth = aWidth;
    mHeight = aHeight;
  }

  void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer,
                        uint32_t aTimeStamp, int64_t aRenderTime) {
    if (aBuffer.native_handle()) {
      // We assume that only native handles are used with the
      // WebrtcMediaDataDecoderCodec decoder.
      RefPtr<Image> image = static_cast<Image*>(aBuffer.native_handle());
      MutexAutoLock lock(mMutex);
      mImage = image;
      return;
    }

    MOZ_ASSERT(aBuffer.DataY());
    // Create a video frame using |aBuffer|.
    RefPtr<PlanarYCbCrImage> yuvImage =
        mImageContainer->CreatePlanarYCbCrImage();

    PlanarYCbCrData yuvData;
    yuvData.mYChannel = const_cast<uint8_t*>(aBuffer.DataY());
    yuvData.mYSize = IntSize(aBuffer.width(), aBuffer.height());
    yuvData.mYStride = aBuffer.StrideY();
    MOZ_ASSERT(aBuffer.StrideU() == aBuffer.StrideV());
    yuvData.mCbCrStride = aBuffer.StrideU();
    yuvData.mCbChannel = const_cast<uint8_t*>(aBuffer.DataU());
    yuvData.mCrChannel = const_cast<uint8_t*>(aBuffer.DataV());
    yuvData.mCbCrSize =
        IntSize((aBuffer.width() + 1) >> 1, (aBuffer.height() + 1) >> 1);
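    // Informational: I420 chroma planes are half-size in each dimension,
    // rounded up for odd frame sizes. E.g. a 640x480 frame has 320x240
    // chroma; a 641x481 frame has (641+1)>>1 = 321 by (481+1)>>1 = 241.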
    yuvData.mPicX = 0;
    yuvData.mPicY = 0;
    yuvData.mPicSize = IntSize(aBuffer.width(), aBuffer.height());
    yuvData.mStereoMode = StereoMode::MONO;

    if (!yuvImage->CopyData(yuvData)) {
      MOZ_ASSERT(false);
      return;
    }

    MutexAutoLock lock(mMutex);
    mImage = yuvImage;
  }

 private:
  int mWidth = 0;
  int mHeight = 0;
  RefPtr<layers::ImageContainer> mImageContainer;
  RefPtr<layers::Image> mImage;
  Mutex mMutex;  // Mutex for processing WebRTC frames.
                 // Protects mImage against:
                 // - Writing from the GIPS thread
                 // - Reading from the MSG thread
};

class MediaPipelineReceiveVideo::PipelineRenderer
    : public mozilla::VideoRenderer {
 public:
  explicit PipelineRenderer(MediaPipelineReceiveVideo* aPipeline)
      : mPipeline(aPipeline) {}

  void Detach() { mPipeline = nullptr; }

  // Implement VideoRenderer.
  void FrameSizeChange(unsigned int aWidth, unsigned int aHeight,
                       unsigned int aNumberOfStreams) override {
    mPipeline->mListener->FrameSizeChange(aWidth, aHeight, aNumberOfStreams);
  }

  void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer,
                        uint32_t aTimeStamp, int64_t aRenderTime) override {
    mPipeline->mListener->RenderVideoFrame(aBuffer, aTimeStamp, aRenderTime);
  }

 private:
  MediaPipelineReceiveVideo* mPipeline;  // Raw pointer to avoid cycles
};

MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
    const std::string& aPc, nsCOMPtr<nsIEventTarget> aMainThread,
    nsCOMPtr<nsIEventTarget> aStsThread, RefPtr<VideoSessionConduit> aConduit,
    dom::MediaStreamTrack* aTrack)
    : MediaPipelineReceive(aPc, aMainThread, aStsThread, aConduit),
      mRenderer(new PipelineRenderer(this)),
      mListener(aTrack ? new PipelineListener(aTrack) : nullptr) {
  mDescription = mPc + "| Receive video";
  aConduit->AttachRenderer(mRenderer);
}

void MediaPipelineReceiveVideo::DetachMedia() {
  ASSERT_ON_THREAD(mMainThread);

  // Stop generating video, and thus stop invoking the PipelineRenderer and
  // PipelineListener. The renderer holds a raw pointer to the pipeline to
  // avoid cycles, and the render callbacks are invoked from a different
  // thread, so simple null-checks would cause TSAN bugs without locks.
  static_cast<VideoSessionConduit*>(mConduit.get())->DetachRenderer();
  if (mListener) {
    mListener->EndTrack();
    mListener = nullptr;
  }
}

void MediaPipelineReceiveVideo::SetPrincipalHandle_m(
    const PrincipalHandle& aPrincipalHandle) {
  if (mListener) {
    mListener->SetPrincipalHandle_m(aPrincipalHandle);
  }
}

void MediaPipelineReceiveVideo::Start() {
  mConduit->StartReceiving();
  if (mListener) {
    mListener->AddSelf();
  }
}

void MediaPipelineReceiveVideo::Stop() {
  if (mListener) {
    mListener->RemoveSelf();
  }
  mConduit->StopReceiving();
}

void MediaPipelineReceiveVideo::OnRtpPacketReceived() {
  if (mListener) {
    mListener->OnRtpReceived();
  }
}

DOMHighResTimeStamp MediaPipeline::GetNow() {
  return webrtc::Clock::GetRealTimeClock()->TimeInMilliseconds();
}

DOMHighResTimeStamp MediaPipeline::RtpCSRCStats::GetExpiryFromTime(
    const DOMHighResTimeStamp aTime) {
  // DOMHighResTimeStamp is measured in milliseconds, so anything last seen
  // more than EXPIRY_TIME_MILLISECONDS before aTime has expired.
  return aTime - EXPIRY_TIME_MILLISECONDS;
}

MediaPipeline::RtpCSRCStats::RtpCSRCStats(const uint32_t aCsrc,
                                          const DOMHighResTimeStamp aTime)
    : mCsrc(aCsrc), mTimestamp(aTime) {}

void MediaPipeline::RtpCSRCStats::GetWebidlInstance(
    dom::RTCRTPContributingSourceStats& aWebidlObj,
    const nsString& aInboundRtpStreamId) const {
  nsString statId = NS_LITERAL_STRING("csrc_") + aInboundRtpStreamId;
  statId.AppendLiteral("_");
  statId.AppendInt(mCsrc);
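  // Hypothetical example: an inbound RTP stream id of "inbound_rtp_audio_0"
  // and a CSRC of 12345 produce the stat id "csrc_inbound_rtp_audio_0_12345".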
  aWebidlObj.mId.Construct(statId);
  aWebidlObj.mType.Construct(RTCStatsType::Csrc);
  aWebidlObj.mTimestamp.Construct(mTimestamp);
  aWebidlObj.mContributorSsrc.Construct(mCsrc);
  aWebidlObj.mInboundRtpStreamId.Construct(aInboundRtpStreamId);
}

}  // namespace mozilla