1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "core/internal/service_controller_router.h"
16
17 #include <cstddef>
18 #include <memory>
19 #include <string>
20 #include <utility>
21
22 #include "core/internal/client_proxy.h"
23 #include "core/listeners.h"
24 #include "core/options.h"
25 #include "core/params.h"
26 #include "core/payload.h"
27 #include "platform/public/logging.h"
28 #include "absl/time/clock.h"
29
30 namespace location {
31 namespace nearby {
32 namespace connections {
namespace {
// Length of a MAC address: the 6 bytes that uniquely identify a hardware
// interface.
constexpr std::size_t kMacAddressLength = 6u;

// Length of an endpoint ID, which identifies a device discovery and its
// associated connection request.
constexpr std::size_t kEndpointIdLength = 4u;

// Upper bound on the length of the information describing an endpoint. One
// device advertises this information and the other device can use it to
// identify the advertiser.
constexpr std::size_t kMaxEndpointInfoLength = 131u;
}  // namespace
47
~ServiceControllerRouter()48 ServiceControllerRouter::~ServiceControllerRouter() {
49 NEARBY_LOG(INFO, "ServiceControllerRouter going down.");
50
51 service_controller_.reset();
52 // And make sure that cleanup is the last thing we do.
53 serializer_.Shutdown();
54 }
55
StartAdvertising(ClientProxy * client,absl::string_view service_id,const ConnectionOptions & options,const ConnectionRequestInfo & info,const ResultCallback & callback)56 void ServiceControllerRouter::StartAdvertising(
57 ClientProxy* client, absl::string_view service_id,
58 const ConnectionOptions& options, const ConnectionRequestInfo& info,
59 const ResultCallback& callback) {
60 RouteToServiceController([this, client, service_id = std::string(service_id),
61 options, info, callback]() {
62 Status status = AcquireServiceControllerForClient(client, options.strategy);
63 if (!status.Ok()) {
64 callback.result_cb(status);
65 return;
66 }
67
68 if (client->IsAdvertising()) {
69 callback.result_cb({Status::kAlreadyAdvertising});
70 return;
71 }
72
73 status = service_controller_->StartAdvertising(client, service_id, options,
74 info);
75 callback.result_cb(status);
76 });
77 }
78
StopAdvertising(ClientProxy * client,const ResultCallback & callback)79 void ServiceControllerRouter::StopAdvertising(ClientProxy* client,
80 const ResultCallback& callback) {
81 RouteToServiceController([this, client, callback]() {
82 if (ClientHasAcquiredServiceController(client) && client->IsAdvertising()) {
83 service_controller_->StopAdvertising(client);
84 }
85 callback.result_cb({Status::kSuccess});
86 });
87 }
88
StartDiscovery(ClientProxy * client,absl::string_view service_id,const ConnectionOptions & options,const DiscoveryListener & listener,const ResultCallback & callback)89 void ServiceControllerRouter::StartDiscovery(ClientProxy* client,
90 absl::string_view service_id,
91 const ConnectionOptions& options,
92 const DiscoveryListener& listener,
93 const ResultCallback& callback) {
94 RouteToServiceController([this, client, service_id = std::string(service_id),
95 options, listener, callback]() {
96 Status status = AcquireServiceControllerForClient(client, options.strategy);
97 if (!status.Ok()) {
98 callback.result_cb(status);
99 return;
100 }
101
102 if (client->IsDiscovering()) {
103 callback.result_cb({Status::kAlreadyDiscovering});
104 return;
105 }
106
107 status = service_controller_->StartDiscovery(client, service_id, options,
108 listener);
109 callback.result_cb(status);
110 });
111 }
112
StopDiscovery(ClientProxy * client,const ResultCallback & callback)113 void ServiceControllerRouter::StopDiscovery(ClientProxy* client,
114 const ResultCallback& callback) {
115 RouteToServiceController([this, client, callback]() {
116 if (ClientHasAcquiredServiceController(client) && client->IsDiscovering()) {
117 service_controller_->StopDiscovery(client);
118 }
119 callback.result_cb({Status::kSuccess});
120 });
121 }
122
InjectEndpoint(ClientProxy * client,absl::string_view service_id,const OutOfBandConnectionMetadata & metadata,const ResultCallback & callback)123 void ServiceControllerRouter::InjectEndpoint(
124 ClientProxy* client, absl::string_view service_id,
125 const OutOfBandConnectionMetadata& metadata,
126 const ResultCallback& callback) {
127 RouteToServiceController(
128 [this, client, service_id = std::string(service_id), metadata,
129 callback]() {
130 // Currently, Bluetooth is the only supported medium for endpoint injection.
131 if (metadata.medium != Medium::BLUETOOTH ||
132 metadata.remote_bluetooth_mac_address.size() != kMacAddressLength) {
133 callback.result_cb({Status::kError});
134 return;
135 }
136
137 if (metadata.endpoint_id.size() != kEndpointIdLength) {
138 callback.result_cb({Status::kError});
139 return;
140 }
141
142 if (metadata.endpoint_info.Empty() ||
143 metadata.endpoint_info.size() > kMaxEndpointInfoLength) {
144 callback.result_cb({Status::kError});
145 return;
146 }
147
148 if (!ClientHasAcquiredServiceController(client) ||
149 !client->IsDiscovering()) {
150 callback.result_cb({Status::kOutOfOrderApiCall});
151 return;
152 }
153
154 service_controller_->InjectEndpoint(client, service_id, metadata);
155 callback.result_cb({Status::kSuccess});
156 });
157 }
158
RequestConnection(ClientProxy * client,absl::string_view endpoint_id,const ConnectionRequestInfo & info,const ConnectionOptions & options,const ResultCallback & callback)159 void ServiceControllerRouter::RequestConnection(
160 ClientProxy* client, absl::string_view endpoint_id,
161 const ConnectionRequestInfo& info, const ConnectionOptions& options,
162 const ResultCallback& callback) {
163 RouteToServiceController([this, client,
164 endpoint_id = std::string(endpoint_id), info,
165 options, callback]() {
166 if (!ClientHasAcquiredServiceController(client)) {
167 callback.result_cb({Status::kOutOfOrderApiCall});
168 return;
169 }
170
171 if (client->HasPendingConnectionToEndpoint(endpoint_id) ||
172 client->IsConnectedToEndpoint(endpoint_id)) {
173 callback.result_cb({Status::kAlreadyConnectedToEndpoint});
174 return;
175 }
176
177 callback.result_cb(service_controller_->RequestConnection(
178 client, endpoint_id, info, options));
179 });
180 }
181
AcceptConnection(ClientProxy * client,absl::string_view endpoint_id,const PayloadListener & listener,const ResultCallback & callback)182 void ServiceControllerRouter::AcceptConnection(ClientProxy* client,
183 absl::string_view endpoint_id,
184 const PayloadListener& listener,
185 const ResultCallback& callback) {
186 RouteToServiceController([this, client,
187 endpoint_id = std::string(endpoint_id), listener,
188 callback]() {
189 if (!ClientHasAcquiredServiceController(client)) {
190 callback.result_cb({Status::kOutOfOrderApiCall});
191 return;
192 }
193
194 if (client->IsConnectedToEndpoint(endpoint_id)) {
195 callback.result_cb({Status::kAlreadyConnectedToEndpoint});
196 return;
197 }
198
199 if (client->HasLocalEndpointResponded(endpoint_id)) {
200 NEARBY_LOG(INFO,
201 "[ServiceControllerRouter:Accept]: Client has local "
202 "endpoint responded; id=%s",
203 endpoint_id.c_str());
204 callback.result_cb({Status::kOutOfOrderApiCall});
205 return;
206 }
207
208 callback.result_cb(
209 service_controller_->AcceptConnection(client, endpoint_id, listener));
210 });
211 }
212
RejectConnection(ClientProxy * client,absl::string_view endpoint_id,const ResultCallback & callback)213 void ServiceControllerRouter::RejectConnection(ClientProxy* client,
214 absl::string_view endpoint_id,
215 const ResultCallback& callback) {
216 RouteToServiceController(
217 [this, client, endpoint_id = std::string(endpoint_id), callback]() {
218 if (!ClientHasAcquiredServiceController(client)) {
219 callback.result_cb({Status::kOutOfOrderApiCall});
220 return;
221 }
222
223 if (client->IsConnectedToEndpoint(endpoint_id)) {
224 callback.result_cb({Status::kAlreadyConnectedToEndpoint});
225 return;
226 }
227
228 if (client->HasLocalEndpointResponded(endpoint_id)) {
229 NEARBY_LOG(INFO,
230 "[ServiceControllerRouter:Reject]: Client has local "
231 "endpoint responded; id=%s",
232 endpoint_id.c_str());
233 callback.result_cb({Status::kOutOfOrderApiCall});
234 return;
235 }
236
237 callback.result_cb(
238 service_controller_->RejectConnection(client, endpoint_id));
239 });
240 }
241
InitiateBandwidthUpgrade(ClientProxy * client,absl::string_view endpoint_id,const ResultCallback & callback)242 void ServiceControllerRouter::InitiateBandwidthUpgrade(
243 ClientProxy* client, absl::string_view endpoint_id,
244 const ResultCallback& callback) {
245 RouteToServiceController(
246 [this, client, endpoint_id = std::string(endpoint_id), callback]() {
247 if (!ClientHasAcquiredServiceController(client) ||
248 !client->IsConnectedToEndpoint(endpoint_id)) {
249 callback.result_cb({Status::kOutOfOrderApiCall});
250 return;
251 }
252
253 service_controller_->InitiateBandwidthUpgrade(client, endpoint_id);
254
255 // Operation is triggered; the caller can listen to
256 // ConnectionListener::OnBandwidthChanged() to determine its success.
257 callback.result_cb({Status::kSuccess});
258 });
259 }
260
SendPayload(ClientProxy * client,absl::Span<const std::string> endpoint_ids,Payload payload,const ResultCallback & callback)261 void ServiceControllerRouter::SendPayload(
262 ClientProxy* client, absl::Span<const std::string> endpoint_ids,
263 Payload payload, const ResultCallback& callback) {
264 // Payload is a move-only type.
265 // We have to capture it by value inside the lambda, and pass it over to
266 // the executor as an std::function<void()> instance.
267 // Lambda must be copyable, in order ot satisfy std::function<> requirements.
268 // To make it so, we need Payload wrapped by a copyable wrapper.
269 // std::shared_ptr<> is used, because it is copyable.
270 auto shared_payload = std::make_shared<Payload>(std::move(payload));
271 const std::vector<std::string> endpoints =
272 std::vector<std::string>(endpoint_ids.begin(), endpoint_ids.end());
273
274 RouteToServiceController(
275 [this, client, shared_payload, endpoints, callback]() {
276 if (!ClientHasAcquiredServiceController(client)) {
277 callback.result_cb({Status::kOutOfOrderApiCall});
278 return;
279 }
280
281 if (!ClientHasConnectionToAtLeastOneEndpoint(client, endpoints)) {
282 callback.result_cb({Status::kEndpointUnknown});
283 return;
284 }
285
286 service_controller_->SendPayload(client, endpoints,
287 std::move(*shared_payload));
288
289 // At this point, we've queued up the send Payload request with the
290 // ServiceController; any further failures (e.g. one of the endpoints is
291 // unknown, goes away, or otherwise fails) will be returned to the
292 // client as a PayloadTransferUpdate.
293 callback.result_cb({Status::kSuccess});
294 });
295 }
296
CancelPayload(ClientProxy * client,std::uint64_t payload_id,const ResultCallback & callback)297 void ServiceControllerRouter::CancelPayload(ClientProxy* client,
298 std::uint64_t payload_id,
299 const ResultCallback& callback) {
300 RouteToServiceController([this, client, payload_id, callback]() {
301 if (!ClientHasAcquiredServiceController(client)) {
302 callback.result_cb({Status::kOutOfOrderApiCall});
303 return;
304 }
305
306 callback.result_cb(service_controller_->CancelPayload(client, payload_id));
307 });
308 }
309
DisconnectFromEndpoint(ClientProxy * client,absl::string_view endpoint_id,const ResultCallback & callback)310 void ServiceControllerRouter::DisconnectFromEndpoint(
311 ClientProxy* client, absl::string_view endpoint_id,
312 const ResultCallback& callback) {
313 RouteToServiceController(
314 [this, client, endpoint_id = std::string(endpoint_id), callback]() {
315 if (ClientHasAcquiredServiceController(client)) {
316 if (!client->IsConnectedToEndpoint(endpoint_id) &&
317 !client->HasPendingConnectionToEndpoint(endpoint_id)) {
318 callback.result_cb({Status::kOutOfOrderApiCall});
319 return;
320 }
321 service_controller_->DisconnectFromEndpoint(client, endpoint_id);
322 callback.result_cb({Status::kSuccess});
323 }
324 });
325 }
326
StopAllEndpoints(ClientProxy * client,const ResultCallback & callback)327 void ServiceControllerRouter::StopAllEndpoints(ClientProxy* client,
328 const ResultCallback& callback) {
329 RouteToServiceController([this, client, callback]() {
330 if (ClientHasAcquiredServiceController(client)) {
331 DoneWithStrategySessionForClient(client);
332 }
333 callback.result_cb({Status::kSuccess});
334 });
335 }
336
ClientDisconnecting(ClientProxy * client,const ResultCallback & callback)337 void ServiceControllerRouter::ClientDisconnecting(
338 ClientProxy* client, const ResultCallback& callback) {
339 RouteToServiceController([this, client, callback]() {
340 if (ClientHasAcquiredServiceController(client)) {
341 DoneWithStrategySessionForClient(client);
342 NEARBY_LOG(INFO,
343 "[ServiceControllerRouter:Disconnect]: Client has completed "
344 "the client's connection");
345 }
346 callback.result_cb({Status::kSuccess});
347 });
348 }
349
AcquireServiceControllerForClient(ClientProxy * client,Strategy strategy)350 Status ServiceControllerRouter::AcquireServiceControllerForClient(
351 ClientProxy* client, Strategy strategy) {
352 if (current_strategy_.IsNone()) {
353 // Case 1: There is no existing Strategy at all.
354
355 // Set everything up for the first time.
356 Status status = UpdateCurrentServiceControllerAndStrategy(strategy);
357 if (!status.Ok()) {
358 return status;
359 }
360 clients_.insert(client);
361 return {Status::kSuccess};
362 } else if (strategy == current_strategy_) {
363 // Case 2: The existing Strategy matches.
364
365 // The new client just needs to be added to the set of clients using the
366 // current ServiceController.
367 clients_.insert(client);
368 return {Status::kSuccess};
369 } else {
370 // Case 3: The existing Strategy doesn't match.
371
372 // It's only safe for a client to cause a switch if it's the only client
373 // using the current ServiceController.
374 bool is_the_only_client_of_service_controller =
375 clients_.size() == 1 && ClientHasAcquiredServiceController(client);
376 if (!is_the_only_client_of_service_controller) {
377 NEARBY_LOG(INFO,
378 "[ServiceControllerRouter:AcquireServiceControllerForClient]: "
379 "Client has already active strategy.");
380 return {Status::kAlreadyHaveActiveStrategy};
381 }
382
383 // If the client still has connected endpoints, they must disconnect before
384 // they can switch.
385 if (!client->GetConnectedEndpoints().empty()) {
386 NEARBY_LOG(INFO,
387 "[ServiceControllerRouter:AcquireServiceControllerForClient]: "
388 "Client has connected endpoints.");
389 return {Status::kOutOfOrderApiCall};
390 }
391
392 // By this point, it's safe to switch the Strategy and ServiceController
393 // (and since it's the only client, there's no need to add it to the set of
394 // clients using the current ServiceController).
395 return UpdateCurrentServiceControllerAndStrategy(strategy);
396 }
397 }
398
ClientHasAcquiredServiceController(ClientProxy * client) const399 bool ServiceControllerRouter::ClientHasAcquiredServiceController(
400 ClientProxy* client) const {
401 return clients_.contains(client);
402 }
403
ReleaseServiceControllerForClient(ClientProxy * client)404 void ServiceControllerRouter::ReleaseServiceControllerForClient(
405 ClientProxy* client) {
406 clients_.erase(client);
407
408 // service_controller_ won't be released here. Instead, in desctructor.
409 if (clients_.empty()) {
410 current_strategy_ = Strategy{};
411 }
412 }
413
414 /** Clean up all state for this client. The client is now free to switch
415 * strategies. */
DoneWithStrategySessionForClient(ClientProxy * client)416 void ServiceControllerRouter::DoneWithStrategySessionForClient(
417 ClientProxy* client) {
418 // Disconnect from all the connected endpoints tied to this clientProxy.
419 for (auto& endpoint_id : client->GetPendingConnectedEndpoints()) {
420 service_controller_->DisconnectFromEndpoint(client, endpoint_id);
421 }
422
423 for (auto& endpoint_id : client->GetConnectedEndpoints()) {
424 service_controller_->DisconnectFromEndpoint(client, endpoint_id);
425 }
426
427 // Stop any advertising and discovery that may be underway due to this
428 // clientProxy.
429 service_controller_->StopAdvertising(client);
430 service_controller_->StopDiscovery(client);
431
432 ReleaseServiceControllerForClient(client);
433 }
434
RouteToServiceController(Runnable runnable)435 void ServiceControllerRouter::RouteToServiceController(Runnable runnable) {
436 serializer_.Execute(std::move(runnable));
437 }
438
ClientHasConnectionToAtLeastOneEndpoint(ClientProxy * client,const std::vector<std::string> & remote_endpoint_ids)439 bool ServiceControllerRouter::ClientHasConnectionToAtLeastOneEndpoint(
440 ClientProxy* client, const std::vector<std::string>& remote_endpoint_ids) {
441 for (auto& endpoint_id : remote_endpoint_ids) {
442 if (client->IsConnectedToEndpoint(endpoint_id)) {
443 return true;
444 }
445 }
446 return false;
447 }
448
UpdateCurrentServiceControllerAndStrategy(Strategy strategy)449 Status ServiceControllerRouter::UpdateCurrentServiceControllerAndStrategy(
450 Strategy strategy) {
451 if (!strategy.IsValid()) {
452 NEARBY_LOG(INFO, "Strategy is not valid.");
453 return {Status::kError};
454 }
455
456 service_controller_.reset(service_controller_factory_());
457 current_strategy_ = strategy;
458
459 return {Status::kSuccess};
460 }
461
462 } // namespace connections
463 } // namespace nearby
464 } // namespace location
465