1 /*
2  *  Copyright 2018 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "api/test/loopback_media_transport.h"
12 
13 #include <memory>
14 
15 #include "absl/algorithm/container.h"
16 #include "rtc_base/time_utils.h"
17 
18 namespace webrtc {
19 
20 namespace {
21 
// Value reported by LoopbackDatagramTransport::GetLargestDatagramSize().
constexpr size_t kLoopbackMaxDatagramSize{1200};
23 
24 class WrapperDatagramTransport : public DatagramTransportInterface {
25  public:
WrapperDatagramTransport(DatagramTransportInterface * wrapped)26   explicit WrapperDatagramTransport(DatagramTransportInterface* wrapped)
27       : wrapped_(wrapped) {}
28 
29   // Datagram transport overrides.
Connect(rtc::PacketTransportInternal * packet_transport)30   void Connect(rtc::PacketTransportInternal* packet_transport) override {
31     return wrapped_->Connect(packet_transport);
32   }
33 
congestion_control()34   CongestionControlInterface* congestion_control() override {
35     return wrapped_->congestion_control();
36   }
37 
SetTransportStateCallback(MediaTransportStateCallback * callback)38   void SetTransportStateCallback(
39       MediaTransportStateCallback* callback) override {
40     return wrapped_->SetTransportStateCallback(callback);
41   }
42 
SendDatagram(rtc::ArrayView<const uint8_t> data,DatagramId datagram_id)43   RTCError SendDatagram(rtc::ArrayView<const uint8_t> data,
44                         DatagramId datagram_id) override {
45     return wrapped_->SendDatagram(data, datagram_id);
46   }
47 
GetLargestDatagramSize() const48   size_t GetLargestDatagramSize() const override {
49     return wrapped_->GetLargestDatagramSize();
50   }
51 
SetDatagramSink(DatagramSinkInterface * sink)52   void SetDatagramSink(DatagramSinkInterface* sink) override {
53     return wrapped_->SetDatagramSink(sink);
54   }
55 
GetTransportParameters() const56   std::string GetTransportParameters() const override {
57     return wrapped_->GetTransportParameters();
58   }
59 
SetRemoteTransportParameters(absl::string_view parameters)60   RTCError SetRemoteTransportParameters(absl::string_view parameters) override {
61     return wrapped_->SetRemoteTransportParameters(parameters);
62   }
63 
64   // Data channel overrides.
OpenChannel(int channel_id)65   RTCError OpenChannel(int channel_id) override {
66     return wrapped_->OpenChannel(channel_id);
67   }
68 
SendData(int channel_id,const SendDataParams & params,const rtc::CopyOnWriteBuffer & buffer)69   RTCError SendData(int channel_id,
70                     const SendDataParams& params,
71                     const rtc::CopyOnWriteBuffer& buffer) override {
72     return wrapped_->SendData(channel_id, params, buffer);
73   }
74 
CloseChannel(int channel_id)75   RTCError CloseChannel(int channel_id) override {
76     return wrapped_->CloseChannel(channel_id);
77   }
78 
SetDataSink(DataChannelSink * sink)79   void SetDataSink(DataChannelSink* sink) override {
80     wrapped_->SetDataSink(sink);
81   }
82 
IsReadyToSend() const83   bool IsReadyToSend() const override { return wrapped_->IsReadyToSend(); }
84 
85  private:
86   DatagramTransportInterface* wrapped_;
87 };
88 
89 }  // namespace
90 
WrapperMediaTransportFactory(DatagramTransportInterface * wrapped_datagram_transport)91 WrapperMediaTransportFactory::WrapperMediaTransportFactory(
92     DatagramTransportInterface* wrapped_datagram_transport)
93     : wrapped_datagram_transport_(wrapped_datagram_transport) {}
94 
WrapperMediaTransportFactory(MediaTransportFactory * wrapped)95 WrapperMediaTransportFactory::WrapperMediaTransportFactory(
96     MediaTransportFactory* wrapped)
97     : wrapped_factory_(wrapped) {}
98 
99 RTCErrorOr<std::unique_ptr<MediaTransportInterface>>
CreateMediaTransport(rtc::PacketTransportInternal * packet_transport,rtc::Thread * network_thread,const MediaTransportSettings & settings)100 WrapperMediaTransportFactory::CreateMediaTransport(
101     rtc::PacketTransportInternal* packet_transport,
102     rtc::Thread* network_thread,
103     const MediaTransportSettings& settings) {
104   return RTCError(RTCErrorType::UNSUPPORTED_OPERATION);
105 }
106 
107 RTCErrorOr<std::unique_ptr<DatagramTransportInterface>>
CreateDatagramTransport(rtc::Thread * network_thread,const MediaTransportSettings & settings)108 WrapperMediaTransportFactory::CreateDatagramTransport(
109     rtc::Thread* network_thread,
110     const MediaTransportSettings& settings) {
111   created_transport_count_++;
112   if (wrapped_factory_) {
113     return wrapped_factory_->CreateDatagramTransport(network_thread, settings);
114   }
115   return {
116       std::make_unique<WrapperDatagramTransport>(wrapped_datagram_transport_)};
117 }
118 
GetTransportName() const119 std::string WrapperMediaTransportFactory::GetTransportName() const {
120   if (wrapped_factory_) {
121     return wrapped_factory_->GetTransportName();
122   }
123   return "wrapped-transport";
124 }
125 
created_transport_count() const126 int WrapperMediaTransportFactory::created_transport_count() const {
127   return created_transport_count_;
128 }
129 
130 RTCErrorOr<std::unique_ptr<MediaTransportInterface>>
CreateMediaTransport(rtc::Thread * network_thread,const MediaTransportSettings & settings)131 WrapperMediaTransportFactory::CreateMediaTransport(
132     rtc::Thread* network_thread,
133     const MediaTransportSettings& settings) {
134   return RTCError(RTCErrorType::UNSUPPORTED_OPERATION);
135 }
136 
MediaTransportPair(rtc::Thread * thread)137 MediaTransportPair::MediaTransportPair(rtc::Thread* thread)
138     : first_datagram_transport_(thread),
139       second_datagram_transport_(thread),
140       first_factory_(&first_datagram_transport_),
141       second_factory_(&second_datagram_transport_) {
142   first_datagram_transport_.Connect(&second_datagram_transport_);
143   second_datagram_transport_.Connect(&first_datagram_transport_);
144 }
145 
146 MediaTransportPair::~MediaTransportPair() = default;
147 
LoopbackDataChannelTransport(rtc::Thread * thread)148 MediaTransportPair::LoopbackDataChannelTransport::LoopbackDataChannelTransport(
149     rtc::Thread* thread)
150     : thread_(thread) {}
151 
152 MediaTransportPair::LoopbackDataChannelTransport::
~LoopbackDataChannelTransport()153     ~LoopbackDataChannelTransport() {
154   RTC_CHECK(data_sink_ == nullptr);
155 }
156 
Connect(LoopbackDataChannelTransport * other)157 void MediaTransportPair::LoopbackDataChannelTransport::Connect(
158     LoopbackDataChannelTransport* other) {
159   other_ = other;
160 }
161 
OpenChannel(int channel_id)162 RTCError MediaTransportPair::LoopbackDataChannelTransport::OpenChannel(
163     int channel_id) {
164   // No-op.  No need to open channels for the loopback.
165   return RTCError::OK();
166 }
167 
SendData(int channel_id,const SendDataParams & params,const rtc::CopyOnWriteBuffer & buffer)168 RTCError MediaTransportPair::LoopbackDataChannelTransport::SendData(
169     int channel_id,
170     const SendDataParams& params,
171     const rtc::CopyOnWriteBuffer& buffer) {
172   invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_,
173                              [this, channel_id, params, buffer] {
174                                other_->OnData(channel_id, params.type, buffer);
175                              });
176   return RTCError::OK();
177 }
178 
CloseChannel(int channel_id)179 RTCError MediaTransportPair::LoopbackDataChannelTransport::CloseChannel(
180     int channel_id) {
181   invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, [this, channel_id] {
182     other_->OnRemoteCloseChannel(channel_id);
183     rtc::CritScope lock(&sink_lock_);
184     if (data_sink_) {
185       data_sink_->OnChannelClosed(channel_id);
186     }
187   });
188   return RTCError::OK();
189 }
190 
SetDataSink(DataChannelSink * sink)191 void MediaTransportPair::LoopbackDataChannelTransport::SetDataSink(
192     DataChannelSink* sink) {
193   rtc::CritScope lock(&sink_lock_);
194   data_sink_ = sink;
195   if (data_sink_ && ready_to_send_) {
196     data_sink_->OnReadyToSend();
197   }
198 }
199 
IsReadyToSend() const200 bool MediaTransportPair::LoopbackDataChannelTransport::IsReadyToSend() const {
201   rtc::CritScope lock(&sink_lock_);
202   return ready_to_send_;
203 }
204 
FlushAsyncInvokes()205 void MediaTransportPair::LoopbackDataChannelTransport::FlushAsyncInvokes() {
206   invoker_.Flush(thread_);
207 }
208 
OnData(int channel_id,DataMessageType type,const rtc::CopyOnWriteBuffer & buffer)209 void MediaTransportPair::LoopbackDataChannelTransport::OnData(
210     int channel_id,
211     DataMessageType type,
212     const rtc::CopyOnWriteBuffer& buffer) {
213   rtc::CritScope lock(&sink_lock_);
214   if (data_sink_) {
215     data_sink_->OnDataReceived(channel_id, type, buffer);
216   }
217 }
218 
OnRemoteCloseChannel(int channel_id)219 void MediaTransportPair::LoopbackDataChannelTransport::OnRemoteCloseChannel(
220     int channel_id) {
221   rtc::CritScope lock(&sink_lock_);
222   if (data_sink_) {
223     data_sink_->OnChannelClosing(channel_id);
224     data_sink_->OnChannelClosed(channel_id);
225   }
226 }
227 
OnReadyToSend(bool ready_to_send)228 void MediaTransportPair::LoopbackDataChannelTransport::OnReadyToSend(
229     bool ready_to_send) {
230   invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, [this, ready_to_send] {
231     rtc::CritScope lock(&sink_lock_);
232     ready_to_send_ = ready_to_send;
233     // Propagate state to data channel sink, if present.
234     if (data_sink_ && ready_to_send_) {
235       data_sink_->OnReadyToSend();
236     }
237   });
238 }
239 
LoopbackDatagramTransport(rtc::Thread * thread)240 MediaTransportPair::LoopbackDatagramTransport::LoopbackDatagramTransport(
241     rtc::Thread* thread)
242     : thread_(thread), dc_transport_(thread) {}
243 
Connect(LoopbackDatagramTransport * other)244 void MediaTransportPair::LoopbackDatagramTransport::Connect(
245     LoopbackDatagramTransport* other) {
246   other_ = other;
247   dc_transport_.Connect(&other->dc_transport_);
248 }
249 
Connect(rtc::PacketTransportInternal * packet_transport)250 void MediaTransportPair::LoopbackDatagramTransport::Connect(
251     rtc::PacketTransportInternal* packet_transport) {
252   if (state_after_connect_) {
253     SetState(*state_after_connect_);
254   }
255 }
256 
257 CongestionControlInterface*
congestion_control()258 MediaTransportPair::LoopbackDatagramTransport::congestion_control() {
259   return nullptr;
260 }
261 
SetTransportStateCallback(MediaTransportStateCallback * callback)262 void MediaTransportPair::LoopbackDatagramTransport::SetTransportStateCallback(
263     MediaTransportStateCallback* callback) {
264   RTC_DCHECK_RUN_ON(thread_);
265   state_callback_ = callback;
266   if (state_callback_) {
267     state_callback_->OnStateChanged(state_);
268   }
269 }
270 
SendDatagram(rtc::ArrayView<const uint8_t> data,DatagramId datagram_id)271 RTCError MediaTransportPair::LoopbackDatagramTransport::SendDatagram(
272     rtc::ArrayView<const uint8_t> data,
273     DatagramId datagram_id) {
274   rtc::CopyOnWriteBuffer buffer;
275   buffer.SetData(data.data(), data.size());
276   invoker_.AsyncInvoke<void>(
277       RTC_FROM_HERE, thread_, [this, datagram_id, buffer = std::move(buffer)] {
278         RTC_DCHECK_RUN_ON(thread_);
279         other_->DeliverDatagram(std::move(buffer));
280         if (sink_) {
281           DatagramAck ack;
282           ack.datagram_id = datagram_id;
283           ack.receive_timestamp = Timestamp::Micros(rtc::TimeMicros());
284           sink_->OnDatagramAcked(ack);
285         }
286       });
287   return RTCError::OK();
288 }
289 
GetLargestDatagramSize() const290 size_t MediaTransportPair::LoopbackDatagramTransport::GetLargestDatagramSize()
291     const {
292   return kLoopbackMaxDatagramSize;
293 }
294 
SetDatagramSink(DatagramSinkInterface * sink)295 void MediaTransportPair::LoopbackDatagramTransport::SetDatagramSink(
296     DatagramSinkInterface* sink) {
297   RTC_DCHECK_RUN_ON(thread_);
298   sink_ = sink;
299 }
300 
301 std::string
GetTransportParameters() const302 MediaTransportPair::LoopbackDatagramTransport::GetTransportParameters() const {
303   return transport_parameters_;
304 }
305 
306 RTCError
SetRemoteTransportParameters(absl::string_view remote_parameters)307 MediaTransportPair::LoopbackDatagramTransport::SetRemoteTransportParameters(
308     absl::string_view remote_parameters) {
309   RTC_DCHECK_RUN_ON(thread_);
310   if (transport_parameters_comparison_(GetTransportParameters(),
311                                        remote_parameters)) {
312     return RTCError::OK();
313   }
314   return RTCError(RTCErrorType::UNSUPPORTED_PARAMETER,
315                   "Incompatible remote transport parameters");
316 }
317 
OpenChannel(int channel_id)318 RTCError MediaTransportPair::LoopbackDatagramTransport::OpenChannel(
319     int channel_id) {
320   return dc_transport_.OpenChannel(channel_id);
321 }
322 
SendData(int channel_id,const SendDataParams & params,const rtc::CopyOnWriteBuffer & buffer)323 RTCError MediaTransportPair::LoopbackDatagramTransport::SendData(
324     int channel_id,
325     const SendDataParams& params,
326     const rtc::CopyOnWriteBuffer& buffer) {
327   return dc_transport_.SendData(channel_id, params, buffer);
328 }
329 
CloseChannel(int channel_id)330 RTCError MediaTransportPair::LoopbackDatagramTransport::CloseChannel(
331     int channel_id) {
332   return dc_transport_.CloseChannel(channel_id);
333 }
334 
SetDataSink(DataChannelSink * sink)335 void MediaTransportPair::LoopbackDatagramTransport::SetDataSink(
336     DataChannelSink* sink) {
337   dc_transport_.SetDataSink(sink);
338 }
339 
IsReadyToSend() const340 bool MediaTransportPair::LoopbackDatagramTransport::IsReadyToSend() const {
341   return dc_transport_.IsReadyToSend();
342 }
343 
SetState(MediaTransportState state)344 void MediaTransportPair::LoopbackDatagramTransport::SetState(
345     MediaTransportState state) {
346   invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, [this, state] {
347     RTC_DCHECK_RUN_ON(thread_);
348     state_ = state;
349     if (state_callback_) {
350       state_callback_->OnStateChanged(state_);
351     }
352   });
353   dc_transport_.OnReadyToSend(state == MediaTransportState::kWritable);
354 }
355 
SetStateAfterConnect(MediaTransportState state)356 void MediaTransportPair::LoopbackDatagramTransport::SetStateAfterConnect(
357     MediaTransportState state) {
358   state_after_connect_ = state;
359 }
360 
FlushAsyncInvokes()361 void MediaTransportPair::LoopbackDatagramTransport::FlushAsyncInvokes() {
362   dc_transport_.FlushAsyncInvokes();
363 }
364 
DeliverDatagram(rtc::CopyOnWriteBuffer buffer)365 void MediaTransportPair::LoopbackDatagramTransport::DeliverDatagram(
366     rtc::CopyOnWriteBuffer buffer) {
367   RTC_DCHECK_RUN_ON(thread_);
368   if (sink_) {
369     sink_->OnDatagramReceived(buffer);
370   }
371 }
372 
373 }  // namespace webrtc
374