1 /*
2 * Copyright 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "pc/data_channel_controller.h"
12
13 #include <algorithm>
14 #include <utility>
15
16 #include "absl/algorithm/container.h"
17 #include "absl/types/optional.h"
18 #include "api/peer_connection_interface.h"
19 #include "api/rtc_error.h"
20 #include "pc/peer_connection.h"
21 #include "pc/sctp_utils.h"
22 #include "rtc_base/location.h"
23 #include "rtc_base/logging.h"
24 #include "rtc_base/string_encode.h"
25 #include "rtc_base/task_utils/to_queued_task.h"
26
27 namespace webrtc {
28
HasDataChannels() const29 bool DataChannelController::HasDataChannels() const {
30 RTC_DCHECK_RUN_ON(signaling_thread());
31 return !rtp_data_channels_.empty() || !sctp_data_channels_.empty();
32 }
33
SendData(const cricket::SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)34 bool DataChannelController::SendData(const cricket::SendDataParams& params,
35 const rtc::CopyOnWriteBuffer& payload,
36 cricket::SendDataResult* result) {
37 if (data_channel_transport())
38 return DataChannelSendData(params, payload, result);
39 if (rtp_data_channel())
40 return rtp_data_channel()->SendData(params, payload, result);
41 RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
42 return false;
43 }
44
ConnectDataChannel(RtpDataChannel * webrtc_data_channel)45 bool DataChannelController::ConnectDataChannel(
46 RtpDataChannel* webrtc_data_channel) {
47 RTC_DCHECK_RUN_ON(signaling_thread());
48 if (!rtp_data_channel()) {
49 // Don't log an error here, because DataChannels are expected to call
50 // ConnectDataChannel in this state. It's the only way to initially tell
51 // whether or not the underlying transport is ready.
52 return false;
53 }
54 rtp_data_channel()->SignalReadyToSendData.connect(
55 webrtc_data_channel, &RtpDataChannel::OnChannelReady);
56 rtp_data_channel()->SignalDataReceived.connect(
57 webrtc_data_channel, &RtpDataChannel::OnDataReceived);
58 return true;
59 }
60
DisconnectDataChannel(RtpDataChannel * webrtc_data_channel)61 void DataChannelController::DisconnectDataChannel(
62 RtpDataChannel* webrtc_data_channel) {
63 RTC_DCHECK_RUN_ON(signaling_thread());
64 if (!rtp_data_channel()) {
65 RTC_LOG(LS_ERROR)
66 << "DisconnectDataChannel called when rtp_data_channel_ is NULL.";
67 return;
68 }
69 rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel);
70 rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel);
71 }
72
ConnectDataChannel(SctpDataChannel * webrtc_data_channel)73 bool DataChannelController::ConnectDataChannel(
74 SctpDataChannel* webrtc_data_channel) {
75 RTC_DCHECK_RUN_ON(signaling_thread());
76 if (!data_channel_transport()) {
77 // Don't log an error here, because DataChannels are expected to call
78 // ConnectDataChannel in this state. It's the only way to initially tell
79 // whether or not the underlying transport is ready.
80 return false;
81 }
82 SignalDataChannelTransportWritable_s.connect(
83 webrtc_data_channel, &SctpDataChannel::OnTransportReady);
84 SignalDataChannelTransportReceivedData_s.connect(
85 webrtc_data_channel, &SctpDataChannel::OnDataReceived);
86 SignalDataChannelTransportChannelClosing_s.connect(
87 webrtc_data_channel, &SctpDataChannel::OnClosingProcedureStartedRemotely);
88 SignalDataChannelTransportChannelClosed_s.connect(
89 webrtc_data_channel, &SctpDataChannel::OnClosingProcedureComplete);
90 return true;
91 }
92
DisconnectDataChannel(SctpDataChannel * webrtc_data_channel)93 void DataChannelController::DisconnectDataChannel(
94 SctpDataChannel* webrtc_data_channel) {
95 RTC_DCHECK_RUN_ON(signaling_thread());
96 if (!data_channel_transport()) {
97 RTC_LOG(LS_ERROR)
98 << "DisconnectDataChannel called when sctp_transport_ is NULL.";
99 return;
100 }
101 SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel);
102 SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel);
103 SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel);
104 SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel);
105 }
106
AddSctpDataStream(int sid)107 void DataChannelController::AddSctpDataStream(int sid) {
108 if (data_channel_transport()) {
109 network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
110 if (data_channel_transport()) {
111 data_channel_transport()->OpenChannel(sid);
112 }
113 });
114 }
115 }
116
RemoveSctpDataStream(int sid)117 void DataChannelController::RemoveSctpDataStream(int sid) {
118 if (data_channel_transport()) {
119 network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
120 if (data_channel_transport()) {
121 data_channel_transport()->CloseChannel(sid);
122 }
123 });
124 }
125 }
126
ReadyToSendData() const127 bool DataChannelController::ReadyToSendData() const {
128 RTC_DCHECK_RUN_ON(signaling_thread());
129 return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) ||
130 (data_channel_transport() && data_channel_transport_ready_to_send_);
131 }
132
OnDataReceived(int channel_id,DataMessageType type,const rtc::CopyOnWriteBuffer & buffer)133 void DataChannelController::OnDataReceived(
134 int channel_id,
135 DataMessageType type,
136 const rtc::CopyOnWriteBuffer& buffer) {
137 RTC_DCHECK_RUN_ON(network_thread());
138 cricket::ReceiveDataParams params;
139 params.sid = channel_id;
140 params.type = ToCricketDataMessageType(type);
141 signaling_thread()->PostTask(
142 ToQueuedTask([self = weak_factory_.GetWeakPtr(), params, buffer] {
143 if (self) {
144 RTC_DCHECK_RUN_ON(self->signaling_thread());
145 // TODO(bugs.webrtc.org/11547): The data being received should be
146 // delivered on the network thread. The way HandleOpenMessage_s works
147 // right now is that it's called for all types of buffers and operates
148 // as a selector function. Change this so that it's only called for
149 // buffers that it should be able to handle. Once we do that, we can
150 // deliver all other buffers on the network thread (change
151 // SignalDataChannelTransportReceivedData_s to
152 // SignalDataChannelTransportReceivedData_n).
153 if (!self->HandleOpenMessage_s(params, buffer)) {
154 self->SignalDataChannelTransportReceivedData_s(params, buffer);
155 }
156 }
157 }));
158 }
159
OnChannelClosing(int channel_id)160 void DataChannelController::OnChannelClosing(int channel_id) {
161 RTC_DCHECK_RUN_ON(network_thread());
162 signaling_thread()->PostTask(
163 ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
164 if (self) {
165 RTC_DCHECK_RUN_ON(self->signaling_thread());
166 self->SignalDataChannelTransportChannelClosing_s(channel_id);
167 }
168 }));
169 }
170
OnChannelClosed(int channel_id)171 void DataChannelController::OnChannelClosed(int channel_id) {
172 RTC_DCHECK_RUN_ON(network_thread());
173 signaling_thread()->PostTask(
174 ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
175 if (self) {
176 RTC_DCHECK_RUN_ON(self->signaling_thread());
177 self->SignalDataChannelTransportChannelClosed_s(channel_id);
178 }
179 }));
180 }
181
OnReadyToSend()182 void DataChannelController::OnReadyToSend() {
183 RTC_DCHECK_RUN_ON(network_thread());
184 signaling_thread()->PostTask(
185 ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
186 if (self) {
187 RTC_DCHECK_RUN_ON(self->signaling_thread());
188 self->data_channel_transport_ready_to_send_ = true;
189 self->SignalDataChannelTransportWritable_s(
190 self->data_channel_transport_ready_to_send_);
191 }
192 }));
193 }
194
OnTransportClosed()195 void DataChannelController::OnTransportClosed() {
196 RTC_DCHECK_RUN_ON(network_thread());
197 signaling_thread()->PostTask(
198 ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
199 if (self) {
200 RTC_DCHECK_RUN_ON(self->signaling_thread());
201 self->OnTransportChannelClosed();
202 }
203 }));
204 }
205
SetupDataChannelTransport_n()206 void DataChannelController::SetupDataChannelTransport_n() {
207 RTC_DCHECK_RUN_ON(network_thread());
208
209 // There's a new data channel transport. This needs to be signaled to the
210 // |sctp_data_channels_| so that they can reopen and reconnect. This is
211 // necessary when bundling is applied.
212 NotifyDataChannelsOfTransportCreated();
213 }
214
TeardownDataChannelTransport_n()215 void DataChannelController::TeardownDataChannelTransport_n() {
216 RTC_DCHECK_RUN_ON(network_thread());
217 if (data_channel_transport()) {
218 data_channel_transport()->SetDataSink(nullptr);
219 }
220 set_data_channel_transport(nullptr);
221 }
222
OnTransportChanged(DataChannelTransportInterface * new_data_channel_transport)223 void DataChannelController::OnTransportChanged(
224 DataChannelTransportInterface* new_data_channel_transport) {
225 RTC_DCHECK_RUN_ON(network_thread());
226 if (data_channel_transport() &&
227 data_channel_transport() != new_data_channel_transport) {
228 // Changed which data channel transport is used for |sctp_mid_| (eg. now
229 // it's bundled).
230 data_channel_transport()->SetDataSink(nullptr);
231 set_data_channel_transport(new_data_channel_transport);
232 if (new_data_channel_transport) {
233 new_data_channel_transport->SetDataSink(this);
234
235 // There's a new data channel transport. This needs to be signaled to the
236 // |sctp_data_channels_| so that they can reopen and reconnect. This is
237 // necessary when bundling is applied.
238 NotifyDataChannelsOfTransportCreated();
239 }
240 }
241 }
242
GetDataChannelStats() const243 std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
244 const {
245 RTC_DCHECK_RUN_ON(signaling_thread());
246 std::vector<DataChannelStats> stats;
247 stats.reserve(sctp_data_channels_.size());
248 for (const auto& channel : sctp_data_channels_)
249 stats.push_back(channel->GetStats());
250 return stats;
251 }
252
HandleOpenMessage_s(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & buffer)253 bool DataChannelController::HandleOpenMessage_s(
254 const cricket::ReceiveDataParams& params,
255 const rtc::CopyOnWriteBuffer& buffer) {
256 if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) {
257 // Received OPEN message; parse and signal that a new data channel should
258 // be created.
259 std::string label;
260 InternalDataChannelInit config;
261 config.id = params.ssrc;
262 if (!ParseDataChannelOpenMessage(buffer, &label, &config)) {
263 RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for ssrc "
264 << params.ssrc;
265 return true;
266 }
267 config.open_handshake_role = InternalDataChannelInit::kAcker;
268 OnDataChannelOpenMessage(label, config);
269 return true;
270 }
271 return false;
272 }
273
OnDataChannelOpenMessage(const std::string & label,const InternalDataChannelInit & config)274 void DataChannelController::OnDataChannelOpenMessage(
275 const std::string& label,
276 const InternalDataChannelInit& config) {
277 rtc::scoped_refptr<DataChannelInterface> channel(
278 InternalCreateDataChannelWithProxy(label, &config));
279 if (!channel.get()) {
280 RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
281 return;
282 }
283
284 pc_->Observer()->OnDataChannel(std::move(channel));
285 pc_->NoteDataAddedEvent();
286 }
287
288 rtc::scoped_refptr<DataChannelInterface>
InternalCreateDataChannelWithProxy(const std::string & label,const InternalDataChannelInit * config)289 DataChannelController::InternalCreateDataChannelWithProxy(
290 const std::string& label,
291 const InternalDataChannelInit* config) {
292 RTC_DCHECK_RUN_ON(signaling_thread());
293 if (pc_->IsClosed()) {
294 return nullptr;
295 }
296 if (data_channel_type_ == cricket::DCT_NONE) {
297 RTC_LOG(LS_ERROR)
298 << "InternalCreateDataChannel: Data is not supported in this call.";
299 return nullptr;
300 }
301 if (IsSctpLike(data_channel_type())) {
302 rtc::scoped_refptr<SctpDataChannel> channel =
303 InternalCreateSctpDataChannel(label, config);
304 if (channel) {
305 return SctpDataChannel::CreateProxy(channel);
306 }
307 } else if (data_channel_type() == cricket::DCT_RTP) {
308 rtc::scoped_refptr<RtpDataChannel> channel =
309 InternalCreateRtpDataChannel(label, config);
310 if (channel) {
311 return RtpDataChannel::CreateProxy(channel);
312 }
313 }
314
315 return nullptr;
316 }
317
318 rtc::scoped_refptr<RtpDataChannel>
InternalCreateRtpDataChannel(const std::string & label,const DataChannelInit * config)319 DataChannelController::InternalCreateRtpDataChannel(
320 const std::string& label,
321 const DataChannelInit* config) {
322 RTC_DCHECK_RUN_ON(signaling_thread());
323 DataChannelInit new_config = config ? (*config) : DataChannelInit();
324 rtc::scoped_refptr<RtpDataChannel> channel(
325 RtpDataChannel::Create(this, label, new_config, signaling_thread()));
326 if (!channel) {
327 return nullptr;
328 }
329 if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) {
330 RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label()
331 << " already exists.";
332 return nullptr;
333 }
334 rtp_data_channels_[channel->label()] = channel;
335 SignalRtpDataChannelCreated_(channel.get());
336 return channel;
337 }
338
339 rtc::scoped_refptr<SctpDataChannel>
InternalCreateSctpDataChannel(const std::string & label,const InternalDataChannelInit * config)340 DataChannelController::InternalCreateSctpDataChannel(
341 const std::string& label,
342 const InternalDataChannelInit* config) {
343 RTC_DCHECK_RUN_ON(signaling_thread());
344 InternalDataChannelInit new_config =
345 config ? (*config) : InternalDataChannelInit();
346 if (new_config.id < 0) {
347 rtc::SSLRole role;
348 if ((pc_->GetSctpSslRole(&role)) &&
349 !sid_allocator_.AllocateSid(role, &new_config.id)) {
350 RTC_LOG(LS_ERROR) << "No id can be allocated for the SCTP data channel.";
351 return nullptr;
352 }
353 } else if (!sid_allocator_.ReserveSid(new_config.id)) {
354 RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
355 "because the id is already in use or out of range.";
356 return nullptr;
357 }
358 rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
359 this, label, new_config, signaling_thread(), network_thread()));
360 if (!channel) {
361 sid_allocator_.ReleaseSid(new_config.id);
362 return nullptr;
363 }
364 sctp_data_channels_.push_back(channel);
365 channel->SignalClosed.connect(pc_, &PeerConnection::OnSctpDataChannelClosed);
366 SignalSctpDataChannelCreated_(channel.get());
367 return channel;
368 }
369
AllocateSctpSids(rtc::SSLRole role)370 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
371 RTC_DCHECK_RUN_ON(signaling_thread());
372 std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
373 for (const auto& channel : sctp_data_channels_) {
374 if (channel->id() < 0) {
375 int sid;
376 if (!sid_allocator_.AllocateSid(role, &sid)) {
377 RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel.";
378 channels_to_close.push_back(channel);
379 continue;
380 }
381 channel->SetSctpSid(sid);
382 }
383 }
384 // Since closing modifies the list of channels, we have to do the actual
385 // closing outside the loop.
386 for (const auto& channel : channels_to_close) {
387 channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
388 }
389 }
390
OnSctpDataChannelClosed(SctpDataChannel * channel)391 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
392 RTC_DCHECK_RUN_ON(signaling_thread());
393 for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
394 ++it) {
395 if (it->get() == channel) {
396 if (channel->id() >= 0) {
397 // After the closing procedure is done, it's safe to use this ID for
398 // another data channel.
399 sid_allocator_.ReleaseSid(channel->id());
400 }
401 // Since this method is triggered by a signal from the DataChannel,
402 // we can't free it directly here; we need to free it asynchronously.
403 sctp_data_channels_to_free_.push_back(*it);
404 sctp_data_channels_.erase(it);
405 signaling_thread()->PostTask(
406 ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
407 if (self) {
408 RTC_DCHECK_RUN_ON(self->signaling_thread());
409 self->sctp_data_channels_to_free_.clear();
410 }
411 }));
412 return;
413 }
414 }
415 }
416
OnTransportChannelClosed()417 void DataChannelController::OnTransportChannelClosed() {
418 RTC_DCHECK_RUN_ON(signaling_thread());
419 // Use a temporary copy of the RTP/SCTP DataChannel list because the
420 // DataChannel may callback to us and try to modify the list.
421 std::map<std::string, rtc::scoped_refptr<RtpDataChannel>> temp_rtp_dcs;
422 temp_rtp_dcs.swap(rtp_data_channels_);
423 for (const auto& kv : temp_rtp_dcs) {
424 kv.second->OnTransportChannelClosed();
425 }
426
427 std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
428 temp_sctp_dcs.swap(sctp_data_channels_);
429 for (const auto& channel : temp_sctp_dcs) {
430 channel->OnTransportChannelClosed();
431 }
432 }
433
FindDataChannelBySid(int sid) const434 SctpDataChannel* DataChannelController::FindDataChannelBySid(int sid) const {
435 RTC_DCHECK_RUN_ON(signaling_thread());
436 for (const auto& channel : sctp_data_channels_) {
437 if (channel->id() == sid) {
438 return channel;
439 }
440 }
441 return nullptr;
442 }
443
UpdateLocalRtpDataChannels(const cricket::StreamParamsVec & streams)444 void DataChannelController::UpdateLocalRtpDataChannels(
445 const cricket::StreamParamsVec& streams) {
446 std::vector<std::string> existing_channels;
447
448 RTC_DCHECK_RUN_ON(signaling_thread());
449 // Find new and active data channels.
450 for (const cricket::StreamParams& params : streams) {
451 // |it->sync_label| is actually the data channel label. The reason is that
452 // we use the same naming of data channels as we do for
453 // MediaStreams and Tracks.
454 // For MediaStreams, the sync_label is the MediaStream label and the
455 // track label is the same as |streamid|.
456 const std::string& channel_label = params.first_stream_id();
457 auto data_channel_it = rtp_data_channels()->find(channel_label);
458 if (data_channel_it == rtp_data_channels()->end()) {
459 RTC_LOG(LS_ERROR) << "channel label not found";
460 continue;
461 }
462 // Set the SSRC the data channel should use for sending.
463 data_channel_it->second->SetSendSsrc(params.first_ssrc());
464 existing_channels.push_back(data_channel_it->first);
465 }
466
467 UpdateClosingRtpDataChannels(existing_channels, true);
468 }
469
UpdateRemoteRtpDataChannels(const cricket::StreamParamsVec & streams)470 void DataChannelController::UpdateRemoteRtpDataChannels(
471 const cricket::StreamParamsVec& streams) {
472 RTC_DCHECK_RUN_ON(signaling_thread());
473
474 std::vector<std::string> existing_channels;
475
476 // Find new and active data channels.
477 for (const cricket::StreamParams& params : streams) {
478 // The data channel label is either the mslabel or the SSRC if the mslabel
479 // does not exist. Ex a=ssrc:444330170 mslabel:test1.
480 std::string label = params.first_stream_id().empty()
481 ? rtc::ToString(params.first_ssrc())
482 : params.first_stream_id();
483 auto data_channel_it = rtp_data_channels()->find(label);
484 if (data_channel_it == rtp_data_channels()->end()) {
485 // This is a new data channel.
486 CreateRemoteRtpDataChannel(label, params.first_ssrc());
487 } else {
488 data_channel_it->second->SetReceiveSsrc(params.first_ssrc());
489 }
490 existing_channels.push_back(label);
491 }
492
493 UpdateClosingRtpDataChannels(existing_channels, false);
494 }
495
data_channel_type() const496 cricket::DataChannelType DataChannelController::data_channel_type() const {
497 // TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread.
498 // RTC_DCHECK_RUN_ON(signaling_thread());
499 return data_channel_type_;
500 }
501
set_data_channel_type(cricket::DataChannelType type)502 void DataChannelController::set_data_channel_type(
503 cricket::DataChannelType type) {
504 RTC_DCHECK_RUN_ON(signaling_thread());
505 data_channel_type_ = type;
506 }
507
rtp_data_channel() const508 cricket::RtpDataChannel* DataChannelController::rtp_data_channel() const {
509 // TODO(bugs.webrtc.org/9987): Only allow this accessor to be called on the
510 // network thread.
511 // RTC_DCHECK_RUN_ON(network_thread());
512 return rtp_data_channel_;
513 }
514
set_rtp_data_channel(cricket::RtpDataChannel * channel)515 void DataChannelController::set_rtp_data_channel(
516 cricket::RtpDataChannel* channel) {
517 RTC_DCHECK_RUN_ON(network_thread());
518 rtp_data_channel_ = channel;
519 }
520
data_channel_transport() const521 DataChannelTransportInterface* DataChannelController::data_channel_transport()
522 const {
523 // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
524 // network thread.
525 // RTC_DCHECK_RUN_ON(network_thread());
526 return data_channel_transport_;
527 }
528
set_data_channel_transport(DataChannelTransportInterface * transport)529 void DataChannelController::set_data_channel_transport(
530 DataChannelTransportInterface* transport) {
531 RTC_DCHECK_RUN_ON(network_thread());
532 data_channel_transport_ = transport;
533 }
534
535 const std::map<std::string, rtc::scoped_refptr<RtpDataChannel>>*
rtp_data_channels() const536 DataChannelController::rtp_data_channels() const {
537 RTC_DCHECK_RUN_ON(signaling_thread());
538 return &rtp_data_channels_;
539 }
540
UpdateClosingRtpDataChannels(const std::vector<std::string> & active_channels,bool is_local_update)541 void DataChannelController::UpdateClosingRtpDataChannels(
542 const std::vector<std::string>& active_channels,
543 bool is_local_update) {
544 auto it = rtp_data_channels_.begin();
545 while (it != rtp_data_channels_.end()) {
546 RtpDataChannel* data_channel = it->second;
547 if (absl::c_linear_search(active_channels, data_channel->label())) {
548 ++it;
549 continue;
550 }
551
552 if (is_local_update) {
553 data_channel->SetSendSsrc(0);
554 } else {
555 data_channel->RemotePeerRequestClose();
556 }
557
558 if (data_channel->state() == RtpDataChannel::kClosed) {
559 rtp_data_channels_.erase(it);
560 it = rtp_data_channels_.begin();
561 } else {
562 ++it;
563 }
564 }
565 }
566
CreateRemoteRtpDataChannel(const std::string & label,uint32_t remote_ssrc)567 void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label,
568 uint32_t remote_ssrc) {
569 if (data_channel_type() != cricket::DCT_RTP) {
570 return;
571 }
572 rtc::scoped_refptr<RtpDataChannel> channel(
573 InternalCreateRtpDataChannel(label, nullptr));
574 if (!channel.get()) {
575 RTC_LOG(LS_WARNING) << "Remote peer requested a DataChannel but"
576 "CreateDataChannel failed.";
577 return;
578 }
579 channel->SetReceiveSsrc(remote_ssrc);
580 rtc::scoped_refptr<DataChannelInterface> proxy_channel =
581 RtpDataChannel::CreateProxy(std::move(channel));
582 pc_->Observer()->OnDataChannel(std::move(proxy_channel));
583 }
584
DataChannelSendData(const cricket::SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)585 bool DataChannelController::DataChannelSendData(
586 const cricket::SendDataParams& params,
587 const rtc::CopyOnWriteBuffer& payload,
588 cricket::SendDataResult* result) {
589 // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
590 // thread instead. Remove the Invoke() below and move assocated state to
591 // the network thread.
592 RTC_DCHECK_RUN_ON(signaling_thread());
593 RTC_DCHECK(data_channel_transport());
594
595 SendDataParams send_params;
596 send_params.type = ToWebrtcDataMessageType(params.type);
597 send_params.ordered = params.ordered;
598 if (params.max_rtx_count >= 0) {
599 send_params.max_rtx_count = params.max_rtx_count;
600 } else if (params.max_rtx_ms >= 0) {
601 send_params.max_rtx_ms = params.max_rtx_ms;
602 }
603
604 RTCError error = network_thread()->Invoke<RTCError>(
605 RTC_FROM_HERE, [this, params, send_params, payload] {
606 return data_channel_transport()->SendData(params.sid, send_params,
607 payload);
608 });
609
610 if (error.ok()) {
611 *result = cricket::SendDataResult::SDR_SUCCESS;
612 return true;
613 } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
614 // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
615 // TODO(mellem): Stop using RTCError here and get rid of the mapping.
616 *result = cricket::SendDataResult::SDR_BLOCK;
617 return false;
618 }
619 *result = cricket::SendDataResult::SDR_ERROR;
620 return false;
621 }
622
NotifyDataChannelsOfTransportCreated()623 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
624 RTC_DCHECK_RUN_ON(network_thread());
625 signaling_thread()->PostTask(
626 ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
627 if (self) {
628 RTC_DCHECK_RUN_ON(self->signaling_thread());
629 for (const auto& channel : self->sctp_data_channels_) {
630 channel->OnTransportChannelCreated();
631 }
632 }
633 }));
634 }
635
network_thread() const636 rtc::Thread* DataChannelController::network_thread() const {
637 return pc_->network_thread();
638 }
signaling_thread() const639 rtc::Thread* DataChannelController::signaling_thread() const {
640 return pc_->signaling_thread();
641 }
642
643 } // namespace webrtc
644