1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "core/internal/mediums/webrtc/connection_flow.h"
16 
17 #include <iterator>
18 #include <memory>
19 
20 #include "core/internal/mediums/webrtc/session_description_wrapper.h"
21 #include "platform/public/logging.h"
22 #include "platform/public/mutex_lock.h"
23 #include "platform/public/webrtc.h"
24 #include "absl/memory/memory.h"
25 #include "absl/time/time.h"
26 #include "webrtc/api/data_channel_interface.h"
27 #include "webrtc/api/jsep.h"
28 
29 namespace location {
30 namespace nearby {
31 namespace connections {
32 namespace mediums {
33 
34 constexpr absl::Duration ConnectionFlow::kTimeout;
35 
36 namespace {
37 // This is the same as the nearby data channel name.
38 const char kDataChannelName[] = "dataChannel";
39 
40 class CreateSessionDescriptionObserverImpl
41     : public webrtc::CreateSessionDescriptionObserver {
42  public:
CreateSessionDescriptionObserverImpl(Future<SessionDescriptionWrapper> * settable_future)43   explicit CreateSessionDescriptionObserverImpl(
44       Future<SessionDescriptionWrapper>* settable_future)
45       : settable_future_(settable_future) {}
46   ~CreateSessionDescriptionObserverImpl() override = default;
47 
48   // webrtc::CreateSessionDescriptionObserver
OnSuccess(webrtc::SessionDescriptionInterface * desc)49   void OnSuccess(webrtc::SessionDescriptionInterface* desc) override {
50     settable_future_->Set(SessionDescriptionWrapper{desc});
51   }
52 
OnFailure(webrtc::RTCError error)53   void OnFailure(webrtc::RTCError error) override {
54     NEARBY_LOG(ERROR, "Error when creating session description: %s",
55                error.message());
56     settable_future_->SetException({Exception::kFailed});
57   }
58 
59  private:
60   std::unique_ptr<Future<SessionDescriptionWrapper>> settable_future_;
61 };
62 
63 class SetSessionDescriptionObserverImpl
64     : public webrtc::SetSessionDescriptionObserver {
65  public:
SetSessionDescriptionObserverImpl(Future<bool> * settable_future)66   explicit SetSessionDescriptionObserverImpl(Future<bool>* settable_future)
67       : settable_future_(settable_future) {}
68 
OnSuccess()69   void OnSuccess() override { settable_future_->Set(true); }
70 
OnFailure(webrtc::RTCError error)71   void OnFailure(webrtc::RTCError error) override {
72     NEARBY_LOG(ERROR, "Error when setting session description: %s",
73                error.message());
74     settable_future_->SetException({Exception::kFailed});
75   }
76 
77  private:
78   std::unique_ptr<Future<bool>> settable_future_;
79 };
80 
81 using PeerConnectionState =
82     webrtc::PeerConnectionInterface::PeerConnectionState;
83 
84 }  // namespace
85 
Create(LocalIceCandidateListener local_ice_candidate_listener,DataChannelListener data_channel_listener,WebRtcMedium & webrtc_medium)86 std::unique_ptr<ConnectionFlow> ConnectionFlow::Create(
87     LocalIceCandidateListener local_ice_candidate_listener,
88     DataChannelListener data_channel_listener, WebRtcMedium& webrtc_medium) {
89   auto connection_flow = absl::WrapUnique(
90       new ConnectionFlow(std::move(local_ice_candidate_listener),
91                          std::move(data_channel_listener)));
92   if (connection_flow->InitPeerConnection(webrtc_medium)) {
93     return connection_flow;
94   }
95 
96   return nullptr;
97 }
98 
ConnectionFlow(LocalIceCandidateListener local_ice_candidate_listener,DataChannelListener data_channel_listener)99 ConnectionFlow::ConnectionFlow(
100     LocalIceCandidateListener local_ice_candidate_listener,
101     DataChannelListener data_channel_listener)
102     : data_channel_listener_(std::move(data_channel_listener)),
103       peer_connection_observer_(this, std::move(local_ice_candidate_listener)) {
104 }
105 
~ConnectionFlow()106 ConnectionFlow::~ConnectionFlow() { Close(); }
107 
CreateOffer()108 SessionDescriptionWrapper ConnectionFlow::CreateOffer() {
109   MutexLock lock(&mutex_);
110 
111   if (!TransitionState(State::kInitialized, State::kCreatingOffer)) {
112     return SessionDescriptionWrapper();
113   }
114 
115   webrtc::DataChannelInit data_channel_init;
116   data_channel_init.reliable = true;
117   rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel =
118       peer_connection_->CreateDataChannel(kDataChannelName, &data_channel_init);
119   data_channel->RegisterObserver(CreateDataChannelObserver(data_channel));
120 
121   auto success_future = new Future<SessionDescriptionWrapper>();
122   webrtc::PeerConnectionInterface::RTCOfferAnswerOptions options;
123   rtc::scoped_refptr<CreateSessionDescriptionObserverImpl> observer =
124       new rtc::RefCountedObject<CreateSessionDescriptionObserverImpl>(
125           success_future);
126   peer_connection_->CreateOffer(observer, options);
127 
128   ExceptionOr<SessionDescriptionWrapper> result = success_future->Get(kTimeout);
129   if (result.ok() &&
130       TransitionState(State::kCreatingOffer, State::kWaitingForAnswer)) {
131     return std::move(result.result());
132   }
133 
134   return SessionDescriptionWrapper();
135 }
136 
CreateAnswer()137 SessionDescriptionWrapper ConnectionFlow::CreateAnswer() {
138   MutexLock lock(&mutex_);
139 
140   if (!TransitionState(State::kReceivedOffer, State::kCreatingAnswer)) {
141     return SessionDescriptionWrapper();
142   }
143 
144   auto success_future = new Future<SessionDescriptionWrapper>();
145   webrtc::PeerConnectionInterface::RTCOfferAnswerOptions options;
146   rtc::scoped_refptr<CreateSessionDescriptionObserverImpl> observer =
147       new rtc::RefCountedObject<CreateSessionDescriptionObserverImpl>(
148           success_future);
149   peer_connection_->CreateAnswer(observer, options);
150 
151   ExceptionOr<SessionDescriptionWrapper> result = success_future->Get(kTimeout);
152   if (result.ok() &&
153       TransitionState(State::kCreatingAnswer, State::kWaitingToConnect)) {
154     return std::move(result.result());
155   }
156 
157   return SessionDescriptionWrapper();
158 }
159 
SetLocalSessionDescription(SessionDescriptionWrapper sdp)160 bool ConnectionFlow::SetLocalSessionDescription(SessionDescriptionWrapper sdp) {
161   MutexLock lock(&mutex_);
162 
163   if (!sdp.IsValid()) return false;
164 
165   auto success_future = new Future<bool>();
166   rtc::scoped_refptr<SetSessionDescriptionObserverImpl> observer =
167       new rtc::RefCountedObject<SetSessionDescriptionObserverImpl>(
168           success_future);
169 
170   peer_connection_->SetLocalDescription(observer, sdp.Release());
171 
172   ExceptionOr<bool> result = success_future->Get(kTimeout);
173   return result.ok() && result.result();
174 }
175 
SetRemoteSessionDescription(SessionDescriptionWrapper sdp)176 bool ConnectionFlow::SetRemoteSessionDescription(
177     SessionDescriptionWrapper sdp) {
178   if (!sdp.IsValid()) return false;
179 
180   auto success_future = new Future<bool>();
181   rtc::scoped_refptr<SetSessionDescriptionObserverImpl> observer =
182       new rtc::RefCountedObject<SetSessionDescriptionObserverImpl>(
183           success_future);
184 
185   peer_connection_->SetRemoteDescription(observer, sdp.Release());
186 
187   ExceptionOr<bool> result = success_future->Get(kTimeout);
188   return result.ok() && result.result();
189 }
190 
OnOfferReceived(SessionDescriptionWrapper offer)191 bool ConnectionFlow::OnOfferReceived(SessionDescriptionWrapper offer) {
192   MutexLock lock(&mutex_);
193 
194   if (!TransitionState(State::kInitialized, State::kReceivedOffer)) {
195     return false;
196   }
197   return SetRemoteSessionDescription(std::move(offer));
198 }
199 
OnAnswerReceived(SessionDescriptionWrapper answer)200 bool ConnectionFlow::OnAnswerReceived(SessionDescriptionWrapper answer) {
201   MutexLock lock(&mutex_);
202 
203   if (!TransitionState(State::kWaitingForAnswer, State::kWaitingToConnect)) {
204     return false;
205   }
206   return SetRemoteSessionDescription(std::move(answer));
207 }
208 
OnRemoteIceCandidatesReceived(std::vector<std::unique_ptr<webrtc::IceCandidateInterface>> ice_candidates)209 bool ConnectionFlow::OnRemoteIceCandidatesReceived(
210     std::vector<std::unique_ptr<webrtc::IceCandidateInterface>>
211         ice_candidates) {
212   MutexLock lock(&mutex_);
213 
214   if (state_ == State::kEnded) {
215     NEARBY_LOG(WARNING,
216                "You cannot add ice candidates to a disconnected session.");
217     return false;
218   }
219 
220   if (state_ != State::kWaitingToConnect && state_ != State::kConnected) {
221     cached_remote_ice_candidates_.insert(
222         cached_remote_ice_candidates_.end(),
223         std::make_move_iterator(ice_candidates.begin()),
224         std::make_move_iterator(ice_candidates.end()));
225     return true;
226   }
227 
228   for (auto&& ice_candidate : ice_candidates) {
229     if (!peer_connection_->AddIceCandidate(ice_candidate.get())) {
230       NEARBY_LOG(WARNING, "Unable to add remote ice candidate.");
231     }
232   }
233   return true;
234 }
235 
236 Future<rtc::scoped_refptr<webrtc::DataChannelInterface>>
GetDataChannel()237 ConnectionFlow::GetDataChannel() {
238   return data_channel_future_;
239 }
240 
Close()241 bool ConnectionFlow::Close() {
242   MutexLock lock(&mutex_);
243   return CloseLocked();
244 }
245 
InitPeerConnection(WebRtcMedium & webrtc_medium)246 bool ConnectionFlow::InitPeerConnection(WebRtcMedium& webrtc_medium) {
247   Future<bool> success_future;
248   // CreatePeerConnection callback may be invoked after ConnectionFlow lifetime
249   // has ended, in case of a timeout. Future is captured by value, and is safe
250   // to access, but it is not safe to access ConnectionFlow member variables
251   // unless the Future::Set() returns true.
252   webrtc_medium.CreatePeerConnection(
253       &peer_connection_observer_,
254       [this, success_future](rtc::scoped_refptr<webrtc::PeerConnectionInterface>
255                                  peer_connection) mutable {
256         if (!peer_connection) {
257           success_future.Set(false);
258           return;
259         }
260 
261         // If this fails, means we have already assigned something to
262         // success_future; it is either:
263         // 1) this is the 2nd call of this callback (and this is a bug), or
264         // 2) Get(timeout) has set the future value as exception already.
265         if (success_future.IsSet()) return;
266         peer_connection_ = peer_connection;
267         success_future.Set(true);
268       });
269 
270   ExceptionOr<bool> result = success_future.Get(kTimeout);
271   return result.ok() && result.result();
272 }
273 
OnSignalingStable()274 void ConnectionFlow::OnSignalingStable() {
275   MutexLock lock(&mutex_);
276 
277   if (state_ != State::kWaitingToConnect && state_ != State::kConnected) return;
278 
279   for (auto&& ice_candidate : cached_remote_ice_candidates_) {
280     if (!peer_connection_->AddIceCandidate(ice_candidate.get())) {
281       NEARBY_LOG(WARNING, "Unable to add remote ice candidate.");
282     }
283   }
284   cached_remote_ice_candidates_.clear();
285 }
286 
ProcessOnPeerConnectionChange(webrtc::PeerConnectionInterface::PeerConnectionState new_state)287 void ConnectionFlow::ProcessOnPeerConnectionChange(
288     webrtc::PeerConnectionInterface::PeerConnectionState new_state) {
289   if (new_state == PeerConnectionState::kClosed ||
290       new_state == PeerConnectionState::kFailed ||
291       new_state == PeerConnectionState::kDisconnected) {
292     MutexLock lock(&mutex_);
293     CloseAndNotifyLocked();
294   }
295 }
296 
ProcessDataChannelConnected()297 void ConnectionFlow::ProcessDataChannelConnected() {
298   MutexLock lock(&mutex_);
299   NEARBY_LOG(INFO, "Data channel state changed to connected.");
300   if (!TransitionState(State::kWaitingToConnect, State::kConnected))
301     CloseAndNotifyLocked();
302 }
303 
CreateDataChannelObserver(rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel)304 webrtc::DataChannelObserver* ConnectionFlow::CreateDataChannelObserver(
305     rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel) {
306   if (!data_channel_observer_) {
307     auto state_change_callback = [this,
308                                   data_channel{std::move(data_channel)}]() {
309       if (data_channel->state() ==
310           webrtc::DataChannelInterface::DataState::kOpen) {
311         data_channel_future_.Set(std::move(data_channel));
312         OffloadFromSignalingThread([this]() { ProcessDataChannelConnected(); });
313       } else if (data_channel->state() ==
314                  webrtc::DataChannelInterface::DataState::kClosed) {
315         data_channel->UnregisterObserver();
316         OffloadFromSignalingThread([this]() {
317           MutexLock lock(&mutex_);
318           CloseAndNotifyLocked();
319         });
320       }
321     };
322     data_channel_observer_ = absl::make_unique<DataChannelObserverImpl>(
323         &data_channel_listener_, std::move(state_change_callback));
324   }
325 
326   return reinterpret_cast<webrtc::DataChannelObserver*>(
327       data_channel_observer_.get());
328 }
329 
TransitionState(State current_state,State new_state)330 bool ConnectionFlow::TransitionState(State current_state, State new_state) {
331   if (current_state != state_) {
332     NEARBY_LOG(
333         WARNING,
334         "Invalid state transition to %d: current state is %d but expected %d.",
335         new_state, state_, current_state);
336     return false;
337   }
338   state_ = new_state;
339   return true;
340 }
341 
CloseAndNotifyLocked()342 void ConnectionFlow::CloseAndNotifyLocked() {
343   if (CloseLocked()) {
344     data_channel_listener_.data_channel_closed_cb();
345   }
346 }
347 
CloseLocked()348 bool ConnectionFlow::CloseLocked() {
349   if (state_ == State::kEnded) {
350     return false;
351   }
352   state_ = State::kEnded;
353 
354   data_channel_future_.SetException({Exception::kInterrupted});
355   if (peer_connection_) peer_connection_->Close();
356 
357   data_channel_observer_.reset();
358   NEARBY_LOG(INFO, "Closed WebRTC connection.");
359   return true;
360 }
361 
OffloadFromSignalingThread(Runnable runnable)362 void ConnectionFlow::OffloadFromSignalingThread(Runnable runnable) {
363   single_threaded_signaling_offloader_.Execute(std::move(runnable));
364 }
365 
366 }  // namespace mediums
367 }  // namespace connections
368 }  // namespace nearby
369 }  // namespace location
370