/*
 *  Copyright 2018 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11 #include "api/test/loopback_media_transport.h"
12
13 #include <memory>
14
15 #include "absl/algorithm/container.h"
16 #include "rtc_base/time_utils.h"
17
18 namespace webrtc {
19
20 namespace {
21
// Largest datagram size, in bytes, that the loopback datagram transport
// reports from GetLargestDatagramSize().
constexpr size_t kLoopbackMaxDatagramSize{1200};
23
24 class WrapperDatagramTransport : public DatagramTransportInterface {
25 public:
WrapperDatagramTransport(DatagramTransportInterface * wrapped)26 explicit WrapperDatagramTransport(DatagramTransportInterface* wrapped)
27 : wrapped_(wrapped) {}
28
29 // Datagram transport overrides.
Connect(rtc::PacketTransportInternal * packet_transport)30 void Connect(rtc::PacketTransportInternal* packet_transport) override {
31 return wrapped_->Connect(packet_transport);
32 }
33
congestion_control()34 CongestionControlInterface* congestion_control() override {
35 return wrapped_->congestion_control();
36 }
37
SetTransportStateCallback(MediaTransportStateCallback * callback)38 void SetTransportStateCallback(
39 MediaTransportStateCallback* callback) override {
40 return wrapped_->SetTransportStateCallback(callback);
41 }
42
SendDatagram(rtc::ArrayView<const uint8_t> data,DatagramId datagram_id)43 RTCError SendDatagram(rtc::ArrayView<const uint8_t> data,
44 DatagramId datagram_id) override {
45 return wrapped_->SendDatagram(data, datagram_id);
46 }
47
GetLargestDatagramSize() const48 size_t GetLargestDatagramSize() const override {
49 return wrapped_->GetLargestDatagramSize();
50 }
51
SetDatagramSink(DatagramSinkInterface * sink)52 void SetDatagramSink(DatagramSinkInterface* sink) override {
53 return wrapped_->SetDatagramSink(sink);
54 }
55
GetTransportParameters() const56 std::string GetTransportParameters() const override {
57 return wrapped_->GetTransportParameters();
58 }
59
SetRemoteTransportParameters(absl::string_view parameters)60 RTCError SetRemoteTransportParameters(absl::string_view parameters) override {
61 return wrapped_->SetRemoteTransportParameters(parameters);
62 }
63
64 // Data channel overrides.
OpenChannel(int channel_id)65 RTCError OpenChannel(int channel_id) override {
66 return wrapped_->OpenChannel(channel_id);
67 }
68
SendData(int channel_id,const SendDataParams & params,const rtc::CopyOnWriteBuffer & buffer)69 RTCError SendData(int channel_id,
70 const SendDataParams& params,
71 const rtc::CopyOnWriteBuffer& buffer) override {
72 return wrapped_->SendData(channel_id, params, buffer);
73 }
74
CloseChannel(int channel_id)75 RTCError CloseChannel(int channel_id) override {
76 return wrapped_->CloseChannel(channel_id);
77 }
78
SetDataSink(DataChannelSink * sink)79 void SetDataSink(DataChannelSink* sink) override {
80 wrapped_->SetDataSink(sink);
81 }
82
IsReadyToSend() const83 bool IsReadyToSend() const override { return wrapped_->IsReadyToSend(); }
84
85 private:
86 DatagramTransportInterface* wrapped_;
87 };
88
89 } // namespace
90
WrapperMediaTransportFactory(DatagramTransportInterface * wrapped_datagram_transport)91 WrapperMediaTransportFactory::WrapperMediaTransportFactory(
92 DatagramTransportInterface* wrapped_datagram_transport)
93 : wrapped_datagram_transport_(wrapped_datagram_transport) {}
94
WrapperMediaTransportFactory(MediaTransportFactory * wrapped)95 WrapperMediaTransportFactory::WrapperMediaTransportFactory(
96 MediaTransportFactory* wrapped)
97 : wrapped_factory_(wrapped) {}
98
99 RTCErrorOr<std::unique_ptr<MediaTransportInterface>>
CreateMediaTransport(rtc::PacketTransportInternal * packet_transport,rtc::Thread * network_thread,const MediaTransportSettings & settings)100 WrapperMediaTransportFactory::CreateMediaTransport(
101 rtc::PacketTransportInternal* packet_transport,
102 rtc::Thread* network_thread,
103 const MediaTransportSettings& settings) {
104 return RTCError(RTCErrorType::UNSUPPORTED_OPERATION);
105 }
106
107 RTCErrorOr<std::unique_ptr<DatagramTransportInterface>>
CreateDatagramTransport(rtc::Thread * network_thread,const MediaTransportSettings & settings)108 WrapperMediaTransportFactory::CreateDatagramTransport(
109 rtc::Thread* network_thread,
110 const MediaTransportSettings& settings) {
111 created_transport_count_++;
112 if (wrapped_factory_) {
113 return wrapped_factory_->CreateDatagramTransport(network_thread, settings);
114 }
115 return {
116 std::make_unique<WrapperDatagramTransport>(wrapped_datagram_transport_)};
117 }
118
GetTransportName() const119 std::string WrapperMediaTransportFactory::GetTransportName() const {
120 if (wrapped_factory_) {
121 return wrapped_factory_->GetTransportName();
122 }
123 return "wrapped-transport";
124 }
125
created_transport_count() const126 int WrapperMediaTransportFactory::created_transport_count() const {
127 return created_transport_count_;
128 }
129
130 RTCErrorOr<std::unique_ptr<MediaTransportInterface>>
CreateMediaTransport(rtc::Thread * network_thread,const MediaTransportSettings & settings)131 WrapperMediaTransportFactory::CreateMediaTransport(
132 rtc::Thread* network_thread,
133 const MediaTransportSettings& settings) {
134 return RTCError(RTCErrorType::UNSUPPORTED_OPERATION);
135 }
136
MediaTransportPair(rtc::Thread * thread)137 MediaTransportPair::MediaTransportPair(rtc::Thread* thread)
138 : first_datagram_transport_(thread),
139 second_datagram_transport_(thread),
140 first_factory_(&first_datagram_transport_),
141 second_factory_(&second_datagram_transport_) {
142 first_datagram_transport_.Connect(&second_datagram_transport_);
143 second_datagram_transport_.Connect(&first_datagram_transport_);
144 }
145
146 MediaTransportPair::~MediaTransportPair() = default;
147
LoopbackDataChannelTransport(rtc::Thread * thread)148 MediaTransportPair::LoopbackDataChannelTransport::LoopbackDataChannelTransport(
149 rtc::Thread* thread)
150 : thread_(thread) {}
151
152 MediaTransportPair::LoopbackDataChannelTransport::
~LoopbackDataChannelTransport()153 ~LoopbackDataChannelTransport() {
154 RTC_CHECK(data_sink_ == nullptr);
155 }
156
Connect(LoopbackDataChannelTransport * other)157 void MediaTransportPair::LoopbackDataChannelTransport::Connect(
158 LoopbackDataChannelTransport* other) {
159 other_ = other;
160 }
161
OpenChannel(int channel_id)162 RTCError MediaTransportPair::LoopbackDataChannelTransport::OpenChannel(
163 int channel_id) {
164 // No-op. No need to open channels for the loopback.
165 return RTCError::OK();
166 }
167
SendData(int channel_id,const SendDataParams & params,const rtc::CopyOnWriteBuffer & buffer)168 RTCError MediaTransportPair::LoopbackDataChannelTransport::SendData(
169 int channel_id,
170 const SendDataParams& params,
171 const rtc::CopyOnWriteBuffer& buffer) {
172 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_,
173 [this, channel_id, params, buffer] {
174 other_->OnData(channel_id, params.type, buffer);
175 });
176 return RTCError::OK();
177 }
178
CloseChannel(int channel_id)179 RTCError MediaTransportPair::LoopbackDataChannelTransport::CloseChannel(
180 int channel_id) {
181 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, [this, channel_id] {
182 other_->OnRemoteCloseChannel(channel_id);
183 rtc::CritScope lock(&sink_lock_);
184 if (data_sink_) {
185 data_sink_->OnChannelClosed(channel_id);
186 }
187 });
188 return RTCError::OK();
189 }
190
SetDataSink(DataChannelSink * sink)191 void MediaTransportPair::LoopbackDataChannelTransport::SetDataSink(
192 DataChannelSink* sink) {
193 rtc::CritScope lock(&sink_lock_);
194 data_sink_ = sink;
195 if (data_sink_ && ready_to_send_) {
196 data_sink_->OnReadyToSend();
197 }
198 }
199
IsReadyToSend() const200 bool MediaTransportPair::LoopbackDataChannelTransport::IsReadyToSend() const {
201 rtc::CritScope lock(&sink_lock_);
202 return ready_to_send_;
203 }
204
FlushAsyncInvokes()205 void MediaTransportPair::LoopbackDataChannelTransport::FlushAsyncInvokes() {
206 invoker_.Flush(thread_);
207 }
208
OnData(int channel_id,DataMessageType type,const rtc::CopyOnWriteBuffer & buffer)209 void MediaTransportPair::LoopbackDataChannelTransport::OnData(
210 int channel_id,
211 DataMessageType type,
212 const rtc::CopyOnWriteBuffer& buffer) {
213 rtc::CritScope lock(&sink_lock_);
214 if (data_sink_) {
215 data_sink_->OnDataReceived(channel_id, type, buffer);
216 }
217 }
218
OnRemoteCloseChannel(int channel_id)219 void MediaTransportPair::LoopbackDataChannelTransport::OnRemoteCloseChannel(
220 int channel_id) {
221 rtc::CritScope lock(&sink_lock_);
222 if (data_sink_) {
223 data_sink_->OnChannelClosing(channel_id);
224 data_sink_->OnChannelClosed(channel_id);
225 }
226 }
227
OnReadyToSend(bool ready_to_send)228 void MediaTransportPair::LoopbackDataChannelTransport::OnReadyToSend(
229 bool ready_to_send) {
230 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, [this, ready_to_send] {
231 rtc::CritScope lock(&sink_lock_);
232 ready_to_send_ = ready_to_send;
233 // Propagate state to data channel sink, if present.
234 if (data_sink_ && ready_to_send_) {
235 data_sink_->OnReadyToSend();
236 }
237 });
238 }
239
LoopbackDatagramTransport(rtc::Thread * thread)240 MediaTransportPair::LoopbackDatagramTransport::LoopbackDatagramTransport(
241 rtc::Thread* thread)
242 : thread_(thread), dc_transport_(thread) {}
243
Connect(LoopbackDatagramTransport * other)244 void MediaTransportPair::LoopbackDatagramTransport::Connect(
245 LoopbackDatagramTransport* other) {
246 other_ = other;
247 dc_transport_.Connect(&other->dc_transport_);
248 }
249
Connect(rtc::PacketTransportInternal * packet_transport)250 void MediaTransportPair::LoopbackDatagramTransport::Connect(
251 rtc::PacketTransportInternal* packet_transport) {
252 if (state_after_connect_) {
253 SetState(*state_after_connect_);
254 }
255 }
256
257 CongestionControlInterface*
congestion_control()258 MediaTransportPair::LoopbackDatagramTransport::congestion_control() {
259 return nullptr;
260 }
261
SetTransportStateCallback(MediaTransportStateCallback * callback)262 void MediaTransportPair::LoopbackDatagramTransport::SetTransportStateCallback(
263 MediaTransportStateCallback* callback) {
264 RTC_DCHECK_RUN_ON(thread_);
265 state_callback_ = callback;
266 if (state_callback_) {
267 state_callback_->OnStateChanged(state_);
268 }
269 }
270
SendDatagram(rtc::ArrayView<const uint8_t> data,DatagramId datagram_id)271 RTCError MediaTransportPair::LoopbackDatagramTransport::SendDatagram(
272 rtc::ArrayView<const uint8_t> data,
273 DatagramId datagram_id) {
274 rtc::CopyOnWriteBuffer buffer;
275 buffer.SetData(data.data(), data.size());
276 invoker_.AsyncInvoke<void>(
277 RTC_FROM_HERE, thread_, [this, datagram_id, buffer = std::move(buffer)] {
278 RTC_DCHECK_RUN_ON(thread_);
279 other_->DeliverDatagram(std::move(buffer));
280 if (sink_) {
281 DatagramAck ack;
282 ack.datagram_id = datagram_id;
283 ack.receive_timestamp = Timestamp::Micros(rtc::TimeMicros());
284 sink_->OnDatagramAcked(ack);
285 }
286 });
287 return RTCError::OK();
288 }
289
GetLargestDatagramSize() const290 size_t MediaTransportPair::LoopbackDatagramTransport::GetLargestDatagramSize()
291 const {
292 return kLoopbackMaxDatagramSize;
293 }
294
SetDatagramSink(DatagramSinkInterface * sink)295 void MediaTransportPair::LoopbackDatagramTransport::SetDatagramSink(
296 DatagramSinkInterface* sink) {
297 RTC_DCHECK_RUN_ON(thread_);
298 sink_ = sink;
299 }
300
301 std::string
GetTransportParameters() const302 MediaTransportPair::LoopbackDatagramTransport::GetTransportParameters() const {
303 return transport_parameters_;
304 }
305
306 RTCError
SetRemoteTransportParameters(absl::string_view remote_parameters)307 MediaTransportPair::LoopbackDatagramTransport::SetRemoteTransportParameters(
308 absl::string_view remote_parameters) {
309 RTC_DCHECK_RUN_ON(thread_);
310 if (transport_parameters_comparison_(GetTransportParameters(),
311 remote_parameters)) {
312 return RTCError::OK();
313 }
314 return RTCError(RTCErrorType::UNSUPPORTED_PARAMETER,
315 "Incompatible remote transport parameters");
316 }
317
OpenChannel(int channel_id)318 RTCError MediaTransportPair::LoopbackDatagramTransport::OpenChannel(
319 int channel_id) {
320 return dc_transport_.OpenChannel(channel_id);
321 }
322
SendData(int channel_id,const SendDataParams & params,const rtc::CopyOnWriteBuffer & buffer)323 RTCError MediaTransportPair::LoopbackDatagramTransport::SendData(
324 int channel_id,
325 const SendDataParams& params,
326 const rtc::CopyOnWriteBuffer& buffer) {
327 return dc_transport_.SendData(channel_id, params, buffer);
328 }
329
CloseChannel(int channel_id)330 RTCError MediaTransportPair::LoopbackDatagramTransport::CloseChannel(
331 int channel_id) {
332 return dc_transport_.CloseChannel(channel_id);
333 }
334
SetDataSink(DataChannelSink * sink)335 void MediaTransportPair::LoopbackDatagramTransport::SetDataSink(
336 DataChannelSink* sink) {
337 dc_transport_.SetDataSink(sink);
338 }
339
IsReadyToSend() const340 bool MediaTransportPair::LoopbackDatagramTransport::IsReadyToSend() const {
341 return dc_transport_.IsReadyToSend();
342 }
343
SetState(MediaTransportState state)344 void MediaTransportPair::LoopbackDatagramTransport::SetState(
345 MediaTransportState state) {
346 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, [this, state] {
347 RTC_DCHECK_RUN_ON(thread_);
348 state_ = state;
349 if (state_callback_) {
350 state_callback_->OnStateChanged(state_);
351 }
352 });
353 dc_transport_.OnReadyToSend(state == MediaTransportState::kWritable);
354 }
355
SetStateAfterConnect(MediaTransportState state)356 void MediaTransportPair::LoopbackDatagramTransport::SetStateAfterConnect(
357 MediaTransportState state) {
358 state_after_connect_ = state;
359 }
360
FlushAsyncInvokes()361 void MediaTransportPair::LoopbackDatagramTransport::FlushAsyncInvokes() {
362 dc_transport_.FlushAsyncInvokes();
363 }
364
DeliverDatagram(rtc::CopyOnWriteBuffer buffer)365 void MediaTransportPair::LoopbackDatagramTransport::DeliverDatagram(
366 rtc::CopyOnWriteBuffer buffer) {
367 RTC_DCHECK_RUN_ON(thread_);
368 if (sink_) {
369 sink_->OnDatagramReceived(buffer);
370 }
371 }
372
373 } // namespace webrtc
374