1 /*
2  *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "pc/data_channel_controller.h"
12 
13 #include <algorithm>
14 #include <utility>
15 
16 #include "absl/algorithm/container.h"
17 #include "absl/types/optional.h"
18 #include "api/peer_connection_interface.h"
19 #include "api/rtc_error.h"
20 #include "pc/peer_connection.h"
21 #include "pc/sctp_utils.h"
22 #include "rtc_base/location.h"
23 #include "rtc_base/logging.h"
24 #include "rtc_base/string_encode.h"
25 #include "rtc_base/task_utils/to_queued_task.h"
26 
27 namespace webrtc {
28 
HasDataChannels() const29 bool DataChannelController::HasDataChannels() const {
30   RTC_DCHECK_RUN_ON(signaling_thread());
31   return !rtp_data_channels_.empty() || !sctp_data_channels_.empty();
32 }
33 
SendData(const cricket::SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)34 bool DataChannelController::SendData(const cricket::SendDataParams& params,
35                                      const rtc::CopyOnWriteBuffer& payload,
36                                      cricket::SendDataResult* result) {
37   if (data_channel_transport())
38     return DataChannelSendData(params, payload, result);
39   if (rtp_data_channel())
40     return rtp_data_channel()->SendData(params, payload, result);
41   RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
42   return false;
43 }
44 
ConnectDataChannel(RtpDataChannel * webrtc_data_channel)45 bool DataChannelController::ConnectDataChannel(
46     RtpDataChannel* webrtc_data_channel) {
47   RTC_DCHECK_RUN_ON(signaling_thread());
48   if (!rtp_data_channel()) {
49     // Don't log an error here, because DataChannels are expected to call
50     // ConnectDataChannel in this state. It's the only way to initially tell
51     // whether or not the underlying transport is ready.
52     return false;
53   }
54   rtp_data_channel()->SignalReadyToSendData.connect(
55       webrtc_data_channel, &RtpDataChannel::OnChannelReady);
56   rtp_data_channel()->SignalDataReceived.connect(
57       webrtc_data_channel, &RtpDataChannel::OnDataReceived);
58   return true;
59 }
60 
DisconnectDataChannel(RtpDataChannel * webrtc_data_channel)61 void DataChannelController::DisconnectDataChannel(
62     RtpDataChannel* webrtc_data_channel) {
63   RTC_DCHECK_RUN_ON(signaling_thread());
64   if (!rtp_data_channel()) {
65     RTC_LOG(LS_ERROR)
66         << "DisconnectDataChannel called when rtp_data_channel_ is NULL.";
67     return;
68   }
69   rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel);
70   rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel);
71 }
72 
ConnectDataChannel(SctpDataChannel * webrtc_data_channel)73 bool DataChannelController::ConnectDataChannel(
74     SctpDataChannel* webrtc_data_channel) {
75   RTC_DCHECK_RUN_ON(signaling_thread());
76   if (!data_channel_transport()) {
77     // Don't log an error here, because DataChannels are expected to call
78     // ConnectDataChannel in this state. It's the only way to initially tell
79     // whether or not the underlying transport is ready.
80     return false;
81   }
82   SignalDataChannelTransportWritable_s.connect(
83       webrtc_data_channel, &SctpDataChannel::OnTransportReady);
84   SignalDataChannelTransportReceivedData_s.connect(
85       webrtc_data_channel, &SctpDataChannel::OnDataReceived);
86   SignalDataChannelTransportChannelClosing_s.connect(
87       webrtc_data_channel, &SctpDataChannel::OnClosingProcedureStartedRemotely);
88   SignalDataChannelTransportChannelClosed_s.connect(
89       webrtc_data_channel, &SctpDataChannel::OnClosingProcedureComplete);
90   return true;
91 }
92 
DisconnectDataChannel(SctpDataChannel * webrtc_data_channel)93 void DataChannelController::DisconnectDataChannel(
94     SctpDataChannel* webrtc_data_channel) {
95   RTC_DCHECK_RUN_ON(signaling_thread());
96   if (!data_channel_transport()) {
97     RTC_LOG(LS_ERROR)
98         << "DisconnectDataChannel called when sctp_transport_ is NULL.";
99     return;
100   }
101   SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel);
102   SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel);
103   SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel);
104   SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel);
105 }
106 
AddSctpDataStream(int sid)107 void DataChannelController::AddSctpDataStream(int sid) {
108   if (data_channel_transport()) {
109     network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
110       if (data_channel_transport()) {
111         data_channel_transport()->OpenChannel(sid);
112       }
113     });
114   }
115 }
116 
RemoveSctpDataStream(int sid)117 void DataChannelController::RemoveSctpDataStream(int sid) {
118   if (data_channel_transport()) {
119     network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
120       if (data_channel_transport()) {
121         data_channel_transport()->CloseChannel(sid);
122       }
123     });
124   }
125 }
126 
ReadyToSendData() const127 bool DataChannelController::ReadyToSendData() const {
128   RTC_DCHECK_RUN_ON(signaling_thread());
129   return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) ||
130          (data_channel_transport() && data_channel_transport_ready_to_send_);
131 }
132 
OnDataReceived(int channel_id,DataMessageType type,const rtc::CopyOnWriteBuffer & buffer)133 void DataChannelController::OnDataReceived(
134     int channel_id,
135     DataMessageType type,
136     const rtc::CopyOnWriteBuffer& buffer) {
137   RTC_DCHECK_RUN_ON(network_thread());
138   cricket::ReceiveDataParams params;
139   params.sid = channel_id;
140   params.type = ToCricketDataMessageType(type);
141   signaling_thread()->PostTask(
142       ToQueuedTask([self = weak_factory_.GetWeakPtr(), params, buffer] {
143         if (self) {
144           RTC_DCHECK_RUN_ON(self->signaling_thread());
145           // TODO(bugs.webrtc.org/11547): The data being received should be
146           // delivered on the network thread. The way HandleOpenMessage_s works
147           // right now is that it's called for all types of buffers and operates
148           // as a selector function. Change this so that it's only called for
149           // buffers that it should be able to handle. Once we do that, we can
150           // deliver all other buffers on the network thread (change
151           // SignalDataChannelTransportReceivedData_s to
152           // SignalDataChannelTransportReceivedData_n).
153           if (!self->HandleOpenMessage_s(params, buffer)) {
154             self->SignalDataChannelTransportReceivedData_s(params, buffer);
155           }
156         }
157       }));
158 }
159 
OnChannelClosing(int channel_id)160 void DataChannelController::OnChannelClosing(int channel_id) {
161   RTC_DCHECK_RUN_ON(network_thread());
162   signaling_thread()->PostTask(
163       ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
164         if (self) {
165           RTC_DCHECK_RUN_ON(self->signaling_thread());
166           self->SignalDataChannelTransportChannelClosing_s(channel_id);
167         }
168       }));
169 }
170 
OnChannelClosed(int channel_id)171 void DataChannelController::OnChannelClosed(int channel_id) {
172   RTC_DCHECK_RUN_ON(network_thread());
173   signaling_thread()->PostTask(
174       ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
175         if (self) {
176           RTC_DCHECK_RUN_ON(self->signaling_thread());
177           self->SignalDataChannelTransportChannelClosed_s(channel_id);
178         }
179       }));
180 }
181 
OnReadyToSend()182 void DataChannelController::OnReadyToSend() {
183   RTC_DCHECK_RUN_ON(network_thread());
184   signaling_thread()->PostTask(
185       ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
186         if (self) {
187           RTC_DCHECK_RUN_ON(self->signaling_thread());
188           self->data_channel_transport_ready_to_send_ = true;
189           self->SignalDataChannelTransportWritable_s(
190               self->data_channel_transport_ready_to_send_);
191         }
192       }));
193 }
194 
OnTransportClosed()195 void DataChannelController::OnTransportClosed() {
196   RTC_DCHECK_RUN_ON(network_thread());
197   signaling_thread()->PostTask(
198       ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
199         if (self) {
200           RTC_DCHECK_RUN_ON(self->signaling_thread());
201           self->OnTransportChannelClosed();
202         }
203       }));
204 }
205 
SetupDataChannelTransport_n()206 void DataChannelController::SetupDataChannelTransport_n() {
207   RTC_DCHECK_RUN_ON(network_thread());
208 
209   // There's a new data channel transport.  This needs to be signaled to the
210   // |sctp_data_channels_| so that they can reopen and reconnect.  This is
211   // necessary when bundling is applied.
212   NotifyDataChannelsOfTransportCreated();
213 }
214 
TeardownDataChannelTransport_n()215 void DataChannelController::TeardownDataChannelTransport_n() {
216   RTC_DCHECK_RUN_ON(network_thread());
217   if (data_channel_transport()) {
218     data_channel_transport()->SetDataSink(nullptr);
219   }
220   set_data_channel_transport(nullptr);
221 }
222 
OnTransportChanged(DataChannelTransportInterface * new_data_channel_transport)223 void DataChannelController::OnTransportChanged(
224     DataChannelTransportInterface* new_data_channel_transport) {
225   RTC_DCHECK_RUN_ON(network_thread());
226   if (data_channel_transport() &&
227       data_channel_transport() != new_data_channel_transport) {
228     // Changed which data channel transport is used for |sctp_mid_| (eg. now
229     // it's bundled).
230     data_channel_transport()->SetDataSink(nullptr);
231     set_data_channel_transport(new_data_channel_transport);
232     if (new_data_channel_transport) {
233       new_data_channel_transport->SetDataSink(this);
234 
235       // There's a new data channel transport.  This needs to be signaled to the
236       // |sctp_data_channels_| so that they can reopen and reconnect.  This is
237       // necessary when bundling is applied.
238       NotifyDataChannelsOfTransportCreated();
239     }
240   }
241 }
242 
GetDataChannelStats() const243 std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
244     const {
245   RTC_DCHECK_RUN_ON(signaling_thread());
246   std::vector<DataChannelStats> stats;
247   stats.reserve(sctp_data_channels_.size());
248   for (const auto& channel : sctp_data_channels_)
249     stats.push_back(channel->GetStats());
250   return stats;
251 }
252 
HandleOpenMessage_s(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & buffer)253 bool DataChannelController::HandleOpenMessage_s(
254     const cricket::ReceiveDataParams& params,
255     const rtc::CopyOnWriteBuffer& buffer) {
256   if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) {
257     // Received OPEN message; parse and signal that a new data channel should
258     // be created.
259     std::string label;
260     InternalDataChannelInit config;
261     config.id = params.ssrc;
262     if (!ParseDataChannelOpenMessage(buffer, &label, &config)) {
263       RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for ssrc "
264                           << params.ssrc;
265       return true;
266     }
267     config.open_handshake_role = InternalDataChannelInit::kAcker;
268     OnDataChannelOpenMessage(label, config);
269     return true;
270   }
271   return false;
272 }
273 
OnDataChannelOpenMessage(const std::string & label,const InternalDataChannelInit & config)274 void DataChannelController::OnDataChannelOpenMessage(
275     const std::string& label,
276     const InternalDataChannelInit& config) {
277   rtc::scoped_refptr<DataChannelInterface> channel(
278       InternalCreateDataChannelWithProxy(label, &config));
279   if (!channel.get()) {
280     RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
281     return;
282   }
283 
284   pc_->Observer()->OnDataChannel(std::move(channel));
285   pc_->NoteDataAddedEvent();
286 }
287 
288 rtc::scoped_refptr<DataChannelInterface>
InternalCreateDataChannelWithProxy(const std::string & label,const InternalDataChannelInit * config)289 DataChannelController::InternalCreateDataChannelWithProxy(
290     const std::string& label,
291     const InternalDataChannelInit* config) {
292   RTC_DCHECK_RUN_ON(signaling_thread());
293   if (pc_->IsClosed()) {
294     return nullptr;
295   }
296   if (data_channel_type_ == cricket::DCT_NONE) {
297     RTC_LOG(LS_ERROR)
298         << "InternalCreateDataChannel: Data is not supported in this call.";
299     return nullptr;
300   }
301   if (IsSctpLike(data_channel_type())) {
302     rtc::scoped_refptr<SctpDataChannel> channel =
303         InternalCreateSctpDataChannel(label, config);
304     if (channel) {
305       return SctpDataChannel::CreateProxy(channel);
306     }
307   } else if (data_channel_type() == cricket::DCT_RTP) {
308     rtc::scoped_refptr<RtpDataChannel> channel =
309         InternalCreateRtpDataChannel(label, config);
310     if (channel) {
311       return RtpDataChannel::CreateProxy(channel);
312     }
313   }
314 
315   return nullptr;
316 }
317 
318 rtc::scoped_refptr<RtpDataChannel>
InternalCreateRtpDataChannel(const std::string & label,const DataChannelInit * config)319 DataChannelController::InternalCreateRtpDataChannel(
320     const std::string& label,
321     const DataChannelInit* config) {
322   RTC_DCHECK_RUN_ON(signaling_thread());
323   DataChannelInit new_config = config ? (*config) : DataChannelInit();
324   rtc::scoped_refptr<RtpDataChannel> channel(
325       RtpDataChannel::Create(this, label, new_config, signaling_thread()));
326   if (!channel) {
327     return nullptr;
328   }
329   if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) {
330     RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label()
331                       << " already exists.";
332     return nullptr;
333   }
334   rtp_data_channels_[channel->label()] = channel;
335   SignalRtpDataChannelCreated_(channel.get());
336   return channel;
337 }
338 
339 rtc::scoped_refptr<SctpDataChannel>
InternalCreateSctpDataChannel(const std::string & label,const InternalDataChannelInit * config)340 DataChannelController::InternalCreateSctpDataChannel(
341     const std::string& label,
342     const InternalDataChannelInit* config) {
343   RTC_DCHECK_RUN_ON(signaling_thread());
344   InternalDataChannelInit new_config =
345       config ? (*config) : InternalDataChannelInit();
346   if (new_config.id < 0) {
347     rtc::SSLRole role;
348     if ((pc_->GetSctpSslRole(&role)) &&
349         !sid_allocator_.AllocateSid(role, &new_config.id)) {
350       RTC_LOG(LS_ERROR) << "No id can be allocated for the SCTP data channel.";
351       return nullptr;
352     }
353   } else if (!sid_allocator_.ReserveSid(new_config.id)) {
354     RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
355                          "because the id is already in use or out of range.";
356     return nullptr;
357   }
358   rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
359       this, label, new_config, signaling_thread(), network_thread()));
360   if (!channel) {
361     sid_allocator_.ReleaseSid(new_config.id);
362     return nullptr;
363   }
364   sctp_data_channels_.push_back(channel);
365   channel->SignalClosed.connect(pc_, &PeerConnection::OnSctpDataChannelClosed);
366   SignalSctpDataChannelCreated_(channel.get());
367   return channel;
368 }
369 
AllocateSctpSids(rtc::SSLRole role)370 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
371   RTC_DCHECK_RUN_ON(signaling_thread());
372   std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
373   for (const auto& channel : sctp_data_channels_) {
374     if (channel->id() < 0) {
375       int sid;
376       if (!sid_allocator_.AllocateSid(role, &sid)) {
377         RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel.";
378         channels_to_close.push_back(channel);
379         continue;
380       }
381       channel->SetSctpSid(sid);
382     }
383   }
384   // Since closing modifies the list of channels, we have to do the actual
385   // closing outside the loop.
386   for (const auto& channel : channels_to_close) {
387     channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
388   }
389 }
390 
OnSctpDataChannelClosed(SctpDataChannel * channel)391 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
392   RTC_DCHECK_RUN_ON(signaling_thread());
393   for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
394        ++it) {
395     if (it->get() == channel) {
396       if (channel->id() >= 0) {
397         // After the closing procedure is done, it's safe to use this ID for
398         // another data channel.
399         sid_allocator_.ReleaseSid(channel->id());
400       }
401       // Since this method is triggered by a signal from the DataChannel,
402       // we can't free it directly here; we need to free it asynchronously.
403       sctp_data_channels_to_free_.push_back(*it);
404       sctp_data_channels_.erase(it);
405       signaling_thread()->PostTask(
406           ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
407             if (self) {
408               RTC_DCHECK_RUN_ON(self->signaling_thread());
409               self->sctp_data_channels_to_free_.clear();
410             }
411           }));
412       return;
413     }
414   }
415 }
416 
OnTransportChannelClosed()417 void DataChannelController::OnTransportChannelClosed() {
418   RTC_DCHECK_RUN_ON(signaling_thread());
419   // Use a temporary copy of the RTP/SCTP DataChannel list because the
420   // DataChannel may callback to us and try to modify the list.
421   std::map<std::string, rtc::scoped_refptr<RtpDataChannel>> temp_rtp_dcs;
422   temp_rtp_dcs.swap(rtp_data_channels_);
423   for (const auto& kv : temp_rtp_dcs) {
424     kv.second->OnTransportChannelClosed();
425   }
426 
427   std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
428   temp_sctp_dcs.swap(sctp_data_channels_);
429   for (const auto& channel : temp_sctp_dcs) {
430     channel->OnTransportChannelClosed();
431   }
432 }
433 
FindDataChannelBySid(int sid) const434 SctpDataChannel* DataChannelController::FindDataChannelBySid(int sid) const {
435   RTC_DCHECK_RUN_ON(signaling_thread());
436   for (const auto& channel : sctp_data_channels_) {
437     if (channel->id() == sid) {
438       return channel;
439     }
440   }
441   return nullptr;
442 }
443 
UpdateLocalRtpDataChannels(const cricket::StreamParamsVec & streams)444 void DataChannelController::UpdateLocalRtpDataChannels(
445     const cricket::StreamParamsVec& streams) {
446   std::vector<std::string> existing_channels;
447 
448   RTC_DCHECK_RUN_ON(signaling_thread());
449   // Find new and active data channels.
450   for (const cricket::StreamParams& params : streams) {
451     // |it->sync_label| is actually the data channel label. The reason is that
452     // we use the same naming of data channels as we do for
453     // MediaStreams and Tracks.
454     // For MediaStreams, the sync_label is the MediaStream label and the
455     // track label is the same as |streamid|.
456     const std::string& channel_label = params.first_stream_id();
457     auto data_channel_it = rtp_data_channels()->find(channel_label);
458     if (data_channel_it == rtp_data_channels()->end()) {
459       RTC_LOG(LS_ERROR) << "channel label not found";
460       continue;
461     }
462     // Set the SSRC the data channel should use for sending.
463     data_channel_it->second->SetSendSsrc(params.first_ssrc());
464     existing_channels.push_back(data_channel_it->first);
465   }
466 
467   UpdateClosingRtpDataChannels(existing_channels, true);
468 }
469 
UpdateRemoteRtpDataChannels(const cricket::StreamParamsVec & streams)470 void DataChannelController::UpdateRemoteRtpDataChannels(
471     const cricket::StreamParamsVec& streams) {
472   RTC_DCHECK_RUN_ON(signaling_thread());
473 
474   std::vector<std::string> existing_channels;
475 
476   // Find new and active data channels.
477   for (const cricket::StreamParams& params : streams) {
478     // The data channel label is either the mslabel or the SSRC if the mslabel
479     // does not exist. Ex a=ssrc:444330170 mslabel:test1.
480     std::string label = params.first_stream_id().empty()
481                             ? rtc::ToString(params.first_ssrc())
482                             : params.first_stream_id();
483     auto data_channel_it = rtp_data_channels()->find(label);
484     if (data_channel_it == rtp_data_channels()->end()) {
485       // This is a new data channel.
486       CreateRemoteRtpDataChannel(label, params.first_ssrc());
487     } else {
488       data_channel_it->second->SetReceiveSsrc(params.first_ssrc());
489     }
490     existing_channels.push_back(label);
491   }
492 
493   UpdateClosingRtpDataChannels(existing_channels, false);
494 }
495 
data_channel_type() const496 cricket::DataChannelType DataChannelController::data_channel_type() const {
497   // TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread.
498   // RTC_DCHECK_RUN_ON(signaling_thread());
499   return data_channel_type_;
500 }
501 
set_data_channel_type(cricket::DataChannelType type)502 void DataChannelController::set_data_channel_type(
503     cricket::DataChannelType type) {
504   RTC_DCHECK_RUN_ON(signaling_thread());
505   data_channel_type_ = type;
506 }
507 
rtp_data_channel() const508 cricket::RtpDataChannel* DataChannelController::rtp_data_channel() const {
509   // TODO(bugs.webrtc.org/9987): Only allow this accessor to be called on the
510   // network thread.
511   // RTC_DCHECK_RUN_ON(network_thread());
512   return rtp_data_channel_;
513 }
514 
set_rtp_data_channel(cricket::RtpDataChannel * channel)515 void DataChannelController::set_rtp_data_channel(
516     cricket::RtpDataChannel* channel) {
517   RTC_DCHECK_RUN_ON(network_thread());
518   rtp_data_channel_ = channel;
519 }
520 
data_channel_transport() const521 DataChannelTransportInterface* DataChannelController::data_channel_transport()
522     const {
523   // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
524   // network thread.
525   // RTC_DCHECK_RUN_ON(network_thread());
526   return data_channel_transport_;
527 }
528 
set_data_channel_transport(DataChannelTransportInterface * transport)529 void DataChannelController::set_data_channel_transport(
530     DataChannelTransportInterface* transport) {
531   RTC_DCHECK_RUN_ON(network_thread());
532   data_channel_transport_ = transport;
533 }
534 
535 const std::map<std::string, rtc::scoped_refptr<RtpDataChannel>>*
rtp_data_channels() const536 DataChannelController::rtp_data_channels() const {
537   RTC_DCHECK_RUN_ON(signaling_thread());
538   return &rtp_data_channels_;
539 }
540 
UpdateClosingRtpDataChannels(const std::vector<std::string> & active_channels,bool is_local_update)541 void DataChannelController::UpdateClosingRtpDataChannels(
542     const std::vector<std::string>& active_channels,
543     bool is_local_update) {
544   auto it = rtp_data_channels_.begin();
545   while (it != rtp_data_channels_.end()) {
546     RtpDataChannel* data_channel = it->second;
547     if (absl::c_linear_search(active_channels, data_channel->label())) {
548       ++it;
549       continue;
550     }
551 
552     if (is_local_update) {
553       data_channel->SetSendSsrc(0);
554     } else {
555       data_channel->RemotePeerRequestClose();
556     }
557 
558     if (data_channel->state() == RtpDataChannel::kClosed) {
559       rtp_data_channels_.erase(it);
560       it = rtp_data_channels_.begin();
561     } else {
562       ++it;
563     }
564   }
565 }
566 
CreateRemoteRtpDataChannel(const std::string & label,uint32_t remote_ssrc)567 void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label,
568                                                        uint32_t remote_ssrc) {
569   if (data_channel_type() != cricket::DCT_RTP) {
570     return;
571   }
572   rtc::scoped_refptr<RtpDataChannel> channel(
573       InternalCreateRtpDataChannel(label, nullptr));
574   if (!channel.get()) {
575     RTC_LOG(LS_WARNING) << "Remote peer requested a DataChannel but"
576                            "CreateDataChannel failed.";
577     return;
578   }
579   channel->SetReceiveSsrc(remote_ssrc);
580   rtc::scoped_refptr<DataChannelInterface> proxy_channel =
581       RtpDataChannel::CreateProxy(std::move(channel));
582   pc_->Observer()->OnDataChannel(std::move(proxy_channel));
583 }
584 
DataChannelSendData(const cricket::SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)585 bool DataChannelController::DataChannelSendData(
586     const cricket::SendDataParams& params,
587     const rtc::CopyOnWriteBuffer& payload,
588     cricket::SendDataResult* result) {
589   // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
590   // thread instead. Remove the Invoke() below and move assocated state to
591   // the network thread.
592   RTC_DCHECK_RUN_ON(signaling_thread());
593   RTC_DCHECK(data_channel_transport());
594 
595   SendDataParams send_params;
596   send_params.type = ToWebrtcDataMessageType(params.type);
597   send_params.ordered = params.ordered;
598   if (params.max_rtx_count >= 0) {
599     send_params.max_rtx_count = params.max_rtx_count;
600   } else if (params.max_rtx_ms >= 0) {
601     send_params.max_rtx_ms = params.max_rtx_ms;
602   }
603 
604   RTCError error = network_thread()->Invoke<RTCError>(
605       RTC_FROM_HERE, [this, params, send_params, payload] {
606         return data_channel_transport()->SendData(params.sid, send_params,
607                                                   payload);
608       });
609 
610   if (error.ok()) {
611     *result = cricket::SendDataResult::SDR_SUCCESS;
612     return true;
613   } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
614     // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
615     // TODO(mellem):  Stop using RTCError here and get rid of the mapping.
616     *result = cricket::SendDataResult::SDR_BLOCK;
617     return false;
618   }
619   *result = cricket::SendDataResult::SDR_ERROR;
620   return false;
621 }
622 
NotifyDataChannelsOfTransportCreated()623 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
624   RTC_DCHECK_RUN_ON(network_thread());
625   signaling_thread()->PostTask(
626       ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
627         if (self) {
628           RTC_DCHECK_RUN_ON(self->signaling_thread());
629           for (const auto& channel : self->sctp_data_channels_) {
630             channel->OnTransportChannelCreated();
631           }
632         }
633       }));
634 }
635 
network_thread() const636 rtc::Thread* DataChannelController::network_thread() const {
637   return pc_->network_thread();
638 }
signaling_thread() const639 rtc::Thread* DataChannelController::signaling_thread() const {
640   return pc_->signaling_thread();
641 }
642 
643 }  // namespace webrtc
644