1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
4  * You can obtain one at http://mozilla.org/MPL/2.0/. */
5 
6 // Original author: ekr@rtfm.com
7 
8 #ifndef mediapipeline_h__
9 #define mediapipeline_h__
10 
11 #include <map>
12 
13 #include "sigslot.h"
14 #include "transportlayer.h"  // For TransportLayer::State
15 
16 #include "signaling/src/media-conduit/MediaConduitInterface.h"
17 #include "mozilla/ReentrantMonitor.h"
18 #include "mozilla/Atomics.h"
19 #include "SrtpFlow.h"  // For SRTP_MAX_EXPANSION
20 #include "mediapacket.h"
21 #include "mtransport/runnable_utils.h"
22 #include "AudioPacketizer.h"
23 #include "MediaPipelineFilter.h"
24 #include "MediaSegment.h"
25 #include "signaling/src/peerconnection/PacketDumper.h"
26 
27 #include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h"
28 
29 // Should come from MediaEngine.h, but that's a pain to include here
30 // because of the MOZILLA_EXTERNAL_LINKAGE stuff.
31 #define WEBRTC_MAX_SAMPLE_RATE 48000
32 
33 class nsIPrincipal;
34 
35 namespace mozilla {
36 class AudioProxyThread;
37 class MediaInputPort;
38 class MediaPipelineFilter;
39 class MediaTransportHandler;
40 class PeerIdentity;
41 class ProcessedMediaTrack;
42 class SourceMediaTrack;
43 class VideoFrameConverter;
44 
45 namespace dom {
46 class MediaStreamTrack;
47 struct RTCRTPContributingSourceStats;
48 }  // namespace dom
49 
50 // A class that represents the pipeline of audio and video
51 // The dataflow looks like:
52 //
53 // TRANSMIT
54 // CaptureDevice -> stream -> [us] -> conduit -> [us] -> transport -> network
55 //
56 // RECEIVE
57 // network -> transport -> [us] -> conduit -> [us] -> stream -> Playout
58 //
59 // The boxes labeled [us] are just bridge logic implemented in this class
60 //
61 // We have to deal with a number of threads:
62 //
63 // GSM:
64 //   * Assembles the pipeline
65 // SocketTransportService
66 //   * Receives notification that ICE and DTLS have completed
67 //   * Processes incoming network data and passes it to the conduit
68 //   * Processes outgoing RTP and RTCP
69 // MediaTrackGraph
70 //   * Receives outgoing data from the MediaTrackGraph
71 //   * Receives pull requests for more data from the
72 //     MediaTrackGraph
73 // One or another GIPS threads
74 //   * Receives RTCP messages to send to the other side
75 //   * Processes video frames GIPS wants to render
76 //
77 // For a transmitting conduit, "output" is RTP and "input" is RTCP.
78 // For a receiving conduit, "input" is RTP and "output" is RTCP.
79 //
80 
81 class MediaPipeline : public sigslot::has_slots<> {
82  public:
83   enum class DirectionType { TRANSMIT, RECEIVE };
84   MediaPipeline(const std::string& aPc,
85                 RefPtr<MediaTransportHandler> aTransportHandler,
86                 DirectionType aDirection,
87                 RefPtr<nsISerialEventTarget> aMainThread,
88                 RefPtr<nsISerialEventTarget> aStsThread,
89                 RefPtr<MediaSessionConduit> aConduit);
90 
91   virtual void Start() = 0;
92   virtual RefPtr<GenericPromise> Stop() = 0;
DetachMedia()93   virtual void DetachMedia() {}
94 
SetLevel(size_t aLevel)95   void SetLevel(size_t aLevel) { mLevel = aLevel; }
96 
97   // Must be called on the main thread.
98   void Shutdown_m();
99 
100   void UpdateTransport_m(const std::string& aTransportId,
101                          UniquePtr<MediaPipelineFilter>&& aFilter);
102 
103   void UpdateTransport_s(const std::string& aTransportId,
104                          UniquePtr<MediaPipelineFilter>&& aFilter);
105 
106   // Used only for testing; adds RTP header extension for RTP Stream Id with
107   // the given id.
108   void AddRIDExtension_m(size_t aExtensionId);
109   void AddRIDExtension_s(size_t aExtensionId);
110   // Used only for testing; installs a MediaPipelineFilter that filters
111   // everything but the given RID
112   void AddRIDFilter_m(const std::string& aRid);
113   void AddRIDFilter_s(const std::string& aRid);
114 
Direction()115   virtual DirectionType Direction() const { return mDirection; }
Level()116   int Level() const { return mLevel; }
117   virtual bool IsVideo() const = 0;
118 
119   class RtpCSRCStats {
120    public:
121     // Gets an expiration time for CRC info given a reference time,
122     //   this reference time would normally be the time of calling.
123     //   This value can then be used to check if a RtpCSRCStats
124     //   has expired via Expired(...)
125     static DOMHighResTimeStamp GetExpiryFromTime(
126         const DOMHighResTimeStamp aTime);
127 
128     RtpCSRCStats(const uint32_t aCsrc, const DOMHighResTimeStamp aTime);
129     ~RtpCSRCStats() = default;
130     // Initialize a webidl representation suitable for adding to a report.
131     //   This assumes that the webidl object is empty.
132     // @param aWebidlObj the webidl binding object to popluate
133     // @param aInboundRtpStreamId the associated RTCInboundRTPStreamStats.id
134     void GetWebidlInstance(dom::RTCRTPContributingSourceStats& aWebidlObj,
135                            const nsString& aInboundRtpStreamId) const;
SetTimestamp(const DOMHighResTimeStamp aTime)136     void SetTimestamp(const DOMHighResTimeStamp aTime) { mTimestamp = aTime; }
137     // Check if the RtpCSRCStats has expired, checks against a
138     //   given expiration time.
Expired(const DOMHighResTimeStamp aExpiry)139     bool Expired(const DOMHighResTimeStamp aExpiry) const {
140       return mTimestamp < aExpiry;
141     }
142 
143    private:
144     static const double constexpr EXPIRY_TIME_MILLISECONDS = 10 * 1000;
145     const uint32_t mCsrc;
146     DOMHighResTimeStamp mTimestamp;
147   };
148 
149   // Gets the gathered contributing source stats for the last expiration period.
150   // @param aId the stream id to use for populating inboundRtpStreamId field
151   // @param aArr the array to append the stats objects to
152   void GetContributingSourceStats(
153       const nsString& aInboundRtpStreamId,
154       FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const;
155 
RtpPacketsSent()156   int32_t RtpPacketsSent() const { return mRtpPacketsSent; }
RtpBytesSent()157   int64_t RtpBytesSent() const { return mRtpBytesSent; }
RtcpPacketsSent()158   int32_t RtcpPacketsSent() const { return mRtcpPacketsSent; }
RtpPacketsReceived()159   int32_t RtpPacketsReceived() const { return mRtpPacketsReceived; }
RtpBytesReceived()160   int64_t RtpBytesReceived() const { return mRtpBytesReceived; }
RtcpPacketsReceived()161   int32_t RtcpPacketsReceived() const { return mRtcpPacketsReceived; }
162   // Gets the current time as a DOMHighResTimeStamp
163   DOMHighResTimeStamp GetNow() const;
164 
Conduit()165   MediaSessionConduit* Conduit() const { return mConduit; }
166 
167   // Thread counting
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaPipeline)168   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaPipeline)
169 
170   // Separate class to allow ref counting
171   class PipelineTransport : public TransportInterface {
172    public:
173     // Implement the TransportInterface functions
174     explicit PipelineTransport(RefPtr<nsISerialEventTarget> aStsThread)
175         : mPipeline(nullptr), mStsThread(std::move(aStsThread)) {}
176 
177     void Attach(MediaPipeline* pipeline) { mPipeline = pipeline; }
178     void Detach() { mPipeline = nullptr; }
179     MediaPipeline* Pipeline() const { return mPipeline; }
180 
181     virtual nsresult SendRtpPacket(const uint8_t* aData, size_t aLen) override;
182     virtual nsresult SendRtcpPacket(const uint8_t* aData, size_t aLen) override;
183 
184    private:
185     void SendRtpRtcpPacket_s(MediaPacket&& aPacket);
186 
187     // Creates a cycle, which we break with Detach
188     RefPtr<MediaPipeline> mPipeline;
189     const RefPtr<nsISerialEventTarget> mStsThread;
190   };
191 
192  protected:
193   virtual ~MediaPipeline();
194   friend class PipelineTransport;
195 
196   // The transport is ready
TransportReady_s()197   virtual void TransportReady_s() {}
198 
199   void IncrementRtpPacketsSent(int aBytes);
200   void IncrementRtcpPacketsSent();
201   void IncrementRtpPacketsReceived(int aBytes);
OnRtpPacketReceived()202   virtual void OnRtpPacketReceived() {}
203   void IncrementRtcpPacketsReceived();
204 
205   virtual void SendPacket(MediaPacket&& packet);
206 
207   // Process slots on transports
208   void RtpStateChange(const std::string& aTransportId, TransportLayer::State);
209   void RtcpStateChange(const std::string& aTransportId, TransportLayer::State);
210   virtual void CheckTransportStates();
211   void PacketReceived(const std::string& aTransportId,
212                       const MediaPacket& packet);
213   void AlpnNegotiated(const std::string& aAlpn, bool aPrivacyRequested);
214 
215   void RtpPacketReceived(const MediaPacket& packet);
216   void RtcpPacketReceived(const MediaPacket& packet);
217 
218   void EncryptedPacketSending(const std::string& aTransportId,
219                               const MediaPacket& aPacket);
220 
221   void SetDescription_s(const std::string& description);
222 
223   // Called when ALPN is negotiated and is requesting privacy, so receive
224   // pipelines do not enter data into the graph under a content principal.
MakePrincipalPrivate_s()225   virtual void MakePrincipalPrivate_s() {}
226 
227   const DirectionType mDirection;
228   size_t mLevel;
229   std::string mTransportId;
230   const RefPtr<MediaTransportHandler> mTransportHandler;
231   RefPtr<MediaSessionConduit> mConduit;  // Our conduit. Written on the main
232                                          // thread. Read on STS thread.
233 
234   TransportLayer::State mRtpState = TransportLayer::TS_NONE;
235   TransportLayer::State mRtcpState = TransportLayer::TS_NONE;
236   bool mSignalsConnected = false;
237 
238   // Pointers to the threads we need. Initialized at creation
239   // and used all over the place.
240   const RefPtr<nsISerialEventTarget> mMainThread;
241   const RefPtr<nsISerialEventTarget> mStsThread;
242 
243   // Created in c'tor. Referenced by the conduit.
244   const RefPtr<PipelineTransport> mTransport;
245 
246   // Only safe to access from STS thread.
247   int32_t mRtpPacketsSent;
248   int32_t mRtcpPacketsSent;
249   int32_t mRtpPacketsReceived;
250   int32_t mRtcpPacketsReceived;
251   int64_t mRtpBytesSent;
252   int64_t mRtpBytesReceived;
253 
254   // Only safe to access from STS thread.
255   std::map<uint32_t, RtpCSRCStats> mCsrcStats;
256 
257   // Written in c'tor. Read on STS thread.
258   const std::string mPc;
259   std::string mDescription;
260 
261   // Written in c'tor, all following accesses are on the STS thread.
262   UniquePtr<MediaPipelineFilter> mFilter;
263   const UniquePtr<webrtc::RtpHeaderParser> mRtpParser;
264 
265   UniquePtr<PacketDumper> mPacketDumper;
266 
267  private:
268   bool IsRtp(const unsigned char* aData, size_t aLen) const;
269   // Must be called on the STS thread.  Must be called after DetachMedia().
270   void DetachTransport_s();
271 };
272 
273 // A specialization of pipeline for reading from an input device
274 // and transmitting to the network.
275 class MediaPipelineTransmit : public MediaPipeline {
276  public:
277   // Set aRtcpTransport to nullptr to use rtcp-mux
278   MediaPipelineTransmit(const std::string& aPc,
279                         RefPtr<MediaTransportHandler> aTransportHandler,
280                         RefPtr<nsISerialEventTarget> aMainThread,
281                         RefPtr<nsISerialEventTarget> aStsThread, bool aIsVideo,
282                         RefPtr<MediaSessionConduit> aConduit);
283 
284   bool Transmitting() const;
285 
286   void Start() override;
287   RefPtr<GenericPromise> Stop() override;
288 
289   // written and used from MainThread
290   bool IsVideo() const override;
291 
292   // When the principal of the domtrack changes, it calls through to here
293   // so that we can determine whether to enable track transmission.
294   // `track` has to be null or equal `mDomTrack` for us to apply the update.
295   virtual void UpdateSinkIdentity_m(const dom::MediaStreamTrack* aTrack,
296                                     nsIPrincipal* aPrincipal,
297                                     const PeerIdentity* aSinkIdentity);
298 
299   // Called on the main thread.
300   void DetachMedia() override;
301 
302   // Override MediaPipeline::TransportReady_s.
303   void TransportReady_s() override;
304 
305   // Replace a track with a different one.
306   nsresult SetTrack(RefPtr<dom::MediaStreamTrack> aDomTrack);
307 
308   // Set the track whose data we will transmit. For internal and test use.
309   void SetSendTrack(RefPtr<ProcessedMediaTrack> aSendTrack);
310 
311   // Separate classes to allow ref counting
312   class PipelineListener;
313   class PipelineListenerTrackConsumer;
314   class VideoFrameFeeder;
315 
316  protected:
317   ~MediaPipelineTransmit();
318 
319   void SetDescription();
320 
321  private:
322   void AsyncStart(const RefPtr<GenericPromise>& aPromise);
323 
324   const bool mIsVideo;
325   const RefPtr<PipelineListener> mListener;
326   // Listens for changes in enabled state on the attached MediaStreamTrack, and
327   // notifies mListener.
328   const nsMainThreadPtrHandle<PipelineListenerTrackConsumer> mTrackConsumer;
329   const RefPtr<VideoFrameFeeder> mFeeder;
330   RefPtr<AudioProxyThread> mAudioProcessing;
331   RefPtr<VideoFrameConverter> mConverter;
332   RefPtr<dom::MediaStreamTrack> mDomTrack;
333   // Input port connecting mDomTrack's MediaTrack to mSendTrack.
334   RefPtr<MediaInputPort> mSendPort;
335   // MediaTrack that we send over the network. This allows changing mDomTrack.
336   RefPtr<ProcessedMediaTrack> mSendTrack;
337   // True if we're actively transmitting data to the network. Main thread only.
338   bool mTransmitting;
339   // When AsyncStart() is used this flag helps to avoid unexpected starts. One
340   // case is that a start has already been scheduled. A second case is that a
341   // start has already taken place (from JS for example). A third case is that
342   // a stop has taken place so we want to cancel the start. Main thread only.
343   bool mAsyncStartRequested;
344 };
345 
346 // A specialization of pipeline for reading from the network and
347 // rendering media.
348 class MediaPipelineReceive : public MediaPipeline {
349  public:
350   // Set aRtcpTransport to nullptr to use rtcp-mux
351   MediaPipelineReceive(const std::string& aPc,
352                        RefPtr<MediaTransportHandler> aTransportHandler,
353                        RefPtr<nsISerialEventTarget> aMainThread,
354                        RefPtr<nsISerialEventTarget> aStsThread,
355                        RefPtr<MediaSessionConduit> aConduit);
356 
357  protected:
358   ~MediaPipelineReceive();
359 };
360 
361 // A specialization of pipeline for reading from the network and
362 // rendering audio.
363 class MediaPipelineReceiveAudio : public MediaPipelineReceive {
364  public:
365   MediaPipelineReceiveAudio(const std::string& aPc,
366                             RefPtr<MediaTransportHandler> aTransportHandler,
367                             RefPtr<nsISerialEventTarget> aMainThread,
368                             RefPtr<nsISerialEventTarget> aStsThread,
369                             RefPtr<AudioSessionConduit> aConduit,
370                             const RefPtr<dom::MediaStreamTrack>& aTrack,
371                             const PrincipalHandle& aPrincipalHandle);
372 
373   void DetachMedia() override;
374 
IsVideo()375   bool IsVideo() const override { return false; }
376 
377   void MakePrincipalPrivate_s() override;
378 
379   void Start() override;
380   RefPtr<GenericPromise> Stop() override;
381 
382   void OnRtpPacketReceived() override;
383 
384  private:
385   // Separate class to allow ref counting
386   class PipelineListener;
387 
388   const RefPtr<PipelineListener> mListener;
389 };
390 
391 // A specialization of pipeline for reading from the network and
392 // rendering video.
393 class MediaPipelineReceiveVideo : public MediaPipelineReceive {
394  public:
395   MediaPipelineReceiveVideo(const std::string& aPc,
396                             RefPtr<MediaTransportHandler> aTransportHandler,
397                             RefPtr<nsISerialEventTarget> aMainThread,
398                             RefPtr<nsISerialEventTarget> aStsThread,
399                             RefPtr<VideoSessionConduit> aConduit,
400                             const RefPtr<dom::MediaStreamTrack>& aTrack,
401                             const PrincipalHandle& aPrincipalHandle);
402 
403   // Called on the main thread.
404   void DetachMedia() override;
405 
IsVideo()406   bool IsVideo() const override { return true; }
407 
408   void MakePrincipalPrivate_s() override;
409 
410   void Start() override;
411   RefPtr<GenericPromise> Stop() override;
412 
413   void OnRtpPacketReceived() override;
414 
415  private:
416   class PipelineRenderer;
417   friend class PipelineRenderer;
418 
419   // Separate class to allow ref counting
420   class PipelineListener;
421 
422   const RefPtr<PipelineRenderer> mRenderer;
423   const RefPtr<PipelineListener> mListener;
424 };
425 
426 }  // namespace mozilla
427 #endif
428