1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "core/internal/mediums/webrtc/connection_flow.h"
16
17 #include <iterator>
18 #include <memory>
19
20 #include "core/internal/mediums/webrtc/session_description_wrapper.h"
21 #include "platform/public/logging.h"
22 #include "platform/public/mutex_lock.h"
23 #include "platform/public/webrtc.h"
24 #include "absl/memory/memory.h"
25 #include "absl/time/time.h"
26 #include "webrtc/api/data_channel_interface.h"
27 #include "webrtc/api/jsep.h"
28
29 namespace location {
30 namespace nearby {
31 namespace connections {
32 namespace mediums {
33
// Out-of-line definition of the static constexpr member kTimeout (its value is
// given at the in-class declaration). Required for odr-use when building with
// a pre-C++17 standard; redundant but harmless from C++17 onward.
constexpr absl::Duration ConnectionFlow::kTimeout;
35
36 namespace {
// Label passed to PeerConnectionInterface::CreateDataChannel() in
// ConnectionFlow::CreateOffer().
// This is the same as the nearby data channel name.
const char kDataChannelName[] = "dataChannel";
39
40 class CreateSessionDescriptionObserverImpl
41 : public webrtc::CreateSessionDescriptionObserver {
42 public:
CreateSessionDescriptionObserverImpl(Future<SessionDescriptionWrapper> * settable_future)43 explicit CreateSessionDescriptionObserverImpl(
44 Future<SessionDescriptionWrapper>* settable_future)
45 : settable_future_(settable_future) {}
46 ~CreateSessionDescriptionObserverImpl() override = default;
47
48 // webrtc::CreateSessionDescriptionObserver
OnSuccess(webrtc::SessionDescriptionInterface * desc)49 void OnSuccess(webrtc::SessionDescriptionInterface* desc) override {
50 settable_future_->Set(SessionDescriptionWrapper{desc});
51 }
52
OnFailure(webrtc::RTCError error)53 void OnFailure(webrtc::RTCError error) override {
54 NEARBY_LOG(ERROR, "Error when creating session description: %s",
55 error.message());
56 settable_future_->SetException({Exception::kFailed});
57 }
58
59 private:
60 std::unique_ptr<Future<SessionDescriptionWrapper>> settable_future_;
61 };
62
63 class SetSessionDescriptionObserverImpl
64 : public webrtc::SetSessionDescriptionObserver {
65 public:
SetSessionDescriptionObserverImpl(Future<bool> * settable_future)66 explicit SetSessionDescriptionObserverImpl(Future<bool>* settable_future)
67 : settable_future_(settable_future) {}
68
OnSuccess()69 void OnSuccess() override { settable_future_->Set(true); }
70
OnFailure(webrtc::RTCError error)71 void OnFailure(webrtc::RTCError error) override {
72 NEARBY_LOG(ERROR, "Error when setting session description: %s",
73 error.message());
74 settable_future_->SetException({Exception::kFailed});
75 }
76
77 private:
78 std::unique_ptr<Future<bool>> settable_future_;
79 };
80
// File-local shorthand for the WebRTC peer connection state enum, used when
// comparing states in ConnectionFlow::ProcessOnPeerConnectionChange().
using PeerConnectionState =
    webrtc::PeerConnectionInterface::PeerConnectionState;
83
84 } // namespace
85
Create(LocalIceCandidateListener local_ice_candidate_listener,DataChannelListener data_channel_listener,WebRtcMedium & webrtc_medium)86 std::unique_ptr<ConnectionFlow> ConnectionFlow::Create(
87 LocalIceCandidateListener local_ice_candidate_listener,
88 DataChannelListener data_channel_listener, WebRtcMedium& webrtc_medium) {
89 auto connection_flow = absl::WrapUnique(
90 new ConnectionFlow(std::move(local_ice_candidate_listener),
91 std::move(data_channel_listener)));
92 if (connection_flow->InitPeerConnection(webrtc_medium)) {
93 return connection_flow;
94 }
95
96 return nullptr;
97 }
98
// Stores the data channel listener and constructs the peer connection observer
// with a back-pointer to this flow plus the local ICE candidate listener.
// No peer connection exists yet; Create() calls InitPeerConnection() for that
// right after construction.
ConnectionFlow::ConnectionFlow(
    LocalIceCandidateListener local_ice_candidate_listener,
    DataChannelListener data_channel_listener)
    : data_channel_listener_(std::move(data_channel_listener)),
      // `this` is handed out before construction completes.
      // NOTE(review): presumably the observer only stores the pointer here —
      // confirm against its constructor.
      peer_connection_observer_(this, std::move(local_ice_candidate_listener)) {
}
105
// Closes the flow on destruction. Close() takes mutex_ and does nothing
// further if the flow has already reached State::kEnded.
ConnectionFlow::~ConnectionFlow() { Close(); }
107
CreateOffer()108 SessionDescriptionWrapper ConnectionFlow::CreateOffer() {
109 MutexLock lock(&mutex_);
110
111 if (!TransitionState(State::kInitialized, State::kCreatingOffer)) {
112 return SessionDescriptionWrapper();
113 }
114
115 webrtc::DataChannelInit data_channel_init;
116 data_channel_init.reliable = true;
117 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel =
118 peer_connection_->CreateDataChannel(kDataChannelName, &data_channel_init);
119 data_channel->RegisterObserver(CreateDataChannelObserver(data_channel));
120
121 auto success_future = new Future<SessionDescriptionWrapper>();
122 webrtc::PeerConnectionInterface::RTCOfferAnswerOptions options;
123 rtc::scoped_refptr<CreateSessionDescriptionObserverImpl> observer =
124 new rtc::RefCountedObject<CreateSessionDescriptionObserverImpl>(
125 success_future);
126 peer_connection_->CreateOffer(observer, options);
127
128 ExceptionOr<SessionDescriptionWrapper> result = success_future->Get(kTimeout);
129 if (result.ok() &&
130 TransitionState(State::kCreatingOffer, State::kWaitingForAnswer)) {
131 return std::move(result.result());
132 }
133
134 return SessionDescriptionWrapper();
135 }
136
CreateAnswer()137 SessionDescriptionWrapper ConnectionFlow::CreateAnswer() {
138 MutexLock lock(&mutex_);
139
140 if (!TransitionState(State::kReceivedOffer, State::kCreatingAnswer)) {
141 return SessionDescriptionWrapper();
142 }
143
144 auto success_future = new Future<SessionDescriptionWrapper>();
145 webrtc::PeerConnectionInterface::RTCOfferAnswerOptions options;
146 rtc::scoped_refptr<CreateSessionDescriptionObserverImpl> observer =
147 new rtc::RefCountedObject<CreateSessionDescriptionObserverImpl>(
148 success_future);
149 peer_connection_->CreateAnswer(observer, options);
150
151 ExceptionOr<SessionDescriptionWrapper> result = success_future->Get(kTimeout);
152 if (result.ok() &&
153 TransitionState(State::kCreatingAnswer, State::kWaitingToConnect)) {
154 return std::move(result.result());
155 }
156
157 return SessionDescriptionWrapper();
158 }
159
SetLocalSessionDescription(SessionDescriptionWrapper sdp)160 bool ConnectionFlow::SetLocalSessionDescription(SessionDescriptionWrapper sdp) {
161 MutexLock lock(&mutex_);
162
163 if (!sdp.IsValid()) return false;
164
165 auto success_future = new Future<bool>();
166 rtc::scoped_refptr<SetSessionDescriptionObserverImpl> observer =
167 new rtc::RefCountedObject<SetSessionDescriptionObserverImpl>(
168 success_future);
169
170 peer_connection_->SetLocalDescription(observer, sdp.Release());
171
172 ExceptionOr<bool> result = success_future->Get(kTimeout);
173 return result.ok() && result.result();
174 }
175
SetRemoteSessionDescription(SessionDescriptionWrapper sdp)176 bool ConnectionFlow::SetRemoteSessionDescription(
177 SessionDescriptionWrapper sdp) {
178 if (!sdp.IsValid()) return false;
179
180 auto success_future = new Future<bool>();
181 rtc::scoped_refptr<SetSessionDescriptionObserverImpl> observer =
182 new rtc::RefCountedObject<SetSessionDescriptionObserverImpl>(
183 success_future);
184
185 peer_connection_->SetRemoteDescription(observer, sdp.Release());
186
187 ExceptionOr<bool> result = success_future->Get(kTimeout);
188 return result.ok() && result.result();
189 }
190
OnOfferReceived(SessionDescriptionWrapper offer)191 bool ConnectionFlow::OnOfferReceived(SessionDescriptionWrapper offer) {
192 MutexLock lock(&mutex_);
193
194 if (!TransitionState(State::kInitialized, State::kReceivedOffer)) {
195 return false;
196 }
197 return SetRemoteSessionDescription(std::move(offer));
198 }
199
OnAnswerReceived(SessionDescriptionWrapper answer)200 bool ConnectionFlow::OnAnswerReceived(SessionDescriptionWrapper answer) {
201 MutexLock lock(&mutex_);
202
203 if (!TransitionState(State::kWaitingForAnswer, State::kWaitingToConnect)) {
204 return false;
205 }
206 return SetRemoteSessionDescription(std::move(answer));
207 }
208
OnRemoteIceCandidatesReceived(std::vector<std::unique_ptr<webrtc::IceCandidateInterface>> ice_candidates)209 bool ConnectionFlow::OnRemoteIceCandidatesReceived(
210 std::vector<std::unique_ptr<webrtc::IceCandidateInterface>>
211 ice_candidates) {
212 MutexLock lock(&mutex_);
213
214 if (state_ == State::kEnded) {
215 NEARBY_LOG(WARNING,
216 "You cannot add ice candidates to a disconnected session.");
217 return false;
218 }
219
220 if (state_ != State::kWaitingToConnect && state_ != State::kConnected) {
221 cached_remote_ice_candidates_.insert(
222 cached_remote_ice_candidates_.end(),
223 std::make_move_iterator(ice_candidates.begin()),
224 std::make_move_iterator(ice_candidates.end()));
225 return true;
226 }
227
228 for (auto&& ice_candidate : ice_candidates) {
229 if (!peer_connection_->AddIceCandidate(ice_candidate.get())) {
230 NEARBY_LOG(WARNING, "Unable to add remote ice candidate.");
231 }
232 }
233 return true;
234 }
235
// Returns (by value) the future for the data channel. Elsewhere in this file
// the future is set with the channel once it reports DataState::kOpen, and is
// completed with Exception::kInterrupted by CloseLocked(). No lock is taken.
Future<rtc::scoped_refptr<webrtc::DataChannelInterface>>
ConnectionFlow::GetDataChannel() {
  return data_channel_future_;
}
240
// Closes the flow under mutex_. Returns CloseLocked()'s result: true if this
// call performed the shutdown, false if the flow was already in State::kEnded.
// Unlike CloseAndNotifyLocked(), this does not invoke data_channel_closed_cb.
bool ConnectionFlow::Close() {
  MutexLock lock(&mutex_);
  return CloseLocked();
}
245
InitPeerConnection(WebRtcMedium & webrtc_medium)246 bool ConnectionFlow::InitPeerConnection(WebRtcMedium& webrtc_medium) {
247 Future<bool> success_future;
248 // CreatePeerConnection callback may be invoked after ConnectionFlow lifetime
249 // has ended, in case of a timeout. Future is captured by value, and is safe
250 // to access, but it is not safe to access ConnectionFlow member variables
251 // unless the Future::Set() returns true.
252 webrtc_medium.CreatePeerConnection(
253 &peer_connection_observer_,
254 [this, success_future](rtc::scoped_refptr<webrtc::PeerConnectionInterface>
255 peer_connection) mutable {
256 if (!peer_connection) {
257 success_future.Set(false);
258 return;
259 }
260
261 // If this fails, means we have already assigned something to
262 // success_future; it is either:
263 // 1) this is the 2nd call of this callback (and this is a bug), or
264 // 2) Get(timeout) has set the future value as exception already.
265 if (success_future.IsSet()) return;
266 peer_connection_ = peer_connection;
267 success_future.Set(true);
268 });
269
270 ExceptionOr<bool> result = success_future.Get(kTimeout);
271 return result.ok() && result.result();
272 }
273
OnSignalingStable()274 void ConnectionFlow::OnSignalingStable() {
275 MutexLock lock(&mutex_);
276
277 if (state_ != State::kWaitingToConnect && state_ != State::kConnected) return;
278
279 for (auto&& ice_candidate : cached_remote_ice_candidates_) {
280 if (!peer_connection_->AddIceCandidate(ice_candidate.get())) {
281 NEARBY_LOG(WARNING, "Unable to add remote ice candidate.");
282 }
283 }
284 cached_remote_ice_candidates_.clear();
285 }
286
ProcessOnPeerConnectionChange(webrtc::PeerConnectionInterface::PeerConnectionState new_state)287 void ConnectionFlow::ProcessOnPeerConnectionChange(
288 webrtc::PeerConnectionInterface::PeerConnectionState new_state) {
289 if (new_state == PeerConnectionState::kClosed ||
290 new_state == PeerConnectionState::kFailed ||
291 new_state == PeerConnectionState::kDisconnected) {
292 MutexLock lock(&mutex_);
293 CloseAndNotifyLocked();
294 }
295 }
296
ProcessDataChannelConnected()297 void ConnectionFlow::ProcessDataChannelConnected() {
298 MutexLock lock(&mutex_);
299 NEARBY_LOG(INFO, "Data channel state changed to connected.");
300 if (!TransitionState(State::kWaitingToConnect, State::kConnected))
301 CloseAndNotifyLocked();
302 }
303
CreateDataChannelObserver(rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel)304 webrtc::DataChannelObserver* ConnectionFlow::CreateDataChannelObserver(
305 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel) {
306 if (!data_channel_observer_) {
307 auto state_change_callback = [this,
308 data_channel{std::move(data_channel)}]() {
309 if (data_channel->state() ==
310 webrtc::DataChannelInterface::DataState::kOpen) {
311 data_channel_future_.Set(std::move(data_channel));
312 OffloadFromSignalingThread([this]() { ProcessDataChannelConnected(); });
313 } else if (data_channel->state() ==
314 webrtc::DataChannelInterface::DataState::kClosed) {
315 data_channel->UnregisterObserver();
316 OffloadFromSignalingThread([this]() {
317 MutexLock lock(&mutex_);
318 CloseAndNotifyLocked();
319 });
320 }
321 };
322 data_channel_observer_ = absl::make_unique<DataChannelObserverImpl>(
323 &data_channel_listener_, std::move(state_change_callback));
324 }
325
326 return reinterpret_cast<webrtc::DataChannelObserver*>(
327 data_channel_observer_.get());
328 }
329
TransitionState(State current_state,State new_state)330 bool ConnectionFlow::TransitionState(State current_state, State new_state) {
331 if (current_state != state_) {
332 NEARBY_LOG(
333 WARNING,
334 "Invalid state transition to %d: current state is %d but expected %d.",
335 new_state, state_, current_state);
336 return false;
337 }
338 state_ = new_state;
339 return true;
340 }
341
CloseAndNotifyLocked()342 void ConnectionFlow::CloseAndNotifyLocked() {
343 if (CloseLocked()) {
344 data_channel_listener_.data_channel_closed_cb();
345 }
346 }
347
CloseLocked()348 bool ConnectionFlow::CloseLocked() {
349 if (state_ == State::kEnded) {
350 return false;
351 }
352 state_ = State::kEnded;
353
354 data_channel_future_.SetException({Exception::kInterrupted});
355 if (peer_connection_) peer_connection_->Close();
356
357 data_channel_observer_.reset();
358 NEARBY_LOG(INFO, "Closed WebRTC connection.");
359 return true;
360 }
361
// Hands `runnable` to single_threaded_signaling_offloader_ for execution.
// In this file it is used from the data channel state-change callback for work
// that takes mutex_.
// NOTE(review): presumably this keeps such work off the WebRTC signaling
// thread, as the name suggests — confirm against the offloader's definition.
void ConnectionFlow::OffloadFromSignalingThread(Runnable runnable) {
  single_threaded_signaling_offloader_.Execute(std::move(runnable));
}
365
366 } // namespace mediums
367 } // namespace connections
368 } // namespace nearby
369 } // namespace location
370