1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
4 * You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6 // Original author: ekr@rtfm.com
7
8 #include "MediaPipeline.h"
9
10 #include <inttypes.h>
11 #include <math.h>
12
13 #include "AudioSegment.h"
14 #include "AudioConverter.h"
15 #include "AutoTaskQueue.h"
16 #include "CSFLog.h"
17 #include "DOMMediaStream.h"
18 #include "ImageContainer.h"
19 #include "ImageTypes.h"
20 #include "Layers.h"
21 #include "LayersLogging.h"
22 #include "MediaEngine.h"
23 #include "MediaPipelineFilter.h"
24 #include "MediaSegment.h"
25 #include "MediaStreamGraphImpl.h"
26 #include "MediaStreamListener.h"
27 #include "MediaStreamTrack.h"
28 #include "MediaStreamVideoSink.h"
29 #include "RtpLogger.h"
30 #include "VideoSegment.h"
31 #include "VideoStreamTrack.h"
32 #include "VideoUtils.h"
33 #include "databuffer.h"
34 #include "libyuv/convert.h"
35 #include "mozilla/PeerIdentity.h"
36 #include "mozilla/Preferences.h"
37 #include "mozilla/SharedThreadPool.h"
38 #include "mozilla/Sprintf.h"
39 #include "mozilla/UniquePtr.h"
40 #include "mozilla/UniquePtrExtensions.h"
41 #include "mozilla/dom/RTCStatsReportBinding.h"
42 #include "mozilla/gfx/Point.h"
43 #include "mozilla/gfx/Types.h"
44 #include "nsError.h"
45 #include "nsThreadUtils.h"
46 #include "nspr.h"
47 #include "runnable_utils.h"
48 #include "srtp.h"
49 #include "transportflow.h"
50 #include "transportlayer.h"
51 #include "transportlayerdtls.h"
52 #include "transportlayerice.h"
53
54 #include "webrtc/base/bind.h"
55 #include "webrtc/base/keep_ref_until_done.h"
56 #include "webrtc/common_types.h"
57 #include "webrtc/common_video/include/i420_buffer_pool.h"
58 #include "webrtc/common_video/include/video_frame_buffer.h"
59 #include "webrtc/common_video/libyuv/include/webrtc_libyuv.h"
60
61 // Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at
62 // 48KHz)
63 #define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
64 static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
65 AUDIO_SAMPLE_BUFFER_MAX_BYTES,
66 "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");
67
68 // The number of frame buffers VideoFrameConverter may create before returning
69 // errors.
70 // Sometimes these are released synchronously but they can be forwarded all the
71 // way to the encoder for asynchronous encoding. With a pool size of 5,
72 // we allow 1 buffer for the current conversion, and 4 buffers to be queued at
73 // the encoder.
74 #define CONVERTER_BUFFER_POOL_SIZE 5
75
76 using namespace mozilla;
77 using namespace mozilla::dom;
78 using namespace mozilla::gfx;
79 using namespace mozilla::layers;
80
81 static const char* mpLogTag = "MediaPipeline";
82 #ifdef LOGTAG
83 #undef LOGTAG
84 #endif
85 #define LOGTAG mpLogTag
86
87 namespace mozilla {
88 extern mozilla::LogModule* AudioLogModule();
89
90 class VideoConverterListener {
91 public:
92 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(VideoConverterListener)
93
94 virtual void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) = 0;
95
96 protected:
~VideoConverterListener()97 virtual ~VideoConverterListener() {}
98 };
99
100 // An async video frame format converter.
101 //
102 // Input is typically a MediaStream(Track)Listener driven by MediaStreamGraph.
103 //
104 // We keep track of the size of the TaskQueue so we can drop frames if
105 // conversion is taking too long.
106 //
107 // Output is passed through to all added VideoConverterListeners on a TaskQueue
108 // thread whenever a frame is converted.
109 class VideoFrameConverter {
110 public:
111 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(VideoFrameConverter)
112
VideoFrameConverter()113 VideoFrameConverter()
114 : mLength(0),
115 mTaskQueue(new AutoTaskQueue(
116 GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER),
117 "VideoFrameConverter")),
118 mBufferPool(false, CONVERTER_BUFFER_POOL_SIZE),
119 mLastImage(
120 -1) // -1 is not a guaranteed invalid serial. See bug 1262134.
121 #ifdef DEBUG
122 ,
123 mThrottleCount(0),
124 mThrottleRecord(0)
125 #endif
126 ,
127 mMutex("VideoFrameConverter") {
128 MOZ_COUNT_CTOR(VideoFrameConverter);
129 }
130
QueueVideoChunk(const VideoChunk & aChunk,bool aForceBlack)131 void QueueVideoChunk(const VideoChunk& aChunk, bool aForceBlack) {
132 IntSize size = aChunk.mFrame.GetIntrinsicSize();
133 if (size.width == 0 || size.width == 0) {
134 return;
135 }
136
137 if (aChunk.IsNull()) {
138 aForceBlack = true;
139 } else {
140 aForceBlack = aChunk.mFrame.GetForceBlack();
141 }
142
143 int32_t serial;
144 if (aForceBlack) {
145 // Reset the last-img check.
146 // -1 is not a guaranteed invalid serial. See bug 1262134.
147 serial = -1;
148 } else {
149 serial = aChunk.mFrame.GetImage()->GetSerial();
150 }
151
152 const double duplicateMinFps = 1.0;
153 TimeStamp t = aChunk.mTimeStamp;
154 MOZ_ASSERT(!t.IsNull());
155 if (!t.IsNull() && serial == mLastImage && !mLastFrameSent.IsNull() &&
156 (t - mLastFrameSent).ToSeconds() < (1.0 / duplicateMinFps)) {
157 // We get passed duplicate frames every ~10ms even with no frame change.
158
159 // After disabling, or when the source is not producing many frames,
160 // we still want *some* frames to flow to the other side.
161 // It could happen that we drop the packet that carried the first disabled
162 // frame, for instance. Note that this still requires the application to
163 // send a frame, or it doesn't trigger at all.
164 return;
165 }
166 mLastFrameSent = t;
167 mLastImage = serial;
168
169 // A throttling limit of 1 allows us to convert 2 frames concurrently.
170 // It's short enough to not build up too significant a delay, while
171 // giving us a margin to not cause some machines to drop every other frame.
172 const int32_t queueThrottlingLimit = 1;
173 if (mLength > queueThrottlingLimit) {
174 CSFLogDebug(LOGTAG,
175 "VideoFrameConverter %p queue is full. Throttling by "
176 "throwing away a frame.",
177 this);
178 #ifdef DEBUG
179 ++mThrottleCount;
180 mThrottleRecord = std::max(mThrottleCount, mThrottleRecord);
181 #endif
182 return;
183 }
184
185 #ifdef DEBUG
186 if (mThrottleCount > 0) {
187 if (mThrottleCount > 5) {
188 // Log at a higher level when we have large drops.
189 CSFLogInfo(LOGTAG,
190 "VideoFrameConverter %p stopped throttling after throwing "
191 "away %d frames. Longest throttle so far was %d frames.",
192 this, mThrottleCount, mThrottleRecord);
193 } else {
194 CSFLogDebug(LOGTAG,
195 "VideoFrameConverter %p stopped throttling after throwing "
196 "away %d frames. Longest throttle so far was %d frames.",
197 this, mThrottleCount, mThrottleRecord);
198 }
199 mThrottleCount = 0;
200 }
201 #endif
202
203 ++mLength; // Atomic
204
205 nsCOMPtr<nsIRunnable> runnable =
206 NewRunnableMethod<StoreRefPtrPassByPtr<Image>, IntSize, bool>(
207 "VideoFrameConverter::ProcessVideoFrame", this,
208 &VideoFrameConverter::ProcessVideoFrame, aChunk.mFrame.GetImage(),
209 size, aForceBlack);
210 nsresult rv = mTaskQueue->Dispatch(runnable.forget());
211 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
212 Unused << rv;
213 }
214
AddListener(VideoConverterListener * aListener)215 void AddListener(VideoConverterListener* aListener) {
216 MutexAutoLock lock(mMutex);
217
218 MOZ_ASSERT(!mListeners.Contains(aListener));
219 mListeners.AppendElement(aListener);
220 }
221
RemoveListener(VideoConverterListener * aListener)222 bool RemoveListener(VideoConverterListener* aListener) {
223 MutexAutoLock lock(mMutex);
224
225 return mListeners.RemoveElement(aListener);
226 }
227
Shutdown()228 void Shutdown() {
229 MutexAutoLock lock(mMutex);
230 mListeners.Clear();
231 }
232
233 protected:
~VideoFrameConverter()234 virtual ~VideoFrameConverter() { MOZ_COUNT_DTOR(VideoFrameConverter); }
235
DeleteBuffer(uint8 * aData)236 static void DeleteBuffer(uint8* aData) { delete[] aData; }
237
238 // This takes ownership of the buffer and attached it to the VideoFrame we
239 // send to the listeners
VideoFrameConverted(UniquePtr<uint8[]> aBuffer,unsigned int aVideoFrameLength,unsigned short aWidth,unsigned short aHeight,VideoType aVideoType,uint64_t aCaptureTime)240 void VideoFrameConverted(UniquePtr<uint8[]> aBuffer,
241 unsigned int aVideoFrameLength,
242 unsigned short aWidth, unsigned short aHeight,
243 VideoType aVideoType, uint64_t aCaptureTime) {
244 // check for parameter sanity
245 if (!aBuffer || aVideoFrameLength == 0 || aWidth == 0 || aHeight == 0) {
246 CSFLogError(LOGTAG, "%s Invalid Parameters", __FUNCTION__);
247 MOZ_ASSERT(false);
248 return;
249 }
250 MOZ_ASSERT(aVideoType == VideoType::kVideoI420);
251
252 const int stride_y = aWidth;
253 const int stride_uv = (aWidth + 1) / 2;
254
255 const uint8_t* buffer_y = aBuffer.get();
256 const uint8_t* buffer_u = buffer_y + stride_y * aHeight;
257 const uint8_t* buffer_v = buffer_u + stride_uv * ((aHeight + 1) / 2);
258 rtc::scoped_refptr<webrtc::WrappedI420Buffer> video_frame_buffer(
259 new rtc::RefCountedObject<webrtc::WrappedI420Buffer>(
260 aWidth, aHeight, buffer_y, stride_y, buffer_u, stride_uv, buffer_v,
261 stride_uv, rtc::Bind(&DeleteBuffer, aBuffer.release())));
262
263 webrtc::VideoFrame video_frame(video_frame_buffer, aCaptureTime,
264 aCaptureTime,
265 webrtc::kVideoRotation_0); // XXX
266 VideoFrameConverted(video_frame);
267 }
268
VideoFrameConverted(const webrtc::VideoFrame & aVideoFrame)269 void VideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) {
270 MutexAutoLock lock(mMutex);
271
272 for (RefPtr<VideoConverterListener>& listener : mListeners) {
273 listener->OnVideoFrameConverted(aVideoFrame);
274 }
275 }
276
ProcessVideoFrame(Image * aImage,IntSize aSize,bool aForceBlack)277 void ProcessVideoFrame(Image* aImage, IntSize aSize, bool aForceBlack) {
278 --mLength; // Atomic
279 MOZ_ASSERT(mLength >= 0);
280
281 if (aForceBlack) {
282 // Send a black image.
283 rtc::scoped_refptr<webrtc::I420Buffer> buffer =
284 mBufferPool.CreateBuffer(aSize.width, aSize.height);
285 if (!buffer) {
286 MOZ_DIAGNOSTIC_ASSERT(false,
287 "Buffers not leaving scope except for "
288 "reconfig, should never leak");
289 CSFLogWarn(LOGTAG, "Creating a buffer for a black video frame failed");
290 return;
291 }
292
293 CSFLogDebug(LOGTAG, "Sending a black video frame");
294 webrtc::I420Buffer::SetBlack(buffer);
295 webrtc::VideoFrame frame(buffer, 0, 0, // not setting timestamps
296 webrtc::kVideoRotation_0);
297 VideoFrameConverted(frame);
298 return;
299 }
300
301 if (!aImage) {
302 MOZ_ASSERT_UNREACHABLE("Must have image if not forcing black");
303 return;
304 }
305
306 ImageFormat format = aImage->GetFormat();
307 if (format == ImageFormat::PLANAR_YCBCR) {
308 // Cast away constness b/c some of the accessors are non-const
309 const PlanarYCbCrData* data =
310 static_cast<const PlanarYCbCrImage*>(aImage)->GetData();
311 if (data) {
312 uint8_t* y = data->mYChannel;
313 uint8_t* cb = data->mCbChannel;
314 uint8_t* cr = data->mCrChannel;
315 int32_t yStride = data->mYStride;
316 int32_t cbCrStride = data->mCbCrStride;
317 uint32_t width = aImage->GetSize().width;
318 uint32_t height = aImage->GetSize().height;
319
320 rtc::scoped_refptr<webrtc::WrappedI420Buffer> video_frame_buffer(
321 new rtc::RefCountedObject<webrtc::WrappedI420Buffer>(
322 width, height, y, yStride, cb, cbCrStride, cr, cbCrStride,
323 rtc::KeepRefUntilDone(aImage)));
324
325 webrtc::VideoFrame i420_frame(video_frame_buffer, 0,
326 0, // not setting timestamps
327 webrtc::kVideoRotation_0);
328 CSFLogDebug(LOGTAG, "Sending an I420 video frame");
329 VideoFrameConverted(i420_frame);
330 return;
331 }
332 }
333
334 RefPtr<SourceSurface> surf = aImage->GetAsSourceSurface();
335 if (!surf) {
336 CSFLogError(LOGTAG, "Getting surface from %s image failed",
337 Stringify(format).c_str());
338 return;
339 }
340
341 RefPtr<DataSourceSurface> data = surf->GetDataSurface();
342 if (!data) {
343 CSFLogError(
344 LOGTAG,
345 "Getting data surface from %s image with %s (%s) surface failed",
346 Stringify(format).c_str(), Stringify(surf->GetType()).c_str(),
347 Stringify(surf->GetFormat()).c_str());
348 return;
349 }
350
351 if (aImage->GetSize() != aSize) {
352 MOZ_DIAGNOSTIC_ASSERT(false, "Unexpected intended size");
353 return;
354 }
355
356 rtc::scoped_refptr<webrtc::I420Buffer> buffer =
357 mBufferPool.CreateBuffer(aSize.width, aSize.height);
358 if (!buffer) {
359 CSFLogWarn(LOGTAG, "Creating a buffer for a black video frame failed");
360 return;
361 }
362
363 DataSourceSurface::ScopedMap map(data, DataSourceSurface::READ);
364 if (!map.IsMapped()) {
365 CSFLogError(
366 LOGTAG,
367 "Reading DataSourceSurface from %s image with %s (%s) surface failed",
368 Stringify(format).c_str(), Stringify(surf->GetType()).c_str(),
369 Stringify(surf->GetFormat()).c_str());
370 return;
371 }
372
373 int rv;
374 switch (surf->GetFormat()) {
375 case SurfaceFormat::B8G8R8A8:
376 case SurfaceFormat::B8G8R8X8:
377 rv = libyuv::ARGBToI420(static_cast<uint8*>(map.GetData()),
378 map.GetStride(), buffer->MutableDataY(),
379 buffer->StrideY(), buffer->MutableDataU(),
380 buffer->StrideU(), buffer->MutableDataV(),
381 buffer->StrideV(), aSize.width, aSize.height);
382 break;
383 case SurfaceFormat::R5G6B5_UINT16:
384 rv = libyuv::RGB565ToI420(static_cast<uint8*>(map.GetData()),
385 map.GetStride(), buffer->MutableDataY(),
386 buffer->StrideY(), buffer->MutableDataU(),
387 buffer->StrideU(), buffer->MutableDataV(),
388 buffer->StrideV(), aSize.width, aSize.height);
389 break;
390 default:
391 CSFLogError(LOGTAG, "Unsupported RGB video format %s",
392 Stringify(surf->GetFormat()).c_str());
393 MOZ_ASSERT(PR_FALSE);
394 return;
395 }
396 if (rv != 0) {
397 CSFLogError(LOGTAG, "%s to I420 conversion failed",
398 Stringify(surf->GetFormat()).c_str());
399 return;
400 }
401 CSFLogDebug(LOGTAG, "Sending an I420 video frame converted from %s",
402 Stringify(surf->GetFormat()).c_str());
403 webrtc::VideoFrame frame(buffer, 0, 0, // not setting timestamps
404 webrtc::kVideoRotation_0);
405 VideoFrameConverted(frame);
406 }
407
408 Atomic<int32_t, Relaxed> mLength;
409 const RefPtr<AutoTaskQueue> mTaskQueue;
410 webrtc::I420BufferPool mBufferPool;
411
412 // Written and read from the queueing thread (normally MSG).
413 int32_t mLastImage; // serial number of last Image
414 TimeStamp mLastFrameSent; // The time we sent the last frame.
415 #ifdef DEBUG
416 uint32_t mThrottleCount;
417 uint32_t mThrottleRecord;
418 #endif
419
420 // mMutex guards the below variables.
421 Mutex mMutex;
422 nsTArray<RefPtr<VideoConverterListener>> mListeners;
423 };
424
425 // An async inserter for audio data, to avoid running audio codec encoders
426 // on the MSG/input audio thread. Basically just bounces all the audio
427 // data to a single audio processing/input queue. We could if we wanted to
428 // use multiple threads and a TaskQueue.
429 class AudioProxyThread {
430 public:
431 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)
432
AudioProxyThread(AudioSessionConduit * aConduit)433 explicit AudioProxyThread(AudioSessionConduit* aConduit)
434 : mConduit(aConduit),
435 mTaskQueue(new AutoTaskQueue(
436 GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER), "AudioProxy")),
437 mAudioConverter(nullptr) {
438 MOZ_ASSERT(mConduit);
439 MOZ_COUNT_CTOR(AudioProxyThread);
440 }
441
442 // This function is the identity if aInputRate is supported.
443 // Else, it returns a rate that is supported, that ensure no loss in audio
444 // quality: the sampling rate returned is always greater to the inputed
445 // sampling-rate, if they differ..
AppropriateSendingRateForInputRate(uint32_t aInputRate)446 uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
447 AudioSessionConduit* conduit =
448 static_cast<AudioSessionConduit*>(mConduit.get());
449 if (conduit->IsSamplingFreqSupported(aInputRate)) {
450 return aInputRate;
451 }
452 if (aInputRate < 16000) {
453 return 16000;
454 } else if (aInputRate < 32000) {
455 return 32000;
456 } else if (aInputRate < 44100) {
457 return 44100;
458 } else {
459 return 48000;
460 }
461 }
462
463 // From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
464 // something the conduit can work with (or send silence if the track is not
465 // enabled), and send the audio in 10ms chunks to the conduit.
InternalProcessAudioChunk(TrackRate aRate,const AudioChunk & aChunk,bool aEnabled)466 void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
467 bool aEnabled) {
468 MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());
469
470 // Convert to interleaved 16-bits integer audio, with a maximum of two
471 // channels (since the WebRTC.org code below makes the assumption that the
472 // input audio is either mono or stereo), with a sample-rate rate that is
473 // 16, 32, 44.1, or 48kHz.
474 uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
475 int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);
476
477 // We take advantage of the fact that the common case (microphone directly
478 // to PeerConnection, that is, a normal call), the samples are already
479 // 16-bits mono, so the representation in interleaved and planar is the
480 // same, and we can just use that.
481 if (aEnabled && outputChannels == 1 &&
482 aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
483 const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
484 PacketizeAndSend(samples, transmissionRate, outputChannels,
485 aChunk.mDuration);
486 return;
487 }
488
489 uint32_t sampleCount = aChunk.mDuration * outputChannels;
490 if (mInterleavedAudio.Length() < sampleCount) {
491 mInterleavedAudio.SetLength(sampleCount);
492 }
493
494 if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
495 PodZero(mInterleavedAudio.Elements(), sampleCount);
496 } else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
497 DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
498 aChunk.mVolume, outputChannels,
499 mInterleavedAudio.Elements());
500 } else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
501 DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
502 aChunk.mVolume, outputChannels,
503 mInterleavedAudio.Elements());
504 }
505 int16_t* inputAudio = mInterleavedAudio.Elements();
506 size_t inputAudioFrameCount = aChunk.mDuration;
507
508 AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
509 AudioConfig::FORMAT_S16);
510 AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
511 transmissionRate, AudioConfig::FORMAT_S16);
512 // Resample to an acceptable sample-rate for the sending side
513 if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
514 mAudioConverter->OutputConfig() != outputConfig) {
515 mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
516 }
517
518 int16_t* processedAudio = nullptr;
519 size_t framesProcessed =
520 mAudioConverter->Process(inputAudio, inputAudioFrameCount);
521
522 if (framesProcessed == 0) {
523 // In place conversion not possible, use a buffer.
524 framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
525 inputAudioFrameCount);
526 processedAudio = mOutputAudio.Data();
527 } else {
528 processedAudio = inputAudio;
529 }
530
531 PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
532 framesProcessed);
533 }
534
535 // This packetizes aAudioData in 10ms chunks and sends it.
536 // aAudioData is interleaved audio data at a rate and with a channel count
537 // that is appropriate to send with the conduit.
PacketizeAndSend(const int16_t * aAudioData,uint32_t aRate,uint32_t aChannels,uint32_t aFrameCount)538 void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
539 uint32_t aChannels, uint32_t aFrameCount) {
540 MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
541 MOZ_ASSERT(aChannels == 1 || aChannels == 2);
542 MOZ_ASSERT(aAudioData);
543
544 uint32_t audio_10ms = aRate / 100;
545
546 if (!mPacketizer || mPacketizer->PacketSize() != audio_10ms ||
547 mPacketizer->Channels() != aChannels) {
548 // It's the right thing to drop the bit of audio still in the packetizer:
549 // we don't want to send to the conduit audio that has two different
550 // rates while telling it that it has a constante rate.
551 mPacketizer =
552 MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
553 mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
554 }
555
556 mPacketizer->Input(aAudioData, aFrameCount);
557
558 while (mPacketizer->PacketsAvailable()) {
559 mPacketizer->Output(mPacket.get());
560 mConduit->SendAudioFrame(mPacket.get(), mPacketizer->PacketSize(), aRate,
561 mPacketizer->Channels(), 0);
562 }
563 }
564
QueueAudioChunk(TrackRate aRate,const AudioChunk & aChunk,bool aEnabled)565 void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
566 bool aEnabled) {
567 RefPtr<AudioProxyThread> self = this;
568 nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
569 "AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
570 self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
571 }));
572 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
573 Unused << rv;
574 }
575
576 protected:
~AudioProxyThread()577 virtual ~AudioProxyThread() {
578 // Conduits must be released on MainThread, and we might have the last
579 // reference We don't need to worry about runnables still trying to access
580 // the conduit, since the runnables hold a ref to AudioProxyThread.
581 NS_ReleaseOnMainThreadSystemGroup("AudioProxyThread::mConduit",
582 mConduit.forget());
583 MOZ_COUNT_DTOR(AudioProxyThread);
584 }
585
586 RefPtr<AudioSessionConduit> mConduit;
587 const RefPtr<AutoTaskQueue> mTaskQueue;
588 // Only accessed on mTaskQueue
589 UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
590 // A buffer to hold a single packet of audio.
591 UniquePtr<int16_t[]> mPacket;
592 nsTArray<int16_t> mInterleavedAudio;
593 AlignedShortBuffer mOutputAudio;
594 UniquePtr<AudioConverter> mAudioConverter;
595 };
596
597 static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp";
598
MediaPipeline(const std::string & aPc,DirectionType aDirection,nsCOMPtr<nsIEventTarget> aMainThread,nsCOMPtr<nsIEventTarget> aStsThread,RefPtr<MediaSessionConduit> aConduit)599 MediaPipeline::MediaPipeline(const std::string& aPc, DirectionType aDirection,
600 nsCOMPtr<nsIEventTarget> aMainThread,
601 nsCOMPtr<nsIEventTarget> aStsThread,
602 RefPtr<MediaSessionConduit> aConduit)
603 : mDirection(aDirection),
604 mLevel(0),
605 mConduit(aConduit),
606 mRtp(nullptr, RTP),
607 mRtcp(nullptr, RTCP),
608 mMainThread(aMainThread),
609 mStsThread(aStsThread),
610 mTransport(
611 new PipelineTransport(this)) // PipelineTransport() will access
612 // this->mStsThread; moved here
613 // for safety
614 ,
615 mRtpPacketsSent(0),
616 mRtcpPacketsSent(0),
617 mRtpPacketsReceived(0),
618 mRtcpPacketsReceived(0),
619 mRtpBytesSent(0),
620 mRtpBytesReceived(0),
621 mPc(aPc),
622 mRtpParser(webrtc::RtpHeaderParser::Create()),
623 mPacketDumper(new PacketDumper(mPc)) {
624 if (mDirection == DirectionType::RECEIVE) {
625 mConduit->SetReceiverTransport(mTransport);
626 } else {
627 mConduit->SetTransmitterTransport(mTransport);
628 }
629 }
630
~MediaPipeline()631 MediaPipeline::~MediaPipeline() {
632 CSFLogInfo(LOGTAG, "Destroying MediaPipeline: %s", mDescription.c_str());
633 NS_ReleaseOnMainThreadSystemGroup("MediaPipeline::mConduit",
634 mConduit.forget());
635 }
636
Shutdown_m()637 void MediaPipeline::Shutdown_m() {
638 Stop();
639 DetachMedia();
640
641 RUN_ON_THREAD(mStsThread,
642 WrapRunnable(RefPtr<MediaPipeline>(this),
643 &MediaPipeline::DetachTransport_s),
644 NS_DISPATCH_NORMAL);
645 }
646
DetachTransport_s()647 void MediaPipeline::DetachTransport_s() {
648 ASSERT_ON_THREAD(mStsThread);
649
650 CSFLogInfo(LOGTAG, "%s in %s", mDescription.c_str(), __FUNCTION__);
651
652 disconnect_all();
653 mTransport->Detach();
654 mRtp.Detach();
655 mRtcp.Detach();
656
657 // Make sure any cycles are broken
658 mPacketDumper = nullptr;
659 }
660
AttachTransport_s()661 nsresult MediaPipeline::AttachTransport_s() {
662 ASSERT_ON_THREAD(mStsThread);
663 nsresult res;
664 MOZ_ASSERT(mRtp.mTransport);
665 MOZ_ASSERT(mRtcp.mTransport);
666 res = ConnectTransport_s(mRtp);
667 if (NS_FAILED(res)) {
668 return res;
669 }
670
671 if (mRtcp.mTransport != mRtp.mTransport) {
672 res = ConnectTransport_s(mRtcp);
673 if (NS_FAILED(res)) {
674 return res;
675 }
676 }
677
678 mTransport->Attach(this);
679
680 return NS_OK;
681 }
682
UpdateTransport_m(RefPtr<TransportFlow> aRtpTransport,RefPtr<TransportFlow> aRtcpTransport,nsAutoPtr<MediaPipelineFilter> aFilter)683 void MediaPipeline::UpdateTransport_m(RefPtr<TransportFlow> aRtpTransport,
684 RefPtr<TransportFlow> aRtcpTransport,
685 nsAutoPtr<MediaPipelineFilter> aFilter) {
686 RUN_ON_THREAD(mStsThread,
687 WrapRunnable(RefPtr<MediaPipeline>(this),
688 &MediaPipeline::UpdateTransport_s, aRtpTransport,
689 aRtcpTransport, aFilter),
690 NS_DISPATCH_NORMAL);
691 }
692
UpdateTransport_s(RefPtr<TransportFlow> aRtpTransport,RefPtr<TransportFlow> aRtcpTransport,nsAutoPtr<MediaPipelineFilter> aFilter)693 void MediaPipeline::UpdateTransport_s(RefPtr<TransportFlow> aRtpTransport,
694 RefPtr<TransportFlow> aRtcpTransport,
695 nsAutoPtr<MediaPipelineFilter> aFilter) {
696 bool rtcp_mux = false;
697 if (!aRtcpTransport) {
698 aRtcpTransport = aRtpTransport;
699 rtcp_mux = true;
700 }
701
702 if ((aRtpTransport != mRtp.mTransport) ||
703 (aRtcpTransport != mRtcp.mTransport)) {
704 disconnect_all();
705 mTransport->Detach();
706 mRtp.Detach();
707 mRtcp.Detach();
708 if (aRtpTransport && aRtcpTransport) {
709 mRtp = TransportInfo(aRtpTransport, rtcp_mux ? MUX : RTP);
710 mRtcp = TransportInfo(aRtcpTransport, rtcp_mux ? MUX : RTCP);
711 AttachTransport_s();
712 }
713 }
714
715 if (mFilter && aFilter) {
716 // Use the new filter, but don't forget any remote SSRCs that we've learned
717 // by receiving traffic.
718 mFilter->Update(*aFilter);
719 } else {
720 mFilter = aFilter;
721 }
722 }
723
AddRIDExtension_m(size_t aExtensionId)724 void MediaPipeline::AddRIDExtension_m(size_t aExtensionId) {
725 RUN_ON_THREAD(mStsThread,
726 WrapRunnable(RefPtr<MediaPipeline>(this),
727 &MediaPipeline::AddRIDExtension_s, aExtensionId),
728 NS_DISPATCH_NORMAL);
729 }
730
AddRIDExtension_s(size_t aExtensionId)731 void MediaPipeline::AddRIDExtension_s(size_t aExtensionId) {
732 mRtpParser->RegisterRtpHeaderExtension(webrtc::kRtpExtensionRtpStreamId,
733 aExtensionId);
734 }
735
AddRIDFilter_m(const std::string & aRid)736 void MediaPipeline::AddRIDFilter_m(const std::string& aRid) {
737 RUN_ON_THREAD(mStsThread,
738 WrapRunnable(RefPtr<MediaPipeline>(this),
739 &MediaPipeline::AddRIDFilter_s, aRid),
740 NS_DISPATCH_NORMAL);
741 }
742
AddRIDFilter_s(const std::string & aRid)743 void MediaPipeline::AddRIDFilter_s(const std::string& aRid) {
744 mFilter = new MediaPipelineFilter;
745 mFilter->AddRemoteRtpStreamId(aRid);
746 }
747
GetContributingSourceStats(const nsString & aInboundRtpStreamId,FallibleTArray<dom::RTCRTPContributingSourceStats> & aArr) const748 void MediaPipeline::GetContributingSourceStats(
749 const nsString& aInboundRtpStreamId,
750 FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
751 // Get the expiry from now
752 DOMHighResTimeStamp expiry = RtpCSRCStats::GetExpiryFromTime(GetNow());
753 for (auto info : mCsrcStats) {
754 if (!info.second.Expired(expiry)) {
755 RTCRTPContributingSourceStats stats;
756 info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
757 aArr.AppendElement(stats, fallible);
758 }
759 }
760 }
761
StateChange(TransportFlow * aFlow,TransportLayer::State aState)762 void MediaPipeline::StateChange(TransportFlow* aFlow,
763 TransportLayer::State aState) {
764 TransportInfo* info = GetTransportInfo_s(aFlow);
765 MOZ_ASSERT(info);
766
767 if (aState == TransportLayer::TS_OPEN) {
768 CSFLogInfo(LOGTAG, "Flow is ready");
769 TransportReady_s(*info);
770 } else if (aState == TransportLayer::TS_CLOSED ||
771 aState == TransportLayer::TS_ERROR) {
772 TransportFailed_s(*info);
773 }
774 }
775
MakeRtpTypeToStringArray(const char ** aArray)776 static bool MakeRtpTypeToStringArray(const char** aArray) {
777 static const char* RTP_str = "RTP";
778 static const char* RTCP_str = "RTCP";
779 static const char* MUX_str = "RTP/RTCP mux";
780 aArray[MediaPipeline::RTP] = RTP_str;
781 aArray[MediaPipeline::RTCP] = RTCP_str;
782 aArray[MediaPipeline::MUX] = MUX_str;
783 return true;
784 }
785
ToString(MediaPipeline::RtpType type)786 static const char* ToString(MediaPipeline::RtpType type) {
787 static const char* array[(int)MediaPipeline::MAX_RTP_TYPE] = {nullptr};
788 // Dummy variable to cause init to happen only on first call
789 static bool dummy = MakeRtpTypeToStringArray(array);
790 (void)dummy;
791 return array[type];
792 }
793
TransportReady_s(TransportInfo & aInfo)794 nsresult MediaPipeline::TransportReady_s(TransportInfo& aInfo) {
795 // TODO(ekr@rtfm.com): implement some kind of notification on
796 // failure. bug 852665.
797 if (aInfo.mState != StateType::MP_CONNECTING) {
798 CSFLogError(LOGTAG, "Transport ready for flow in wrong state:%s :%s",
799 mDescription.c_str(), ToString(aInfo.mType));
800 return NS_ERROR_FAILURE;
801 }
802
803 CSFLogInfo(LOGTAG, "Transport ready for pipeline %p flow %s: %s", this,
804 mDescription.c_str(), ToString(aInfo.mType));
805
806 // TODO(bcampen@mozilla.com): Should we disconnect from the flow on failure?
807 nsresult res;
808
809 // Now instantiate the SRTP objects
810 TransportLayerDtls* dtls = static_cast<TransportLayerDtls*>(
811 aInfo.mTransport->GetLayer(TransportLayerDtls::ID()));
812 MOZ_ASSERT(dtls); // DTLS is mandatory
813
814 uint16_t cipher_suite;
815 res = dtls->GetSrtpCipher(&cipher_suite);
816 if (NS_FAILED(res)) {
817 CSFLogError(LOGTAG, "Failed to negotiate DTLS-SRTP. This is an error");
818 aInfo.mState = StateType::MP_CLOSED;
819 UpdateRtcpMuxState(aInfo);
820 return res;
821 }
822
823 // SRTP Key Exporter as per RFC 5764 S 4.2
824 unsigned char srtp_block[SRTP_TOTAL_KEY_LENGTH * 2];
825 res = dtls->ExportKeyingMaterial(kDTLSExporterLabel, false, "", srtp_block,
826 sizeof(srtp_block));
827 if (NS_FAILED(res)) {
828 CSFLogError(LOGTAG, "Failed to compute DTLS-SRTP keys. This is an error");
829 aInfo.mState = StateType::MP_CLOSED;
830 UpdateRtcpMuxState(aInfo);
831 MOZ_CRASH(); // TODO: Remove once we have enough field experience to
832 // know it doesn't happen. bug 798797. Note that the
833 // code after this never executes.
834 return res;
835 }
836
837 // Slice and dice as per RFC 5764 S 4.2
838 unsigned char client_write_key[SRTP_TOTAL_KEY_LENGTH];
839 unsigned char server_write_key[SRTP_TOTAL_KEY_LENGTH];
840 int offset = 0;
841 memcpy(client_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
842 offset += SRTP_MASTER_KEY_LENGTH;
843 memcpy(server_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
844 offset += SRTP_MASTER_KEY_LENGTH;
845 memcpy(client_write_key + SRTP_MASTER_KEY_LENGTH, srtp_block + offset,
846 SRTP_MASTER_SALT_LENGTH);
847 offset += SRTP_MASTER_SALT_LENGTH;
848 memcpy(server_write_key + SRTP_MASTER_KEY_LENGTH, srtp_block + offset,
849 SRTP_MASTER_SALT_LENGTH);
850 offset += SRTP_MASTER_SALT_LENGTH;
851 MOZ_ASSERT(offset == sizeof(srtp_block));
852
853 unsigned char* write_key;
854 unsigned char* read_key;
855
856 if (dtls->role() == TransportLayerDtls::CLIENT) {
857 write_key = client_write_key;
858 read_key = server_write_key;
859 } else {
860 write_key = server_write_key;
861 read_key = client_write_key;
862 }
863
864 MOZ_ASSERT(!aInfo.mSendSrtp && !aInfo.mRecvSrtp);
865 aInfo.mSendSrtp =
866 SrtpFlow::Create(cipher_suite, false, write_key, SRTP_TOTAL_KEY_LENGTH);
867 aInfo.mRecvSrtp =
868 SrtpFlow::Create(cipher_suite, true, read_key, SRTP_TOTAL_KEY_LENGTH);
869 if (!aInfo.mSendSrtp || !aInfo.mRecvSrtp) {
870 CSFLogError(LOGTAG, "Couldn't create SRTP flow for %s",
871 ToString(aInfo.mType));
872 aInfo.mState = StateType::MP_CLOSED;
873 UpdateRtcpMuxState(aInfo);
874 return NS_ERROR_FAILURE;
875 }
876
877 if (mDirection == DirectionType::RECEIVE) {
878 CSFLogInfo(LOGTAG, "Listening for %s packets received on %p",
879 ToString(aInfo.mType), dtls->downward());
880
881 switch (aInfo.mType) {
882 case RTP:
883 dtls->downward()->SignalPacketReceived.connect(
884 this, &MediaPipeline::RtpPacketReceived);
885 break;
886 case RTCP:
887 dtls->downward()->SignalPacketReceived.connect(
888 this, &MediaPipeline::RtcpPacketReceived);
889 break;
890 case MUX:
891 dtls->downward()->SignalPacketReceived.connect(
892 this, &MediaPipeline::PacketReceived);
893 break;
894 default:
895 MOZ_CRASH();
896 }
897 }
898
899 aInfo.mState = StateType::MP_OPEN;
900 UpdateRtcpMuxState(aInfo);
901 return NS_OK;
902 }
903
TransportFailed_s(TransportInfo & aInfo)904 nsresult MediaPipeline::TransportFailed_s(TransportInfo& aInfo) {
905 ASSERT_ON_THREAD(mStsThread);
906
907 aInfo.mState = StateType::MP_CLOSED;
908 UpdateRtcpMuxState(aInfo);
909
910 CSFLogInfo(LOGTAG, "Transport closed for flow %s", ToString(aInfo.mType));
911
912 NS_WARNING(
913 "MediaPipeline Transport failed. This is not properly cleaned up yet");
914
915 // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
916 // connection was good and now it is bad.
917 // TODO(ekr@rtfm.com): Report up so that the PC knows we
918 // have experienced an error.
919
920 return NS_OK;
921 }
922
UpdateRtcpMuxState(TransportInfo & aInfo)923 void MediaPipeline::UpdateRtcpMuxState(TransportInfo& aInfo) {
924 if (aInfo.mType == MUX) {
925 if (aInfo.mTransport == mRtcp.mTransport) {
926 mRtcp.mState = aInfo.mState;
927 if (!mRtcp.mSendSrtp) {
928 mRtcp.mSendSrtp = aInfo.mSendSrtp;
929 mRtcp.mRecvSrtp = aInfo.mRecvSrtp;
930 }
931 }
932 }
933 }
934
SendPacket(const TransportFlow * aFlow,const void * aData,int aLen)935 nsresult MediaPipeline::SendPacket(const TransportFlow* aFlow,
936 const void* aData, int aLen) {
937 ASSERT_ON_THREAD(mStsThread);
938
939 // Note that we bypass the DTLS layer here
940 TransportLayerDtls* dtls = static_cast<TransportLayerDtls*>(
941 aFlow->GetLayer(TransportLayerDtls::ID()));
942 MOZ_ASSERT(dtls);
943
944 TransportResult res = dtls->downward()->SendPacket(
945 static_cast<const unsigned char*>(aData), aLen);
946
947 if (res != aLen) {
948 // Ignore blocking indications
949 if (res == TE_WOULDBLOCK) return NS_OK;
950
951 CSFLogError(LOGTAG, "Failed write on stream %s", mDescription.c_str());
952 return NS_BASE_STREAM_CLOSED;
953 }
954
955 return NS_OK;
956 }
957
IncrementRtpPacketsSent(int32_t aBytes)958 void MediaPipeline::IncrementRtpPacketsSent(int32_t aBytes) {
959 ++mRtpPacketsSent;
960 mRtpBytesSent += aBytes;
961
962 if (!(mRtpPacketsSent % 100)) {
963 CSFLogInfo(LOGTAG,
964 "RTP sent packet count for %s Pipeline %p Flow: %p: %u (%" PRId64
965 " bytes)",
966 mDescription.c_str(), this, static_cast<void*>(mRtp.mTransport),
967 mRtpPacketsSent, mRtpBytesSent);
968 }
969 }
970
IncrementRtcpPacketsSent()971 void MediaPipeline::IncrementRtcpPacketsSent() {
972 ++mRtcpPacketsSent;
973 if (!(mRtcpPacketsSent % 100)) {
974 CSFLogInfo(LOGTAG, "RTCP sent packet count for %s Pipeline %p Flow: %p: %u",
975 mDescription.c_str(), this, static_cast<void*>(mRtp.mTransport),
976 mRtcpPacketsSent);
977 }
978 }
979
IncrementRtpPacketsReceived(int32_t aBytes)980 void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
981 ++mRtpPacketsReceived;
982 mRtpBytesReceived += aBytes;
983 if (!(mRtpPacketsReceived % 100)) {
984 CSFLogInfo(
985 LOGTAG,
986 "RTP received packet count for %s Pipeline %p Flow: %p: %u (%" PRId64
987 " bytes)",
988 mDescription.c_str(), this, static_cast<void*>(mRtp.mTransport),
989 mRtpPacketsReceived, mRtpBytesReceived);
990 }
991 }
992
IncrementRtcpPacketsReceived()993 void MediaPipeline::IncrementRtcpPacketsReceived() {
994 ++mRtcpPacketsReceived;
995 if (!(mRtcpPacketsReceived % 100)) {
996 CSFLogInfo(LOGTAG,
997 "RTCP received packet count for %s Pipeline %p Flow: %p: %u",
998 mDescription.c_str(), this, static_cast<void*>(mRtp.mTransport),
999 mRtcpPacketsReceived);
1000 }
1001 }
1002
RtpPacketReceived(TransportLayer * aLayer,const unsigned char * aData,size_t aLen)1003 void MediaPipeline::RtpPacketReceived(TransportLayer* aLayer,
1004 const unsigned char* aData, size_t aLen) {
1005 if (mDirection == DirectionType::TRANSMIT) {
1006 return;
1007 }
1008
1009 if (!mTransport->Pipeline()) {
1010 CSFLogError(LOGTAG, "Discarding incoming packet; transport disconnected");
1011 return;
1012 }
1013
1014 if (!mConduit) {
1015 CSFLogDebug(LOGTAG, "Discarding incoming packet; media disconnected");
1016 return;
1017 }
1018
1019 if (mRtp.mState != StateType::MP_OPEN) {
1020 CSFLogError(LOGTAG, "Discarding incoming packet; pipeline not open");
1021 return;
1022 }
1023
1024 if (mRtp.mTransport->state() != TransportLayer::TS_OPEN) {
1025 CSFLogError(LOGTAG, "Discarding incoming packet; transport not open");
1026 return;
1027 }
1028
1029 // This should never happen.
1030 MOZ_ASSERT(mRtp.mRecvSrtp);
1031
1032 if (!aLen) {
1033 return;
1034 }
1035
1036 // Filter out everything but RTP/RTCP
1037 if (aData[0] < 128 || aData[0] > 191) {
1038 return;
1039 }
1040
1041 webrtc::RTPHeader header;
1042 if (!mRtpParser->Parse(aData, aLen, &header, true)) {
1043 return;
1044 }
1045
1046 if (mFilter && !mFilter->Filter(header)) {
1047 return;
1048 }
1049
1050 // Make sure to only get the time once, and only if we need it by
1051 // using getTimestamp() for access
1052 DOMHighResTimeStamp now = 0.0;
1053 bool hasTime = false;
1054
1055 // Remove expired RtpCSRCStats
1056 if (!mCsrcStats.empty()) {
1057 if (!hasTime) {
1058 now = GetNow();
1059 hasTime = true;
1060 }
1061 auto expiry = RtpCSRCStats::GetExpiryFromTime(now);
1062 for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
1063 if (p->second.Expired(expiry)) {
1064 p = mCsrcStats.erase(p);
1065 continue;
1066 }
1067 p++;
1068 }
1069 }
1070
1071 // Add new RtpCSRCStats
1072 if (header.numCSRCs) {
1073 for (auto i = 0; i < header.numCSRCs; i++) {
1074 if (!hasTime) {
1075 now = GetNow();
1076 hasTime = true;
1077 }
1078 auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
1079 if (csrcInfo == mCsrcStats.end()) {
1080 mCsrcStats.insert(std::make_pair(
1081 header.arrOfCSRCs[i], RtpCSRCStats(header.arrOfCSRCs[i], now)));
1082 } else {
1083 csrcInfo->second.SetTimestamp(now);
1084 }
1085 }
1086 }
1087
1088 mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false, aData, aLen);
1089
1090 // Make a copy rather than cast away constness
1091 auto innerData = MakeUnique<unsigned char[]>(aLen);
1092 memcpy(innerData.get(), aData, aLen);
1093 int outLen = 0;
1094 nsresult res =
1095 mRtp.mRecvSrtp->UnprotectRtp(innerData.get(), aLen, aLen, &outLen);
1096 if (!NS_SUCCEEDED(res)) {
1097 char tmp[16];
1098
1099 SprintfLiteral(tmp, "%.2x %.2x %.2x %.2x", innerData[0], innerData[1],
1100 innerData[2], innerData[3]);
1101
1102 CSFLogError(LOGTAG, "Error unprotecting RTP in %s len= %zu [%s]",
1103 mDescription.c_str(), aLen, tmp);
1104 return;
1105 }
1106 CSFLogDebug(LOGTAG, "%s received RTP packet.", mDescription.c_str());
1107 IncrementRtpPacketsReceived(outLen);
1108 OnRtpPacketReceived();
1109
1110 RtpLogger::LogPacket(innerData.get(), outLen, true, true, header.headerLength,
1111 mDescription);
1112
1113 mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false,
1114 innerData.get(), outLen);
1115
1116 (void)mConduit->ReceivedRTPPacket(innerData.get(), outLen,
1117 header.ssrc); // Ignore error codes
1118 }
1119
RtcpPacketReceived(TransportLayer * aLayer,const unsigned char * aData,size_t aLen)1120 void MediaPipeline::RtcpPacketReceived(TransportLayer* aLayer,
1121 const unsigned char* aData,
1122 size_t aLen) {
1123 if (!mTransport->Pipeline()) {
1124 CSFLogDebug(LOGTAG, "Discarding incoming packet; transport disconnected");
1125 return;
1126 }
1127
1128 if (!mConduit) {
1129 CSFLogDebug(LOGTAG, "Discarding incoming packet; media disconnected");
1130 return;
1131 }
1132
1133 if (mRtcp.mState != StateType::MP_OPEN) {
1134 CSFLogDebug(LOGTAG, "Discarding incoming packet; pipeline not open");
1135 return;
1136 }
1137
1138 if (mRtcp.mTransport->state() != TransportLayer::TS_OPEN) {
1139 CSFLogError(LOGTAG, "Discarding incoming packet; transport not open");
1140 return;
1141 }
1142
1143 if (!aLen) {
1144 return;
1145 }
1146
1147 // Filter out everything but RTP/RTCP
1148 if (aData[0] < 128 || aData[0] > 191) {
1149 return;
1150 }
1151
1152 // We do not filter receiver reports, since the webrtc.org code for
1153 // senders already has logic to ignore RRs that do not apply.
1154 // TODO bug 1279153: remove SR check for reduced size RTCP
1155 if (mFilter && !mFilter->FilterSenderReport(aData, aLen)) {
1156 CSFLogWarn(LOGTAG, "Dropping incoming RTCP packet; filtered out");
1157 return;
1158 }
1159
1160 mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtcp, false, aData,
1161 aLen);
1162
1163 // Make a copy rather than cast away constness
1164 auto innerData = MakeUnique<unsigned char[]>(aLen);
1165 memcpy(innerData.get(), aData, aLen);
1166 int outLen;
1167
1168 nsresult res =
1169 mRtcp.mRecvSrtp->UnprotectRtcp(innerData.get(), aLen, aLen, &outLen);
1170
1171 if (!NS_SUCCEEDED(res)) return;
1172
1173 CSFLogDebug(LOGTAG, "%s received RTCP packet.", mDescription.c_str());
1174 IncrementRtcpPacketsReceived();
1175
1176 RtpLogger::LogPacket(innerData.get(), outLen, true, false, 0, mDescription);
1177
1178 mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtcp, false, aData, aLen);
1179
1180 MOZ_ASSERT(mRtcp.mRecvSrtp); // This should never happen
1181
1182 (void)mConduit->ReceivedRTCPPacket(innerData.get(),
1183 outLen); // Ignore error codes
1184 }
1185
IsRtp(const unsigned char * aData,size_t aLen) const1186 bool MediaPipeline::IsRtp(const unsigned char* aData, size_t aLen) const {
1187 if (aLen < 2) return false;
1188
1189 // Check if this is a RTCP packet. Logic based on the types listed in
1190 // media/webrtc/trunk/src/modules/rtp_rtcp/source/rtp_utility.cc
1191
1192 // Anything outside this range is RTP.
1193 if ((aData[1] < 192) || (aData[1] > 207)) return true;
1194
1195 if (aData[1] == 192) // FIR
1196 return false;
1197
1198 if (aData[1] == 193) // NACK, but could also be RTP. This makes us sad
1199 return true; // but it's how webrtc.org behaves.
1200
1201 if (aData[1] == 194) return true;
1202
1203 if (aData[1] == 195) // IJ.
1204 return false;
1205
1206 if ((aData[1] > 195) && (aData[1] < 200)) // the > 195 is redundant
1207 return true;
1208
1209 if ((aData[1] >= 200) && (aData[1] <= 207)) // SR, RR, SDES, BYE,
1210 return false; // APP, RTPFB, PSFB, XR
1211
1212 MOZ_ASSERT(false); // Not reached, belt and suspenders.
1213 return true;
1214 }
1215
PacketReceived(TransportLayer * aLayer,const unsigned char * aData,size_t aLen)1216 void MediaPipeline::PacketReceived(TransportLayer* aLayer,
1217 const unsigned char* aData, size_t aLen) {
1218 if (!mTransport->Pipeline()) {
1219 CSFLogDebug(LOGTAG, "Discarding incoming packet; transport disconnected");
1220 return;
1221 }
1222
1223 if (IsRtp(aData, aLen)) {
1224 RtpPacketReceived(aLayer, aData, aLen);
1225 } else {
1226 RtcpPacketReceived(aLayer, aData, aLen);
1227 }
1228 }
1229
1230 class MediaPipelineTransmit::PipelineListener : public MediaStreamVideoSink {
1231 friend class MediaPipelineTransmit;
1232
1233 public:
PipelineListener(const RefPtr<MediaSessionConduit> & aConduit)1234 explicit PipelineListener(const RefPtr<MediaSessionConduit>& aConduit)
1235 : mConduit(aConduit),
1236 mActive(false),
1237 mEnabled(false),
1238 mDirectConnect(false) {}
1239
~PipelineListener()1240 ~PipelineListener() {
1241 NS_ReleaseOnMainThreadSystemGroup("MediaPipeline::mConduit",
1242 mConduit.forget());
1243 if (mConverter) {
1244 mConverter->Shutdown();
1245 }
1246 }
1247
SetActive(bool aActive)1248 void SetActive(bool aActive) { mActive = aActive; }
SetEnabled(bool aEnabled)1249 void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }
1250
1251 // These are needed since nested classes don't have access to any particular
1252 // instance of the parent
SetAudioProxy(const RefPtr<AudioProxyThread> & aProxy)1253 void SetAudioProxy(const RefPtr<AudioProxyThread>& aProxy) {
1254 mAudioProcessing = aProxy;
1255 }
1256
SetVideoFrameConverter(const RefPtr<VideoFrameConverter> & aConverter)1257 void SetVideoFrameConverter(const RefPtr<VideoFrameConverter>& aConverter) {
1258 mConverter = aConverter;
1259 }
1260
OnVideoFrameConverted(const webrtc::VideoFrame & aVideoFrame)1261 void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) {
1262 MOZ_RELEASE_ASSERT(mConduit->type() == MediaSessionConduit::VIDEO);
1263 static_cast<VideoSessionConduit*>(mConduit.get())
1264 ->SendVideoFrame(aVideoFrame);
1265 }
1266
1267 // Implement MediaStreamTrackListener
1268 void NotifyQueuedChanges(MediaStreamGraph* aGraph, StreamTime aTrackOffset,
1269 const MediaSegment& aQueuedMedia) override;
1270
1271 // Implement DirectMediaStreamTrackListener
1272 void NotifyRealtimeTrackData(MediaStreamGraph* aGraph,
1273 StreamTime aTrackOffset,
1274 const MediaSegment& aMedia) override;
1275 void NotifyDirectListenerInstalled(InstallationResult aResult) override;
1276 void NotifyDirectListenerUninstalled() override;
1277
1278 // Implement MediaStreamVideoSink
1279 void SetCurrentFrames(const VideoSegment& aSegment) override;
ClearFrames()1280 void ClearFrames() override {}
1281
1282 private:
1283 void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);
1284
1285 RefPtr<MediaSessionConduit> mConduit;
1286 RefPtr<AudioProxyThread> mAudioProcessing;
1287 RefPtr<VideoFrameConverter> mConverter;
1288
1289 // active is true if there is a transport to send on
1290 mozilla::Atomic<bool> mActive;
1291 // enabled is true if the media access control permits sending
1292 // actual content; when false you get black/silence
1293 mozilla::Atomic<bool> mEnabled;
1294
1295 // Written and read on the MediaStreamGraph thread
1296 bool mDirectConnect;
1297 };
1298
1299 // Implements VideoConverterListener for MediaPipeline.
1300 //
1301 // We pass converted frames on to MediaPipelineTransmit::PipelineListener
1302 // where they are further forwarded to VideoConduit.
1303 // MediaPipelineTransmit calls Detach() during shutdown to ensure there is
1304 // no cyclic dependencies between us and PipelineListener.
1305 class MediaPipelineTransmit::VideoFrameFeeder : public VideoConverterListener {
1306 public:
VideoFrameFeeder(const RefPtr<PipelineListener> & aListener)1307 explicit VideoFrameFeeder(const RefPtr<PipelineListener>& aListener)
1308 : mMutex("VideoFrameFeeder"), mListener(aListener) {
1309 MOZ_COUNT_CTOR(VideoFrameFeeder);
1310 }
1311
Detach()1312 void Detach() {
1313 MutexAutoLock lock(mMutex);
1314
1315 mListener = nullptr;
1316 }
1317
OnVideoFrameConverted(const webrtc::VideoFrame & aVideoFrame)1318 void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) override {
1319 MutexAutoLock lock(mMutex);
1320
1321 if (!mListener) {
1322 return;
1323 }
1324
1325 mListener->OnVideoFrameConverted(aVideoFrame);
1326 }
1327
1328 protected:
~VideoFrameFeeder()1329 virtual ~VideoFrameFeeder() { MOZ_COUNT_DTOR(VideoFrameFeeder); }
1330
1331 Mutex mMutex; // Protects the member below.
1332 RefPtr<PipelineListener> mListener;
1333 };
1334
MediaPipelineTransmit(const std::string & aPc,nsCOMPtr<nsIEventTarget> aMainThread,nsCOMPtr<nsIEventTarget> aStsThread,bool aIsVideo,dom::MediaStreamTrack * aDomTrack,RefPtr<MediaSessionConduit> aConduit)1335 MediaPipelineTransmit::MediaPipelineTransmit(
1336 const std::string& aPc, nsCOMPtr<nsIEventTarget> aMainThread,
1337 nsCOMPtr<nsIEventTarget> aStsThread, bool aIsVideo,
1338 dom::MediaStreamTrack* aDomTrack, RefPtr<MediaSessionConduit> aConduit)
1339 : MediaPipeline(aPc, DirectionType::TRANSMIT, aMainThread, aStsThread,
1340 aConduit),
1341 mIsVideo(aIsVideo),
1342 mListener(new PipelineListener(aConduit)),
1343 mFeeder(aIsVideo ? MakeAndAddRef<VideoFrameFeeder>(mListener)
1344 : nullptr) // For video we send frames to an
1345 // async VideoFrameConverter that
1346 // calls back to a VideoFrameFeeder
1347 // that feeds I420 frames to
1348 // VideoConduit.
1349 ,
1350 mDomTrack(aDomTrack),
1351 mTransmitting(false) {
1352 SetDescription();
1353 if (!IsVideo()) {
1354 mAudioProcessing = MakeAndAddRef<AudioProxyThread>(
1355 static_cast<AudioSessionConduit*>(aConduit.get()));
1356 mListener->SetAudioProxy(mAudioProcessing);
1357 } else { // Video
1358 mConverter = MakeAndAddRef<VideoFrameConverter>();
1359 mConverter->AddListener(mFeeder);
1360 mListener->SetVideoFrameConverter(mConverter);
1361 }
1362 }
1363
~MediaPipelineTransmit()1364 MediaPipelineTransmit::~MediaPipelineTransmit() {
1365 if (mFeeder) {
1366 mFeeder->Detach();
1367 }
1368
1369 MOZ_ASSERT(!mDomTrack);
1370 }
1371
SetDescription_s(const std::string & description)1372 void MediaPipeline::SetDescription_s(const std::string& description) {
1373 mDescription = description;
1374 }
1375
SetDescription()1376 void MediaPipelineTransmit::SetDescription() {
1377 std::string description;
1378 description = mPc + "| ";
1379 description += mConduit->type() == MediaSessionConduit::AUDIO
1380 ? "Transmit audio["
1381 : "Transmit video[";
1382
1383 if (!mDomTrack) {
1384 description += "no track]";
1385 return;
1386 }
1387
1388 nsString nsTrackId;
1389 mDomTrack->GetId(nsTrackId);
1390 std::string trackId(NS_ConvertUTF16toUTF8(nsTrackId).get());
1391 description += trackId;
1392 description += "]";
1393
1394 RUN_ON_THREAD(
1395 mStsThread,
1396 WrapRunnable(RefPtr<MediaPipeline>(this),
1397 &MediaPipelineTransmit::SetDescription_s, description),
1398 NS_DISPATCH_NORMAL);
1399 }
1400
Stop()1401 void MediaPipelineTransmit::Stop() {
1402 ASSERT_ON_THREAD(mMainThread);
1403
1404 if (!mDomTrack || !mTransmitting) {
1405 return;
1406 }
1407
1408 mTransmitting = false;
1409
1410 if (mDomTrack->AsAudioStreamTrack()) {
1411 mDomTrack->RemoveDirectListener(mListener);
1412 mDomTrack->RemoveListener(mListener);
1413 } else if (VideoStreamTrack* video = mDomTrack->AsVideoStreamTrack()) {
1414 video->RemoveVideoOutput(mListener);
1415 } else {
1416 MOZ_ASSERT(false, "Unknown track type");
1417 }
1418
1419 mConduit->StopTransmitting();
1420 }
1421
Start()1422 void MediaPipelineTransmit::Start() {
1423 ASSERT_ON_THREAD(mMainThread);
1424
1425 if (!mDomTrack || mTransmitting) {
1426 return;
1427 }
1428
1429 mTransmitting = true;
1430
1431 mConduit->StartTransmitting();
1432
1433 // TODO(ekr@rtfm.com): Check for errors
1434 CSFLogDebug(
1435 LOGTAG, "Attaching pipeline to track %p conduit type=%s", this,
1436 (mConduit->type() == MediaSessionConduit::AUDIO ? "audio" : "video"));
1437
1438 #if !defined(MOZILLA_EXTERNAL_LINKAGE)
1439 // With full duplex we don't risk having audio come in late to the MSG
1440 // so we won't need a direct listener.
1441 const bool enableDirectListener =
1442 !Preferences::GetBool("media.navigator.audio.full_duplex", false);
1443 #else
1444 const bool enableDirectListener = true;
1445 #endif
1446
1447 if (mDomTrack->AsAudioStreamTrack()) {
1448 if (enableDirectListener) {
1449 // Register the Listener directly with the source if we can.
1450 // We also register it as a non-direct listener so we fall back to that
1451 // if installing the direct listener fails. As a direct listener we get
1452 // access to direct unqueued (and not resampled) data.
1453 mDomTrack->AddDirectListener(mListener);
1454 }
1455 mDomTrack->AddListener(mListener);
1456 } else if (VideoStreamTrack* video = mDomTrack->AsVideoStreamTrack()) {
1457 video->AddVideoOutput(mListener);
1458 } else {
1459 MOZ_ASSERT(false, "Unknown track type");
1460 }
1461 }
1462
IsVideo() const1463 bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; }
1464
UpdateSinkIdentity_m(const MediaStreamTrack * aTrack,nsIPrincipal * aPrincipal,const PeerIdentity * aSinkIdentity)1465 void MediaPipelineTransmit::UpdateSinkIdentity_m(
1466 const MediaStreamTrack* aTrack, nsIPrincipal* aPrincipal,
1467 const PeerIdentity* aSinkIdentity) {
1468 ASSERT_ON_THREAD(mMainThread);
1469
1470 if (aTrack != nullptr && aTrack != mDomTrack) {
1471 // If a track is specified, then it might not be for this pipeline,
1472 // since we receive notifications for all tracks on the PC.
1473 // nullptr means that the PeerIdentity has changed and shall be applied
1474 // to all tracks of the PC.
1475 return;
1476 }
1477
1478 bool enableTrack = aPrincipal->Subsumes(mDomTrack->GetPrincipal());
1479 if (!enableTrack) {
1480 // first try didn't work, but there's a chance that this is still available
1481 // if our track is bound to a peerIdentity, and the peer connection (our
1482 // sink) is bound to the same identity, then we can enable the track.
1483 const PeerIdentity* trackIdentity = mDomTrack->GetPeerIdentity();
1484 if (aSinkIdentity && trackIdentity) {
1485 enableTrack = (*aSinkIdentity == *trackIdentity);
1486 }
1487 }
1488
1489 mListener->SetEnabled(enableTrack);
1490 }
1491
DetachMedia()1492 void MediaPipelineTransmit::DetachMedia() {
1493 ASSERT_ON_THREAD(mMainThread);
1494 mDomTrack = nullptr;
1495 // Let the listener be destroyed with the pipeline (or later).
1496 }
1497
TransportReady_s(TransportInfo & aInfo)1498 nsresult MediaPipelineTransmit::TransportReady_s(TransportInfo& aInfo) {
1499 ASSERT_ON_THREAD(mStsThread);
1500 // Call base ready function.
1501 MediaPipeline::TransportReady_s(aInfo);
1502
1503 // Should not be set for a transmitter
1504 if (&aInfo == &mRtp) {
1505 mListener->SetActive(true);
1506 }
1507
1508 return NS_OK;
1509 }
1510
ReplaceTrack(RefPtr<MediaStreamTrack> & aDomTrack)1511 nsresult MediaPipelineTransmit::ReplaceTrack(
1512 RefPtr<MediaStreamTrack>& aDomTrack) {
1513 // MainThread, checked in calls we make
1514 if (aDomTrack) {
1515 nsString nsTrackId;
1516 aDomTrack->GetId(nsTrackId);
1517 std::string track_id(NS_ConvertUTF16toUTF8(nsTrackId).get());
1518 CSFLogDebug(
1519 LOGTAG, "Reattaching pipeline to track %p track %s conduit type: %s",
1520 &aDomTrack, track_id.c_str(),
1521 (mConduit->type() == MediaSessionConduit::AUDIO ? "audio" : "video"));
1522 }
1523
1524 RefPtr<dom::MediaStreamTrack> oldTrack = mDomTrack;
1525 bool wasTransmitting = oldTrack && mTransmitting;
1526 Stop();
1527 mDomTrack = aDomTrack;
1528 SetDescription();
1529
1530 if (wasTransmitting) {
1531 Start();
1532 }
1533 return NS_OK;
1534 }
1535
ConnectTransport_s(TransportInfo & aInfo)1536 nsresult MediaPipeline::ConnectTransport_s(TransportInfo& aInfo) {
1537 MOZ_ASSERT(aInfo.mTransport);
1538 ASSERT_ON_THREAD(mStsThread);
1539
1540 // Look to see if the transport is ready
1541 if (aInfo.mTransport->state() == TransportLayer::TS_OPEN) {
1542 nsresult res = TransportReady_s(aInfo);
1543 if (NS_FAILED(res)) {
1544 CSFLogError(LOGTAG, "Error calling TransportReady(); res=%u in %s",
1545 static_cast<uint32_t>(res), __FUNCTION__);
1546 return res;
1547 }
1548 } else if (aInfo.mTransport->state() == TransportLayer::TS_ERROR) {
1549 CSFLogError(LOGTAG, "%s transport is already in error state",
1550 ToString(aInfo.mType));
1551 TransportFailed_s(aInfo);
1552 return NS_ERROR_FAILURE;
1553 }
1554
1555 aInfo.mTransport->SignalStateChange.connect(this,
1556 &MediaPipeline::StateChange);
1557
1558 return NS_OK;
1559 }
1560
GetTransportInfo_s(TransportFlow * aFlow)1561 MediaPipeline::TransportInfo* MediaPipeline::GetTransportInfo_s(
1562 TransportFlow* aFlow) {
1563 ASSERT_ON_THREAD(mStsThread);
1564 if (aFlow == mRtp.mTransport) {
1565 return &mRtp;
1566 }
1567
1568 if (aFlow == mRtcp.mTransport) {
1569 return &mRtcp;
1570 }
1571
1572 return nullptr;
1573 }
1574
SendRtpPacket(const uint8_t * aData,size_t aLen)1575 nsresult MediaPipeline::PipelineTransport::SendRtpPacket(const uint8_t* aData,
1576 size_t aLen) {
1577 nsAutoPtr<DataBuffer> buf(
1578 new DataBuffer(aData, aLen, aLen + SRTP_MAX_EXPANSION));
1579
1580 RUN_ON_THREAD(
1581 mStsThread,
1582 WrapRunnable(RefPtr<MediaPipeline::PipelineTransport>(this),
1583 &MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s, buf,
1584 true),
1585 NS_DISPATCH_NORMAL);
1586
1587 return NS_OK;
1588 }
1589
SendRtpRtcpPacket_s(nsAutoPtr<DataBuffer> aData,bool aIsRtp)1590 nsresult MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s(
1591 nsAutoPtr<DataBuffer> aData, bool aIsRtp) {
1592 ASSERT_ON_THREAD(mStsThread);
1593 if (!mPipeline) {
1594 return NS_OK; // Detached
1595 }
1596 TransportInfo& transport = aIsRtp ? mPipeline->mRtp : mPipeline->mRtcp;
1597
1598 if (!transport.mSendSrtp) {
1599 CSFLogDebug(LOGTAG, "Couldn't write RTP/RTCP packet; SRTP not set up yet");
1600 return NS_OK;
1601 }
1602
1603 MOZ_ASSERT(transport.mTransport);
1604 NS_ENSURE_TRUE(transport.mTransport, NS_ERROR_NULL_POINTER);
1605
1606 // libsrtp enciphers in place, so we need a big enough buffer.
1607 MOZ_ASSERT(aData->capacity() >= aData->len() + SRTP_MAX_EXPANSION);
1608
1609 if (RtpLogger::IsPacketLoggingOn()) {
1610 int headerLen = 12;
1611 webrtc::RTPHeader header;
1612 if (mPipeline->mRtpParser &&
1613 mPipeline->mRtpParser->Parse(aData->data(), aData->len(), &header)) {
1614 headerLen = header.headerLength;
1615 }
1616 RtpLogger::LogPacket(aData->data(), aData->len(), false, aIsRtp, headerLen,
1617 mPipeline->mDescription);
1618 }
1619
1620 int out_len;
1621 nsresult res;
1622 if (aIsRtp) {
1623 mPipeline->mPacketDumper->Dump(mPipeline->Level(),
1624 dom::mozPacketDumpType::Rtp, true,
1625 aData->data(), aData->len());
1626
1627 res = transport.mSendSrtp->ProtectRtp(aData->data(), aData->len(),
1628 aData->capacity(), &out_len);
1629 } else {
1630 mPipeline->mPacketDumper->Dump(mPipeline->Level(),
1631 dom::mozPacketDumpType::Rtcp, true,
1632 aData->data(), aData->len());
1633
1634 res = transport.mSendSrtp->ProtectRtcp(aData->data(), aData->len(),
1635 aData->capacity(), &out_len);
1636 }
1637 if (!NS_SUCCEEDED(res)) {
1638 return res;
1639 }
1640
1641 // paranoia; don't have uninitialized bytes included in data->len()
1642 aData->SetLength(out_len);
1643
1644 CSFLogDebug(LOGTAG, "%s sending %s packet", mPipeline->mDescription.c_str(),
1645 (aIsRtp ? "RTP" : "RTCP"));
1646 if (aIsRtp) {
1647 mPipeline->mPacketDumper->Dump(mPipeline->Level(),
1648 dom::mozPacketDumpType::Srtp, true,
1649 aData->data(), out_len);
1650
1651 mPipeline->IncrementRtpPacketsSent(out_len);
1652 } else {
1653 mPipeline->mPacketDumper->Dump(mPipeline->Level(),
1654 dom::mozPacketDumpType::Srtcp, true,
1655 aData->data(), out_len);
1656
1657 mPipeline->IncrementRtcpPacketsSent();
1658 }
1659 return mPipeline->SendPacket(transport.mTransport, aData->data(), out_len);
1660 }
1661
SendRtcpPacket(const uint8_t * aData,size_t aLen)1662 nsresult MediaPipeline::PipelineTransport::SendRtcpPacket(const uint8_t* aData,
1663 size_t aLen) {
1664 nsAutoPtr<DataBuffer> buf(
1665 new DataBuffer(aData, aLen, aLen + SRTP_MAX_EXPANSION));
1666
1667 RUN_ON_THREAD(
1668 mStsThread,
1669 WrapRunnable(RefPtr<MediaPipeline::PipelineTransport>(this),
1670 &MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s, buf,
1671 false),
1672 NS_DISPATCH_NORMAL);
1673
1674 return NS_OK;
1675 }
1676
1677 // Called if we're attached with AddDirectListener()
NotifyRealtimeTrackData(MediaStreamGraph * aGraph,StreamTime aOffset,const MediaSegment & aMedia)1678 void MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData(
1679 MediaStreamGraph* aGraph, StreamTime aOffset, const MediaSegment& aMedia) {
1680 CSFLogDebug(
1681 LOGTAG,
1682 "MediaPipeline::NotifyRealtimeTrackData() listener=%p, offset=%" PRId64
1683 ", duration=%" PRId64,
1684 this, aOffset, aMedia.GetDuration());
1685
1686 if (aMedia.GetType() == MediaSegment::VIDEO) {
1687 // We have to call the upstream NotifyRealtimeTrackData and
1688 // MediaStreamVideoSink will route them to SetCurrentFrames.
1689 MediaStreamVideoSink::NotifyRealtimeTrackData(aGraph, aOffset, aMedia);
1690 return;
1691 }
1692
1693 NewData(aMedia, aGraph->GraphRate());
1694 }
1695
NotifyQueuedChanges(MediaStreamGraph * aGraph,StreamTime aOffset,const MediaSegment & aQueuedMedia)1696 void MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges(
1697 MediaStreamGraph* aGraph, StreamTime aOffset,
1698 const MediaSegment& aQueuedMedia) {
1699 CSFLogDebug(LOGTAG, "MediaPipeline::NotifyQueuedChanges()");
1700
1701 if (aQueuedMedia.GetType() == MediaSegment::VIDEO) {
1702 // We always get video from SetCurrentFrames().
1703 return;
1704 }
1705
1706 if (mDirectConnect) {
1707 // ignore non-direct data if we're also getting direct data
1708 return;
1709 }
1710
1711 size_t rate;
1712 if (aGraph) {
1713 rate = aGraph->GraphRate();
1714 } else {
1715 // When running tests, graph may be null. In that case use a default.
1716 rate = 16000;
1717 }
1718 NewData(aQueuedMedia, rate);
1719 }
1720
NotifyDirectListenerInstalled(InstallationResult aResult)1721 void MediaPipelineTransmit::PipelineListener::NotifyDirectListenerInstalled(
1722 InstallationResult aResult) {
1723 CSFLogInfo(
1724 LOGTAG,
1725 "MediaPipeline::NotifyDirectListenerInstalled() listener=%p, result=%d",
1726 this, static_cast<int32_t>(aResult));
1727
1728 mDirectConnect = InstallationResult::SUCCESS == aResult;
1729 }
1730
1731 void MediaPipelineTransmit::PipelineListener::
NotifyDirectListenerUninstalled()1732 NotifyDirectListenerUninstalled() {
1733 CSFLogInfo(LOGTAG,
1734 "MediaPipeline::NotifyDirectListenerUninstalled() listener=%p",
1735 this);
1736
1737 mDirectConnect = false;
1738 }
1739
NewData(const MediaSegment & aMedia,TrackRate aRate)1740 void MediaPipelineTransmit::PipelineListener::NewData(
1741 const MediaSegment& aMedia, TrackRate aRate /* = 0 */) {
1742 if (!mActive) {
1743 CSFLogDebug(LOGTAG, "Discarding packets because transport not ready");
1744 return;
1745 }
1746
1747 if (mConduit->type() != (aMedia.GetType() == MediaSegment::AUDIO
1748 ? MediaSessionConduit::AUDIO
1749 : MediaSessionConduit::VIDEO)) {
1750 MOZ_ASSERT(false,
1751 "The media type should always be correct since the "
1752 "listener is locked to a specific track");
1753 return;
1754 }
1755
1756 // TODO(ekr@rtfm.com): For now assume that we have only one
1757 // track type and it's destined for us
1758 // See bug 784517
1759 if (aMedia.GetType() == MediaSegment::AUDIO) {
1760 MOZ_RELEASE_ASSERT(aRate > 0);
1761
1762 const AudioSegment* audio = static_cast<const AudioSegment*>(&aMedia);
1763 for (AudioSegment::ConstChunkIterator iter(*audio); !iter.IsEnded();
1764 iter.Next()) {
1765 mAudioProcessing->QueueAudioChunk(aRate, *iter, mEnabled);
1766 }
1767 } else {
1768 const VideoSegment* video = static_cast<const VideoSegment*>(&aMedia);
1769 for (VideoSegment::ConstChunkIterator iter(*video); !iter.IsEnded();
1770 iter.Next()) {
1771 mConverter->QueueVideoChunk(*iter, !mEnabled);
1772 }
1773 }
1774 }
1775
SetCurrentFrames(const VideoSegment & aSegment)1776 void MediaPipelineTransmit::PipelineListener::SetCurrentFrames(
1777 const VideoSegment& aSegment) {
1778 NewData(aSegment);
1779 }
1780
1781 class GenericReceiveListener : public MediaStreamListener {
1782 public:
GenericReceiveListener(dom::MediaStreamTrack * aTrack)1783 explicit GenericReceiveListener(dom::MediaStreamTrack* aTrack)
1784 : mTrack(aTrack),
1785 mTrackId(aTrack->GetInputTrackId()),
1786 mSource(mTrack->GetInputStream()->AsSourceStream()),
1787 mPlayedTicks(0),
1788 mPrincipalHandle(PRINCIPAL_HANDLE_NONE),
1789 mListening(false),
1790 mMaybeTrackNeedsUnmute(true) {
1791 MOZ_RELEASE_ASSERT(mSource, "Must be used with a SourceMediaStream");
1792 }
1793
~GenericReceiveListener()1794 virtual ~GenericReceiveListener() {
1795 NS_ReleaseOnMainThreadSystemGroup("GenericReceiveListener::track_",
1796 mTrack.forget());
1797 }
1798
AddTrackToSource(uint32_t aRate=0)1799 void AddTrackToSource(uint32_t aRate = 0) {
1800 MOZ_ASSERT((aRate != 0 && mTrack->AsAudioStreamTrack()) ||
1801 mTrack->AsVideoStreamTrack());
1802
1803 if (mTrack->AsAudioStreamTrack()) {
1804 mSource->AddAudioTrack(mTrackId, aRate, 0, new AudioSegment());
1805 } else if (mTrack->AsVideoStreamTrack()) {
1806 mSource->AddTrack(mTrackId, 0, new VideoSegment());
1807 }
1808 CSFLogDebug(LOGTAG,
1809 "GenericReceiveListener added %s track %d (%p) to stream %p",
1810 mTrack->AsAudioStreamTrack() ? "audio" : "video", mTrackId,
1811 mTrack.get(), mSource.get());
1812
1813 mSource->AdvanceKnownTracksTime(STREAM_TIME_MAX);
1814 mSource->AddListener(this);
1815 }
1816
AddSelf()1817 void AddSelf() {
1818 if (!mListening) {
1819 mListening = true;
1820 mSource->SetPullEnabled(true);
1821 mMaybeTrackNeedsUnmute = true;
1822 }
1823 }
1824
RemoveSelf()1825 void RemoveSelf() {
1826 if (mListening) {
1827 mListening = false;
1828 mSource->SetPullEnabled(false);
1829 }
1830 }
1831
OnRtpReceived()1832 void OnRtpReceived() {
1833 if (mMaybeTrackNeedsUnmute) {
1834 mMaybeTrackNeedsUnmute = false;
1835 NS_DispatchToMainThread(
1836 NewRunnableMethod("GenericReceiveListener::OnRtpReceived_m", this,
1837 &GenericReceiveListener::OnRtpReceived_m));
1838 }
1839 }
1840
OnRtpReceived_m()1841 void OnRtpReceived_m() {
1842 if (mListening && mTrack->Muted()) {
1843 mTrack->MutedChanged(false);
1844 }
1845 }
1846
EndTrack()1847 void EndTrack() {
1848 CSFLogDebug(LOGTAG, "GenericReceiveListener ending track");
1849
1850 // This breaks the cycle with the SourceMediaStream
1851 mSource->RemoveListener(this);
1852 mSource->EndTrack(mTrackId);
1853 }
1854
1855 // Must be called on the main thread
SetPrincipalHandle_m(const PrincipalHandle & aPrincipalHandle)1856 void SetPrincipalHandle_m(const PrincipalHandle& aPrincipalHandle) {
1857 class Message : public ControlMessage {
1858 public:
1859 Message(GenericReceiveListener* aListener,
1860 const PrincipalHandle& aPrincipalHandle)
1861 : ControlMessage(nullptr),
1862 mListener(aListener),
1863 mPrincipalHandle(aPrincipalHandle) {}
1864
1865 void Run() override {
1866 mListener->SetPrincipalHandle_msg(mPrincipalHandle);
1867 }
1868
1869 const RefPtr<GenericReceiveListener> mListener;
1870 PrincipalHandle mPrincipalHandle;
1871 };
1872
1873 mTrack->GraphImpl()->AppendMessage(
1874 MakeUnique<Message>(this, aPrincipalHandle));
1875 }
1876
1877 // Must be called on the MediaStreamGraph thread
SetPrincipalHandle_msg(const PrincipalHandle & aPrincipalHandle)1878 void SetPrincipalHandle_msg(const PrincipalHandle& aPrincipalHandle) {
1879 mPrincipalHandle = aPrincipalHandle;
1880 }
1881
1882 protected:
1883 RefPtr<dom::MediaStreamTrack> mTrack;
1884 const TrackID mTrackId;
1885 const RefPtr<SourceMediaStream> mSource;
1886 TrackTicks mPlayedTicks;
1887 PrincipalHandle mPrincipalHandle;
1888 bool mListening;
1889 Atomic<bool> mMaybeTrackNeedsUnmute;
1890 };
1891
MediaPipelineReceive(const std::string & aPc,nsCOMPtr<nsIEventTarget> aMainThread,nsCOMPtr<nsIEventTarget> aStsThread,RefPtr<MediaSessionConduit> aConduit)1892 MediaPipelineReceive::MediaPipelineReceive(const std::string& aPc,
1893 nsCOMPtr<nsIEventTarget> aMainThread,
1894 nsCOMPtr<nsIEventTarget> aStsThread,
1895 RefPtr<MediaSessionConduit> aConduit)
1896 : MediaPipeline(aPc, DirectionType::RECEIVE, aMainThread, aStsThread,
1897 aConduit) {}
1898
~MediaPipelineReceive()1899 MediaPipelineReceive::~MediaPipelineReceive() {}
1900
1901 class MediaPipelineReceiveAudio::PipelineListener
1902 : public GenericReceiveListener {
1903 public:
PipelineListener(dom::MediaStreamTrack * aTrack,const RefPtr<MediaSessionConduit> & aConduit)1904 PipelineListener(dom::MediaStreamTrack* aTrack,
1905 const RefPtr<MediaSessionConduit>& aConduit)
1906 : GenericReceiveListener(aTrack),
1907 mConduit(aConduit)
1908 // AudioSession conduit only supports 16, 32, 44.1 and 48kHz
1909 // This is an artificial limitation, it would however require more
1910 // changes to support any rates. If the sampling rate is not-supported,
1911 // we will use 48kHz instead.
1912 ,
1913 mRate(static_cast<AudioSessionConduit*>(mConduit.get())
1914 ->IsSamplingFreqSupported(mSource->GraphRate())
1915 ? mSource->GraphRate()
1916 : WEBRTC_MAX_SAMPLE_RATE),
1917 mTaskQueue(new AutoTaskQueue(
1918 GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER),
1919 "AudioPipelineListener")),
1920 mLastLog(0) {
1921 AddTrackToSource(mRate);
1922 }
1923
1924 // Implement MediaStreamListener
NotifyPull(MediaStreamGraph * aGraph,StreamTime aDesiredTime)1925 void NotifyPull(MediaStreamGraph* aGraph, StreamTime aDesiredTime) override {
1926 NotifyPullImpl(aDesiredTime);
1927 }
1928
AsyncNotifyPull(MediaStreamGraph * aGraph,StreamTime aDesiredTime)1929 RefPtr<SourceMediaStream::NotifyPullPromise> AsyncNotifyPull(
1930 MediaStreamGraph* aGraph, StreamTime aDesiredTime) override {
1931 RefPtr<PipelineListener> self = this;
1932 return InvokeAsync(mTaskQueue, __func__, [self, aDesiredTime]() {
1933 self->NotifyPullImpl(aDesiredTime);
1934 return SourceMediaStream::NotifyPullPromise::CreateAndResolve(true,
1935 __func__);
1936 });
1937 }
1938
1939 private:
~PipelineListener()1940 ~PipelineListener() {
1941 NS_ReleaseOnMainThreadSystemGroup("MediaPipeline::mConduit",
1942 mConduit.forget());
1943 }
1944
NotifyPullImpl(StreamTime aDesiredTime)1945 void NotifyPullImpl(StreamTime aDesiredTime) {
1946 uint32_t samplesPer10ms = mRate / 100;
1947
1948 // mSource's rate is not necessarily the same as the graph rate, since there
1949 // are sample-rate constraints on the inbound audio: only 16, 32, 44.1 and
1950 // 48kHz are supported. The audio frames we get here is going to be
1951 // resampled when inserted into the graph.
1952 TrackTicks desired = mSource->TimeToTicksRoundUp(mRate, aDesiredTime);
1953 TrackTicks framesNeeded = desired - mPlayedTicks;
1954
1955 while (framesNeeded >= 0) {
1956 const int scratchBufferLength =
1957 AUDIO_SAMPLE_BUFFER_MAX_BYTES / sizeof(int16_t);
1958 int16_t scratchBuffer[scratchBufferLength];
1959
1960 int samplesLength = scratchBufferLength;
1961
1962 // This fetches 10ms of data, either mono or stereo
1963 MediaConduitErrorCode err =
1964 static_cast<AudioSessionConduit*>(mConduit.get())
1965 ->GetAudioFrame(
1966 scratchBuffer, mRate,
1967 0, // TODO(ekr@rtfm.com): better estimate of "capture"
1968 // (really playout) delay
1969 samplesLength);
1970
1971 if (err != kMediaConduitNoError) {
1972 // Insert silence on conduit/GIPS failure (extremely unlikely)
1973 CSFLogError(LOGTAG,
1974 "Audio conduit failed (%d) to return data @ %" PRId64
1975 " (desired %" PRId64 " -> %f)",
1976 err, mPlayedTicks, aDesiredTime,
1977 mSource->StreamTimeToSeconds(aDesiredTime));
1978 // if this is not enough we'll loop and provide more
1979 samplesLength = samplesPer10ms;
1980 PodArrayZero(scratchBuffer);
1981 }
1982
1983 MOZ_RELEASE_ASSERT(samplesLength <= scratchBufferLength);
1984
1985 CSFLogDebug(LOGTAG, "Audio conduit returned buffer of length %u",
1986 samplesLength);
1987
1988 RefPtr<SharedBuffer> samples =
1989 SharedBuffer::Create(samplesLength * sizeof(uint16_t));
1990 int16_t* samplesData = static_cast<int16_t*>(samples->Data());
1991 AudioSegment segment;
1992 // We derive the number of channels of the stream from the number of
1993 // samples the AudioConduit gives us, considering it gives us packets of
1994 // 10ms and we know the rate.
1995 uint32_t channelCount = samplesLength / samplesPer10ms;
1996 AutoTArray<int16_t*, 2> channels;
1997 AutoTArray<const int16_t*, 2> outputChannels;
1998 size_t frames = samplesLength / channelCount;
1999
2000 channels.SetLength(channelCount);
2001
2002 size_t offset = 0;
2003 for (size_t i = 0; i < channelCount; i++) {
2004 channels[i] = samplesData + offset;
2005 offset += frames;
2006 }
2007
2008 DeinterleaveAndConvertBuffer(scratchBuffer, frames, channelCount,
2009 channels.Elements());
2010
2011 outputChannels.AppendElements(channels);
2012
2013 segment.AppendFrames(samples.forget(), outputChannels, frames,
2014 mPrincipalHandle);
2015
2016 // Handle track not actually added yet or removed/finished
2017 if (mSource->AppendToTrack(mTrackId, &segment)) {
2018 framesNeeded -= frames;
2019 mPlayedTicks += frames;
2020 if (MOZ_LOG_TEST(AudioLogModule(), LogLevel::Debug)) {
2021 if (mPlayedTicks > mLastLog + mRate) {
2022 MOZ_LOG(AudioLogModule(), LogLevel::Debug,
2023 ("%p: Inserting samples into track %d, total = "
2024 "%" PRIu64,
2025 (void*)this, mTrackId, mPlayedTicks));
2026 mLastLog = mPlayedTicks;
2027 }
2028 }
2029 } else {
2030 CSFLogError(LOGTAG, "AppendToTrack failed");
2031 // we can't un-read the data, but that's ok since we don't want to
2032 // buffer - but don't i-loop!
2033 break;
2034 }
2035 }
2036 }
2037
2038 RefPtr<MediaSessionConduit> mConduit;
2039 const TrackRate mRate;
2040 const RefPtr<AutoTaskQueue> mTaskQueue;
2041 // Graph's current sampling rate
2042 TrackTicks mLastLog = 0; // mPlayedTicks when we last logged
2043 };
2044
MediaPipelineReceiveAudio(const std::string & aPc,nsCOMPtr<nsIEventTarget> aMainThread,nsCOMPtr<nsIEventTarget> aStsThread,RefPtr<AudioSessionConduit> aConduit,dom::MediaStreamTrack * aTrack)2045 MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
2046 const std::string& aPc, nsCOMPtr<nsIEventTarget> aMainThread,
2047 nsCOMPtr<nsIEventTarget> aStsThread, RefPtr<AudioSessionConduit> aConduit,
2048 dom::MediaStreamTrack* aTrack)
2049 : MediaPipelineReceive(aPc, aMainThread, aStsThread, aConduit),
2050 mListener(aTrack ? new PipelineListener(aTrack, mConduit) : nullptr) {
2051 mDescription = mPc + "| Receive audio";
2052 }
2053
DetachMedia()2054 void MediaPipelineReceiveAudio::DetachMedia() {
2055 ASSERT_ON_THREAD(mMainThread);
2056 if (mListener) {
2057 mListener->EndTrack();
2058 mListener = nullptr;
2059 }
2060 }
2061
SetPrincipalHandle_m(const PrincipalHandle & aPrincipalHandle)2062 void MediaPipelineReceiveAudio::SetPrincipalHandle_m(
2063 const PrincipalHandle& aPrincipalHandle) {
2064 if (mListener) {
2065 mListener->SetPrincipalHandle_m(aPrincipalHandle);
2066 }
2067 }
2068
Start()2069 void MediaPipelineReceiveAudio::Start() {
2070 mConduit->StartReceiving();
2071 if (mListener) {
2072 mListener->AddSelf();
2073 }
2074 }
2075
Stop()2076 void MediaPipelineReceiveAudio::Stop() {
2077 if (mListener) {
2078 mListener->RemoveSelf();
2079 }
2080 mConduit->StopReceiving();
2081 }
2082
OnRtpPacketReceived()2083 void MediaPipelineReceiveAudio::OnRtpPacketReceived() {
2084 if (mListener) {
2085 mListener->OnRtpReceived();
2086 }
2087 }
2088
2089 class MediaPipelineReceiveVideo::PipelineListener
2090 : public GenericReceiveListener {
2091 public:
PipelineListener(dom::MediaStreamTrack * aTrack)2092 explicit PipelineListener(dom::MediaStreamTrack* aTrack)
2093 : GenericReceiveListener(aTrack),
2094 mImageContainer(
2095 LayerManager::CreateImageContainer(ImageContainer::ASYNCHRONOUS)),
2096 mMutex("Video PipelineListener") {
2097 AddTrackToSource();
2098 }
2099
2100 // Implement MediaStreamListener
NotifyPull(MediaStreamGraph * aGraph,StreamTime aDesiredTime)2101 void NotifyPull(MediaStreamGraph* aGraph, StreamTime aDesiredTime) override {
2102 MutexAutoLock lock(mMutex);
2103
2104 RefPtr<Image> image = mImage;
2105 StreamTime delta = aDesiredTime - mPlayedTicks;
2106
2107 // Don't append if we've already provided a frame that supposedly
2108 // goes past the current aDesiredTime Doing so means a negative
2109 // delta and thus messes up handling of the graph
2110 if (delta > 0) {
2111 VideoSegment segment;
2112 IntSize size = image ? image->GetSize() : IntSize(mWidth, mHeight);
2113 segment.AppendFrame(image.forget(), delta, size, mPrincipalHandle);
2114 // Handle track not actually added yet or removed/finished
2115 if (!mSource->AppendToTrack(mTrackId, &segment)) {
2116 CSFLogError(LOGTAG, "AppendToTrack failed");
2117 return;
2118 }
2119 mPlayedTicks = aDesiredTime;
2120 }
2121 }
2122
2123 // Accessors for external writes from the renderer
FrameSizeChange(unsigned int aWidth,unsigned int aHeight,unsigned int aNumberOfStreams)2124 void FrameSizeChange(unsigned int aWidth, unsigned int aHeight,
2125 unsigned int aNumberOfStreams) {
2126 MutexAutoLock enter(mMutex);
2127
2128 mWidth = aWidth;
2129 mHeight = aHeight;
2130 }
2131
RenderVideoFrame(const webrtc::VideoFrameBuffer & aBuffer,uint32_t aTimeStamp,int64_t aRenderTime)2132 void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer,
2133 uint32_t aTimeStamp, int64_t aRenderTime) {
2134 if (aBuffer.native_handle()) {
2135 // We assume that only native handles are used with the
2136 // WebrtcMediaDataDecoderCodec decoder.
2137 RefPtr<Image> image = static_cast<Image*>(aBuffer.native_handle());
2138 MutexAutoLock lock(mMutex);
2139 mImage = image;
2140 return;
2141 }
2142
2143 MOZ_ASSERT(aBuffer.DataY());
2144 // Create a video frame using |buffer|.
2145 RefPtr<PlanarYCbCrImage> yuvImage =
2146 mImageContainer->CreatePlanarYCbCrImage();
2147
2148 PlanarYCbCrData yuvData;
2149 yuvData.mYChannel = const_cast<uint8_t*>(aBuffer.DataY());
2150 yuvData.mYSize = IntSize(aBuffer.width(), aBuffer.height());
2151 yuvData.mYStride = aBuffer.StrideY();
2152 MOZ_ASSERT(aBuffer.StrideU() == aBuffer.StrideV());
2153 yuvData.mCbCrStride = aBuffer.StrideU();
2154 yuvData.mCbChannel = const_cast<uint8_t*>(aBuffer.DataU());
2155 yuvData.mCrChannel = const_cast<uint8_t*>(aBuffer.DataV());
2156 yuvData.mCbCrSize =
2157 IntSize((aBuffer.width() + 1) >> 1, (aBuffer.height() + 1) >> 1);
2158 yuvData.mPicX = 0;
2159 yuvData.mPicY = 0;
2160 yuvData.mPicSize = IntSize(aBuffer.width(), aBuffer.height());
2161 yuvData.mStereoMode = StereoMode::MONO;
2162
2163 if (!yuvImage->CopyData(yuvData)) {
2164 MOZ_ASSERT(false);
2165 return;
2166 }
2167
2168 MutexAutoLock lock(mMutex);
2169 mImage = yuvImage;
2170 }
2171
2172 private:
2173 int mWidth;
2174 int mHeight;
2175 RefPtr<layers::ImageContainer> mImageContainer;
2176 RefPtr<layers::Image> mImage;
2177 Mutex mMutex; // Mutex for processing WebRTC frames.
2178 // Protects mImage against:
2179 // - Writing from the GIPS thread
2180 // - Reading from the MSG thread
2181 };
2182
2183 class MediaPipelineReceiveVideo::PipelineRenderer
2184 : public mozilla::VideoRenderer {
2185 public:
PipelineRenderer(MediaPipelineReceiveVideo * aPipeline)2186 explicit PipelineRenderer(MediaPipelineReceiveVideo* aPipeline)
2187 : mPipeline(aPipeline) {}
2188
Detach()2189 void Detach() { mPipeline = nullptr; }
2190
2191 // Implement VideoRenderer
FrameSizeChange(unsigned int aWidth,unsigned int aHeight,unsigned int aNumberOfStreams)2192 void FrameSizeChange(unsigned int aWidth, unsigned int aHeight,
2193 unsigned int aNumberOfStreams) override {
2194 mPipeline->mListener->FrameSizeChange(aWidth, aHeight, aNumberOfStreams);
2195 }
2196
RenderVideoFrame(const webrtc::VideoFrameBuffer & aBuffer,uint32_t aTimeStamp,int64_t aRenderTime)2197 void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer,
2198 uint32_t aTimeStamp, int64_t aRenderTime) override {
2199 mPipeline->mListener->RenderVideoFrame(aBuffer, aTimeStamp, aRenderTime);
2200 }
2201
2202 private:
2203 MediaPipelineReceiveVideo* mPipeline; // Raw pointer to avoid cycles
2204 };
2205
MediaPipelineReceiveVideo(const std::string & aPc,nsCOMPtr<nsIEventTarget> aMainThread,nsCOMPtr<nsIEventTarget> aStsThread,RefPtr<VideoSessionConduit> aConduit,dom::MediaStreamTrack * aTrack)2206 MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
2207 const std::string& aPc, nsCOMPtr<nsIEventTarget> aMainThread,
2208 nsCOMPtr<nsIEventTarget> aStsThread, RefPtr<VideoSessionConduit> aConduit,
2209 dom::MediaStreamTrack* aTrack)
2210 : MediaPipelineReceive(aPc, aMainThread, aStsThread, aConduit),
2211 mRenderer(new PipelineRenderer(this)),
2212 mListener(aTrack ? new PipelineListener(aTrack) : nullptr) {
2213 mDescription = mPc + "| Receive video";
2214 aConduit->AttachRenderer(mRenderer);
2215 }
2216
DetachMedia()2217 void MediaPipelineReceiveVideo::DetachMedia() {
2218 ASSERT_ON_THREAD(mMainThread);
2219
2220 // stop generating video and thus stop invoking the PipelineRenderer
2221 // and PipelineListener - the renderer has a raw ptr to the Pipeline to
2222 // avoid cycles, and the render callbacks are invoked from a different
2223 // thread so simple null-checks would cause TSAN bugs without locks.
2224 static_cast<VideoSessionConduit*>(mConduit.get())->DetachRenderer();
2225 if (mListener) {
2226 mListener->EndTrack();
2227 mListener = nullptr;
2228 }
2229 }
2230
SetPrincipalHandle_m(const PrincipalHandle & aPrincipalHandle)2231 void MediaPipelineReceiveVideo::SetPrincipalHandle_m(
2232 const PrincipalHandle& aPrincipalHandle) {
2233 if (mListener) {
2234 mListener->SetPrincipalHandle_m(aPrincipalHandle);
2235 }
2236 }
2237
Start()2238 void MediaPipelineReceiveVideo::Start() {
2239 mConduit->StartReceiving();
2240 if (mListener) {
2241 mListener->AddSelf();
2242 }
2243 }
2244
Stop()2245 void MediaPipelineReceiveVideo::Stop() {
2246 if (mListener) {
2247 mListener->RemoveSelf();
2248 }
2249 mConduit->StopReceiving();
2250 }
2251
OnRtpPacketReceived()2252 void MediaPipelineReceiveVideo::OnRtpPacketReceived() {
2253 if (mListener) {
2254 mListener->OnRtpReceived();
2255 }
2256 }
2257
GetNow()2258 DOMHighResTimeStamp MediaPipeline::GetNow() {
2259 return webrtc::Clock::GetRealTimeClock()->TimeInMilliseconds();
2260 }
2261
GetExpiryFromTime(const DOMHighResTimeStamp aTime)2262 DOMHighResTimeStamp MediaPipeline::RtpCSRCStats::GetExpiryFromTime(
2263 const DOMHighResTimeStamp aTime) {
2264 // DOMHighResTimeStamp is a unit measured in ms
2265 return aTime - EXPIRY_TIME_MILLISECONDS;
2266 }
2267
RtpCSRCStats(const uint32_t aCsrc,const DOMHighResTimeStamp aTime)2268 MediaPipeline::RtpCSRCStats::RtpCSRCStats(const uint32_t aCsrc,
2269 const DOMHighResTimeStamp aTime)
2270 : mCsrc(aCsrc), mTimestamp(aTime) {}
2271
GetWebidlInstance(dom::RTCRTPContributingSourceStats & aWebidlObj,const nsString & aInboundRtpStreamId) const2272 void MediaPipeline::RtpCSRCStats::GetWebidlInstance(
2273 dom::RTCRTPContributingSourceStats& aWebidlObj,
2274 const nsString& aInboundRtpStreamId) const {
2275 nsString statId = NS_LITERAL_STRING("csrc_") + aInboundRtpStreamId;
2276 statId.AppendLiteral("_");
2277 statId.AppendInt(mCsrc);
2278 aWebidlObj.mId.Construct(statId);
2279 aWebidlObj.mType.Construct(RTCStatsType::Csrc);
2280 aWebidlObj.mTimestamp.Construct(mTimestamp);
2281 aWebidlObj.mContributorSsrc.Construct(mCsrc);
2282 aWebidlObj.mInboundRtpStreamId.Construct(aInboundRtpStreamId);
2283 }
2284
2285 } // namespace mozilla
2286