1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "core/internal/service_controller_router.h"
16 
17 #include <cstddef>
18 #include <memory>
19 #include <string>
20 #include <utility>
21 
22 #include "core/internal/client_proxy.h"
23 #include "core/listeners.h"
24 #include "core/options.h"
25 #include "core/params.h"
26 #include "core/payload.h"
27 #include "platform/public/logging.h"
28 #include "absl/time/clock.h"
29 
30 namespace location {
31 namespace nearby {
32 namespace connections {
namespace {
// Number of bytes in a MAC address (the unique identifier of a hardware
// interface).
constexpr std::size_t kMacAddressLength = 6u;

// Number of characters in an endpoint ID. An endpoint ID identifies a device
// discovery and the connection request associated with it.
constexpr std::size_t kEndpointIdLength = 4u;

// Upper bound on the size of the endpoint info. The advertiser publishes this
// information so that the discovering device can identify it.
constexpr std::size_t kMaxEndpointInfoLength = 131u;
}  // namespace
47 
~ServiceControllerRouter()48 ServiceControllerRouter::~ServiceControllerRouter() {
49   NEARBY_LOG(INFO, "ServiceControllerRouter going down.");
50 
51   service_controller_.reset();
52   // And make sure that cleanup is the last thing we do.
53   serializer_.Shutdown();
54 }
55 
StartAdvertising(ClientProxy * client,absl::string_view service_id,const ConnectionOptions & options,const ConnectionRequestInfo & info,const ResultCallback & callback)56 void ServiceControllerRouter::StartAdvertising(
57     ClientProxy* client, absl::string_view service_id,
58     const ConnectionOptions& options, const ConnectionRequestInfo& info,
59     const ResultCallback& callback) {
60   RouteToServiceController([this, client, service_id = std::string(service_id),
61                             options, info, callback]() {
62     Status status = AcquireServiceControllerForClient(client, options.strategy);
63     if (!status.Ok()) {
64       callback.result_cb(status);
65       return;
66     }
67 
68     if (client->IsAdvertising()) {
69       callback.result_cb({Status::kAlreadyAdvertising});
70       return;
71     }
72 
73     status = service_controller_->StartAdvertising(client, service_id, options,
74                                                    info);
75     callback.result_cb(status);
76   });
77 }
78 
StopAdvertising(ClientProxy * client,const ResultCallback & callback)79 void ServiceControllerRouter::StopAdvertising(ClientProxy* client,
80                                               const ResultCallback& callback) {
81   RouteToServiceController([this, client, callback]() {
82     if (ClientHasAcquiredServiceController(client) && client->IsAdvertising()) {
83       service_controller_->StopAdvertising(client);
84     }
85     callback.result_cb({Status::kSuccess});
86   });
87 }
88 
StartDiscovery(ClientProxy * client,absl::string_view service_id,const ConnectionOptions & options,const DiscoveryListener & listener,const ResultCallback & callback)89 void ServiceControllerRouter::StartDiscovery(ClientProxy* client,
90                                              absl::string_view service_id,
91                                              const ConnectionOptions& options,
92                                              const DiscoveryListener& listener,
93                                              const ResultCallback& callback) {
94   RouteToServiceController([this, client, service_id = std::string(service_id),
95                             options, listener, callback]() {
96     Status status = AcquireServiceControllerForClient(client, options.strategy);
97     if (!status.Ok()) {
98       callback.result_cb(status);
99       return;
100     }
101 
102     if (client->IsDiscovering()) {
103       callback.result_cb({Status::kAlreadyDiscovering});
104       return;
105     }
106 
107     status = service_controller_->StartDiscovery(client, service_id, options,
108                                                  listener);
109     callback.result_cb(status);
110   });
111 }
112 
StopDiscovery(ClientProxy * client,const ResultCallback & callback)113 void ServiceControllerRouter::StopDiscovery(ClientProxy* client,
114                                             const ResultCallback& callback) {
115   RouteToServiceController([this, client, callback]() {
116     if (ClientHasAcquiredServiceController(client) && client->IsDiscovering()) {
117       service_controller_->StopDiscovery(client);
118     }
119     callback.result_cb({Status::kSuccess});
120   });
121 }
122 
InjectEndpoint(ClientProxy * client,absl::string_view service_id,const OutOfBandConnectionMetadata & metadata,const ResultCallback & callback)123 void ServiceControllerRouter::InjectEndpoint(
124     ClientProxy* client, absl::string_view service_id,
125     const OutOfBandConnectionMetadata& metadata,
126     const ResultCallback& callback) {
127   RouteToServiceController(
128       [this, client, service_id = std::string(service_id), metadata,
129        callback]() {
130     // Currently, Bluetooth is the only supported medium for endpoint injection.
131     if (metadata.medium != Medium::BLUETOOTH ||
132         metadata.remote_bluetooth_mac_address.size() != kMacAddressLength) {
133       callback.result_cb({Status::kError});
134       return;
135     }
136 
137     if (metadata.endpoint_id.size() != kEndpointIdLength) {
138       callback.result_cb({Status::kError});
139       return;
140     }
141 
142     if (metadata.endpoint_info.Empty() ||
143         metadata.endpoint_info.size() > kMaxEndpointInfoLength) {
144       callback.result_cb({Status::kError});
145       return;
146     }
147 
148     if (!ClientHasAcquiredServiceController(client) ||
149         !client->IsDiscovering()) {
150       callback.result_cb({Status::kOutOfOrderApiCall});
151       return;
152     }
153 
154     service_controller_->InjectEndpoint(client, service_id, metadata);
155     callback.result_cb({Status::kSuccess});
156   });
157 }
158 
RequestConnection(ClientProxy * client,absl::string_view endpoint_id,const ConnectionRequestInfo & info,const ConnectionOptions & options,const ResultCallback & callback)159 void ServiceControllerRouter::RequestConnection(
160     ClientProxy* client, absl::string_view endpoint_id,
161     const ConnectionRequestInfo& info, const ConnectionOptions& options,
162     const ResultCallback& callback) {
163   RouteToServiceController([this, client,
164                             endpoint_id = std::string(endpoint_id), info,
165                             options, callback]() {
166     if (!ClientHasAcquiredServiceController(client)) {
167       callback.result_cb({Status::kOutOfOrderApiCall});
168       return;
169     }
170 
171     if (client->HasPendingConnectionToEndpoint(endpoint_id) ||
172         client->IsConnectedToEndpoint(endpoint_id)) {
173       callback.result_cb({Status::kAlreadyConnectedToEndpoint});
174       return;
175     }
176 
177     callback.result_cb(service_controller_->RequestConnection(
178         client, endpoint_id, info, options));
179   });
180 }
181 
AcceptConnection(ClientProxy * client,absl::string_view endpoint_id,const PayloadListener & listener,const ResultCallback & callback)182 void ServiceControllerRouter::AcceptConnection(ClientProxy* client,
183                                                absl::string_view endpoint_id,
184                                                const PayloadListener& listener,
185                                                const ResultCallback& callback) {
186   RouteToServiceController([this, client,
187                             endpoint_id = std::string(endpoint_id), listener,
188                             callback]() {
189     if (!ClientHasAcquiredServiceController(client)) {
190       callback.result_cb({Status::kOutOfOrderApiCall});
191       return;
192     }
193 
194     if (client->IsConnectedToEndpoint(endpoint_id)) {
195       callback.result_cb({Status::kAlreadyConnectedToEndpoint});
196       return;
197     }
198 
199     if (client->HasLocalEndpointResponded(endpoint_id)) {
200       NEARBY_LOG(INFO,
201                  "[ServiceControllerRouter:Accept]: Client has local "
202                  "endpoint responded; id=%s",
203                  endpoint_id.c_str());
204       callback.result_cb({Status::kOutOfOrderApiCall});
205       return;
206     }
207 
208     callback.result_cb(
209         service_controller_->AcceptConnection(client, endpoint_id, listener));
210   });
211 }
212 
RejectConnection(ClientProxy * client,absl::string_view endpoint_id,const ResultCallback & callback)213 void ServiceControllerRouter::RejectConnection(ClientProxy* client,
214                                                absl::string_view endpoint_id,
215                                                const ResultCallback& callback) {
216   RouteToServiceController(
217       [this, client, endpoint_id = std::string(endpoint_id), callback]() {
218         if (!ClientHasAcquiredServiceController(client)) {
219           callback.result_cb({Status::kOutOfOrderApiCall});
220           return;
221         }
222 
223         if (client->IsConnectedToEndpoint(endpoint_id)) {
224           callback.result_cb({Status::kAlreadyConnectedToEndpoint});
225           return;
226         }
227 
228         if (client->HasLocalEndpointResponded(endpoint_id)) {
229           NEARBY_LOG(INFO,
230                      "[ServiceControllerRouter:Reject]: Client has local "
231                      "endpoint responded; id=%s",
232                      endpoint_id.c_str());
233           callback.result_cb({Status::kOutOfOrderApiCall});
234           return;
235         }
236 
237         callback.result_cb(
238             service_controller_->RejectConnection(client, endpoint_id));
239       });
240 }
241 
InitiateBandwidthUpgrade(ClientProxy * client,absl::string_view endpoint_id,const ResultCallback & callback)242 void ServiceControllerRouter::InitiateBandwidthUpgrade(
243     ClientProxy* client, absl::string_view endpoint_id,
244     const ResultCallback& callback) {
245   RouteToServiceController(
246       [this, client, endpoint_id = std::string(endpoint_id), callback]() {
247         if (!ClientHasAcquiredServiceController(client) ||
248             !client->IsConnectedToEndpoint(endpoint_id)) {
249           callback.result_cb({Status::kOutOfOrderApiCall});
250           return;
251         }
252 
253         service_controller_->InitiateBandwidthUpgrade(client, endpoint_id);
254 
255         // Operation is triggered; the caller can listen to
256         // ConnectionListener::OnBandwidthChanged() to determine its success.
257         callback.result_cb({Status::kSuccess});
258       });
259 }
260 
SendPayload(ClientProxy * client,absl::Span<const std::string> endpoint_ids,Payload payload,const ResultCallback & callback)261 void ServiceControllerRouter::SendPayload(
262     ClientProxy* client, absl::Span<const std::string> endpoint_ids,
263     Payload payload, const ResultCallback& callback) {
264   // Payload is a move-only type.
265   // We have to capture it by value inside the lambda, and pass it over to
266   // the executor as an std::function<void()> instance.
267   // Lambda must be copyable, in order ot satisfy std::function<> requirements.
268   // To make it so, we need Payload wrapped by a copyable wrapper.
269   // std::shared_ptr<> is used, because it is copyable.
270   auto shared_payload = std::make_shared<Payload>(std::move(payload));
271   const std::vector<std::string> endpoints =
272       std::vector<std::string>(endpoint_ids.begin(), endpoint_ids.end());
273 
274   RouteToServiceController(
275       [this, client, shared_payload, endpoints, callback]() {
276         if (!ClientHasAcquiredServiceController(client)) {
277           callback.result_cb({Status::kOutOfOrderApiCall});
278           return;
279         }
280 
281         if (!ClientHasConnectionToAtLeastOneEndpoint(client, endpoints)) {
282           callback.result_cb({Status::kEndpointUnknown});
283           return;
284         }
285 
286         service_controller_->SendPayload(client, endpoints,
287                                          std::move(*shared_payload));
288 
289         // At this point, we've queued up the send Payload request with the
290         // ServiceController; any further failures (e.g. one of the endpoints is
291         // unknown, goes away, or otherwise fails) will be returned to the
292         // client as a PayloadTransferUpdate.
293         callback.result_cb({Status::kSuccess});
294       });
295 }
296 
CancelPayload(ClientProxy * client,std::uint64_t payload_id,const ResultCallback & callback)297 void ServiceControllerRouter::CancelPayload(ClientProxy* client,
298                                             std::uint64_t payload_id,
299                                             const ResultCallback& callback) {
300   RouteToServiceController([this, client, payload_id, callback]() {
301     if (!ClientHasAcquiredServiceController(client)) {
302       callback.result_cb({Status::kOutOfOrderApiCall});
303       return;
304     }
305 
306     callback.result_cb(service_controller_->CancelPayload(client, payload_id));
307   });
308 }
309 
DisconnectFromEndpoint(ClientProxy * client,absl::string_view endpoint_id,const ResultCallback & callback)310 void ServiceControllerRouter::DisconnectFromEndpoint(
311     ClientProxy* client, absl::string_view endpoint_id,
312     const ResultCallback& callback) {
313   RouteToServiceController(
314       [this, client, endpoint_id = std::string(endpoint_id), callback]() {
315         if (ClientHasAcquiredServiceController(client)) {
316           if (!client->IsConnectedToEndpoint(endpoint_id) &&
317               !client->HasPendingConnectionToEndpoint(endpoint_id)) {
318             callback.result_cb({Status::kOutOfOrderApiCall});
319             return;
320           }
321           service_controller_->DisconnectFromEndpoint(client, endpoint_id);
322           callback.result_cb({Status::kSuccess});
323         }
324       });
325 }
326 
StopAllEndpoints(ClientProxy * client,const ResultCallback & callback)327 void ServiceControllerRouter::StopAllEndpoints(ClientProxy* client,
328                                                const ResultCallback& callback) {
329   RouteToServiceController([this, client, callback]() {
330     if (ClientHasAcquiredServiceController(client)) {
331       DoneWithStrategySessionForClient(client);
332     }
333     callback.result_cb({Status::kSuccess});
334   });
335 }
336 
ClientDisconnecting(ClientProxy * client,const ResultCallback & callback)337 void ServiceControllerRouter::ClientDisconnecting(
338     ClientProxy* client, const ResultCallback& callback) {
339   RouteToServiceController([this, client, callback]() {
340     if (ClientHasAcquiredServiceController(client)) {
341       DoneWithStrategySessionForClient(client);
342       NEARBY_LOG(INFO,
343                  "[ServiceControllerRouter:Disconnect]: Client has completed "
344                  "the client's connection");
345     }
346     callback.result_cb({Status::kSuccess});
347   });
348 }
349 
AcquireServiceControllerForClient(ClientProxy * client,Strategy strategy)350 Status ServiceControllerRouter::AcquireServiceControllerForClient(
351     ClientProxy* client, Strategy strategy) {
352   if (current_strategy_.IsNone()) {
353     // Case 1: There is no existing Strategy at all.
354 
355     // Set everything up for the first time.
356     Status status = UpdateCurrentServiceControllerAndStrategy(strategy);
357     if (!status.Ok()) {
358       return status;
359     }
360     clients_.insert(client);
361     return {Status::kSuccess};
362   } else if (strategy == current_strategy_) {
363     // Case 2: The existing Strategy matches.
364 
365     // The new client just needs to be added to the set of clients using the
366     // current ServiceController.
367     clients_.insert(client);
368     return {Status::kSuccess};
369   } else {
370     // Case 3: The existing Strategy doesn't match.
371 
372     // It's only safe for a client to cause a switch if it's the only client
373     // using the current ServiceController.
374     bool is_the_only_client_of_service_controller =
375         clients_.size() == 1 && ClientHasAcquiredServiceController(client);
376     if (!is_the_only_client_of_service_controller) {
377       NEARBY_LOG(INFO,
378                  "[ServiceControllerRouter:AcquireServiceControllerForClient]: "
379                  "Client has already active strategy.");
380       return {Status::kAlreadyHaveActiveStrategy};
381     }
382 
383     // If the client still has connected endpoints, they must disconnect before
384     // they can switch.
385     if (!client->GetConnectedEndpoints().empty()) {
386       NEARBY_LOG(INFO,
387                  "[ServiceControllerRouter:AcquireServiceControllerForClient]: "
388                  "Client has connected endpoints.");
389       return {Status::kOutOfOrderApiCall};
390     }
391 
392     // By this point, it's safe to switch the Strategy and ServiceController
393     // (and since it's the only client, there's no need to add it to the set of
394     // clients using the current ServiceController).
395     return UpdateCurrentServiceControllerAndStrategy(strategy);
396   }
397 }
398 
ClientHasAcquiredServiceController(ClientProxy * client) const399 bool ServiceControllerRouter::ClientHasAcquiredServiceController(
400     ClientProxy* client) const {
401   return clients_.contains(client);
402 }
403 
ReleaseServiceControllerForClient(ClientProxy * client)404 void ServiceControllerRouter::ReleaseServiceControllerForClient(
405     ClientProxy* client) {
406   clients_.erase(client);
407 
408   // service_controller_ won't be released here. Instead, in desctructor.
409   if (clients_.empty()) {
410     current_strategy_ = Strategy{};
411   }
412 }
413 
414 /** Clean up all state for this client. The client is now free to switch
415  * strategies. */
DoneWithStrategySessionForClient(ClientProxy * client)416 void ServiceControllerRouter::DoneWithStrategySessionForClient(
417     ClientProxy* client) {
418   // Disconnect from all the connected endpoints tied to this clientProxy.
419   for (auto& endpoint_id : client->GetPendingConnectedEndpoints()) {
420     service_controller_->DisconnectFromEndpoint(client, endpoint_id);
421   }
422 
423   for (auto& endpoint_id : client->GetConnectedEndpoints()) {
424     service_controller_->DisconnectFromEndpoint(client, endpoint_id);
425   }
426 
427   // Stop any advertising and discovery that may be underway due to this
428   // clientProxy.
429   service_controller_->StopAdvertising(client);
430   service_controller_->StopDiscovery(client);
431 
432   ReleaseServiceControllerForClient(client);
433 }
434 
RouteToServiceController(Runnable runnable)435 void ServiceControllerRouter::RouteToServiceController(Runnable runnable) {
436   serializer_.Execute(std::move(runnable));
437 }
438 
ClientHasConnectionToAtLeastOneEndpoint(ClientProxy * client,const std::vector<std::string> & remote_endpoint_ids)439 bool ServiceControllerRouter::ClientHasConnectionToAtLeastOneEndpoint(
440     ClientProxy* client, const std::vector<std::string>& remote_endpoint_ids) {
441   for (auto& endpoint_id : remote_endpoint_ids) {
442     if (client->IsConnectedToEndpoint(endpoint_id)) {
443       return true;
444     }
445   }
446   return false;
447 }
448 
UpdateCurrentServiceControllerAndStrategy(Strategy strategy)449 Status ServiceControllerRouter::UpdateCurrentServiceControllerAndStrategy(
450     Strategy strategy) {
451   if (!strategy.IsValid()) {
452     NEARBY_LOG(INFO, "Strategy is not valid.");
453     return {Status::kError};
454   }
455 
456   service_controller_.reset(service_controller_factory_());
457   current_strategy_ = strategy;
458 
459   return {Status::kSuccess};
460 }
461 
462 }  // namespace connections
463 }  // namespace nearby
464 }  // namespace location
465