1 #include "SctpDataChannelProviderInterfaceImpl.h"
2
3 #include "p2p/base/dtls_transport.h"
4
5 namespace tgcalls {
6
SctpDataChannelProviderInterfaceImpl(cricket::DtlsTransport * transportChannel,bool isOutgoing,std::function<void (bool)> onStateChanged,std::function<void ()> onTerminated,std::function<void (std::string const &)> onMessageReceived,std::shared_ptr<Threads> threads)7 SctpDataChannelProviderInterfaceImpl::SctpDataChannelProviderInterfaceImpl(
8 cricket::DtlsTransport *transportChannel,
9 bool isOutgoing,
10 std::function<void(bool)> onStateChanged,
11 std::function<void()> onTerminated,
12 std::function<void(std::string const &)> onMessageReceived,
13 std::shared_ptr<Threads> threads
14 ) :
15 _threads(std::move(threads)),
16 _onStateChanged(onStateChanged),
17 _onTerminated(onTerminated),
18 _onMessageReceived(onMessageReceived) {
19 assert(_threads->getNetworkThread()->IsCurrent());
20
21 _sctpTransportFactory.reset(new cricket::SctpTransportFactory(_threads->getNetworkThread()));
22
23 _sctpTransport = _sctpTransportFactory->CreateSctpTransport(transportChannel);
24 _sctpTransport->SignalReadyToSendData.connect(this, &SctpDataChannelProviderInterfaceImpl::sctpReadyToSendData);
25 _sctpTransport->SignalDataReceived.connect(this, &SctpDataChannelProviderInterfaceImpl::sctpDataReceived);
26 _sctpTransport->SignalClosedAbruptly.connect(this, &SctpDataChannelProviderInterfaceImpl::sctpClosedAbruptly);
27
28 webrtc::InternalDataChannelInit dataChannelInit;
29 dataChannelInit.id = 0;
30 dataChannelInit.open_handshake_role = isOutgoing ? webrtc::InternalDataChannelInit::kOpener : webrtc::InternalDataChannelInit::kAcker;
31 _dataChannel = webrtc::SctpDataChannel::Create(
32 this,
33 "data",
34 dataChannelInit,
35 _threads->getNetworkThread(),
36 _threads->getNetworkThread()
37 );
38
39 _dataChannel->RegisterObserver(this);
40 }
41
42
~SctpDataChannelProviderInterfaceImpl()43 SctpDataChannelProviderInterfaceImpl::~SctpDataChannelProviderInterfaceImpl() {
44 assert(_threads->getNetworkThread()->IsCurrent());
45
46 _dataChannel->UnregisterObserver();
47 _dataChannel->Close();
48 _dataChannel = nullptr;
49
50 _sctpTransport = nullptr;
51 _sctpTransportFactory.reset();
52 }
53
sendDataChannelMessage(std::string const & message)54 void SctpDataChannelProviderInterfaceImpl::sendDataChannelMessage(std::string const &message) {
55 assert(_threads->getNetworkThread()->IsCurrent());
56
57 if (_isDataChannelOpen) {
58 RTC_LOG(LS_INFO) << "Outgoing DataChannel message: " << message;
59
60 webrtc::DataBuffer buffer(message);
61 _dataChannel->Send(buffer);
62 } else {
63 RTC_LOG(LS_INFO) << "Could not send an outgoing DataChannel message: the channel is not open";
64 }
65 }
66
OnStateChange()67 void SctpDataChannelProviderInterfaceImpl::OnStateChange() {
68 assert(_threads->getNetworkThread()->IsCurrent());
69
70 auto state = _dataChannel->state();
71 bool isDataChannelOpen = state == webrtc::DataChannelInterface::DataState::kOpen;
72 if (_isDataChannelOpen != isDataChannelOpen) {
73 _isDataChannelOpen = isDataChannelOpen;
74 _onStateChanged(_isDataChannelOpen);
75 }
76 }
77
OnMessage(const webrtc::DataBuffer & buffer)78 void SctpDataChannelProviderInterfaceImpl::OnMessage(const webrtc::DataBuffer& buffer) {
79 assert(_threads->getNetworkThread()->IsCurrent());
80
81 if (!buffer.binary) {
82 std::string messageText(buffer.data.data(), buffer.data.data() + buffer.data.size());
83 RTC_LOG(LS_INFO) << "Incoming DataChannel message: " << messageText;
84
85 _onMessageReceived(messageText);
86 }
87 }
88
updateIsConnected(bool isConnected)89 void SctpDataChannelProviderInterfaceImpl::updateIsConnected(bool isConnected) {
90 assert(_threads->getNetworkThread()->IsCurrent());
91
92 if (isConnected) {
93 if (!_isSctpTransportStarted) {
94 _isSctpTransportStarted = true;
95 _sctpTransport->Start(5000, 5000, 262144);
96 }
97 }
98 }
99
sctpReadyToSendData()100 void SctpDataChannelProviderInterfaceImpl::sctpReadyToSendData() {
101 assert(_threads->getNetworkThread()->IsCurrent());
102
103 _dataChannel->OnTransportReady(true);
104 }
105
sctpClosedAbruptly()106 void SctpDataChannelProviderInterfaceImpl::sctpClosedAbruptly() {
107 assert(_threads->getNetworkThread()->IsCurrent());
108
109 if (_onTerminated) {
110 _onTerminated();
111 }
112 }
113
sctpDataReceived(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & buffer)114 void SctpDataChannelProviderInterfaceImpl::sctpDataReceived(const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& buffer) {
115 assert(_threads->getNetworkThread()->IsCurrent());
116
117 _dataChannel->OnDataReceived(params, buffer);
118 }
119
SendData(const cricket::SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)120 bool SctpDataChannelProviderInterfaceImpl::SendData(const cricket::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) {
121 assert(_threads->getNetworkThread()->IsCurrent());
122
123 return _sctpTransport->SendData(params, payload);
124 }
125
ConnectDataChannel(webrtc::SctpDataChannel * data_channel)126 bool SctpDataChannelProviderInterfaceImpl::ConnectDataChannel(webrtc::SctpDataChannel *data_channel) {
127 assert(_threads->getNetworkThread()->IsCurrent());
128
129 return true;
130 }
131
DisconnectDataChannel(webrtc::SctpDataChannel * data_channel)132 void SctpDataChannelProviderInterfaceImpl::DisconnectDataChannel(webrtc::SctpDataChannel* data_channel) {
133 assert(_threads->getNetworkThread()->IsCurrent());
134
135 return;
136 }
137
AddSctpDataStream(int sid)138 void SctpDataChannelProviderInterfaceImpl::AddSctpDataStream(int sid) {
139 assert(_threads->getNetworkThread()->IsCurrent());
140
141 _sctpTransport->OpenStream(sid);
142 }
143
RemoveSctpDataStream(int sid)144 void SctpDataChannelProviderInterfaceImpl::RemoveSctpDataStream(int sid) {
145 assert(_threads->getNetworkThread()->IsCurrent());
146
147 _threads->getNetworkThread()->Invoke<void>(RTC_FROM_HERE, [this, sid]() {
148 _sctpTransport->ResetStream(sid);
149 });
150 }
151
ReadyToSendData() const152 bool SctpDataChannelProviderInterfaceImpl::ReadyToSendData() const {
153 assert(_threads->getNetworkThread()->IsCurrent());
154
155 return _sctpTransport->ReadyToSendData();
156 }
157
158 }
159