1 /*
2 * Copyright (C) 2016-2019 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20 #include "dht_proxy_client.h"
21
22 #include "dhtrunner.h"
23 #include "op_cache.h"
24 #include "utils.h"
25
26 #include <restbed>
27 #include <json/json.h>
28
29 #include <chrono>
30 #include <vector>
31
32 namespace dht {
33
34 struct DhtProxyClient::InfoState {
35 std::atomic_uint ipv4 {0}, ipv6 {0};
36 std::atomic_bool cancel {false};
37 };
38
39 struct DhtProxyClient::ListenState {
40 std::atomic_bool ok {true};
41 std::atomic_bool cancel {false};
42 };
43
44 struct DhtProxyClient::Listener
45 {
46 OpValueCache cache;
47 ValueCallback cb;
48 Sp<restbed::Request> req;
49 std::thread thread;
50 unsigned callbackId;
51 Sp<ListenState> state;
52 Sp<Scheduler::Job> refreshJob;
Listenerdht::DhtProxyClient::Listener53 Listener(OpValueCache&& c, const Sp<restbed::Request>& r)
54 : cache(std::move(c)), req(r) {}
55 };
56
57 struct PermanentPut {
58 Sp<Value> value;
59 Sp<Scheduler::Job> refreshJob;
60 Sp<std::atomic_bool> ok;
PermanentPutdht::PermanentPut61 PermanentPut(const Sp<Value>& v, Sp<Scheduler::Job>&& j, const Sp<std::atomic_bool>& o)
62 : value(v), refreshJob(std::move(j)), ok(o) {}
63 };
64
65 struct DhtProxyClient::ProxySearch {
66 SearchCache ops {};
67 Sp<Scheduler::Job> opExpirationJob {};
68 std::map<size_t, Listener> listeners {};
69 std::map<Value::Id, PermanentPut> puts {};
70 };
71
DhtProxyClient()72 DhtProxyClient::DhtProxyClient() {}
73
DhtProxyClient(std::function<void ()> signal,const std::string & serverHost,const std::string & pushClientId,const Logger & l)74 DhtProxyClient::DhtProxyClient(std::function<void()> signal, const std::string& serverHost, const std::string& pushClientId, const Logger& l)
75 : DhtInterface(l), serverHost_(serverHost), pushClientId_(pushClientId), loopSignal_(signal)
76 {
77 if (serverHost_.find("://") == std::string::npos)
78 serverHost_ = proxy::HTTP_PROTO + serverHost_;
79 if (!serverHost_.empty())
80 startProxy();
81 }
82
83 void
confirmProxy()84 DhtProxyClient::confirmProxy()
85 {
86 if (serverHost_.empty()) return;
87 getConnectivityStatus();
88 }
89
90 void
startProxy()91 DhtProxyClient::startProxy()
92 {
93 if (serverHost_.empty()) return;
94 DHT_LOG.w("Staring proxy client to %s", serverHost_.c_str());
95 nextProxyConfirmation = scheduler.add(scheduler.time(), std::bind(&DhtProxyClient::confirmProxy, this));
96 listenerRestart = std::make_shared<Scheduler::Job>(std::bind(&DhtProxyClient::restartListeners, this));
97 loopSignal_();
98 }
99
~DhtProxyClient()100 DhtProxyClient::~DhtProxyClient()
101 {
102 isDestroying_ = true;
103 cancelAllOperations();
104 cancelAllListeners();
105 if (infoState_)
106 infoState_->cancel = true;
107 if (statusThread_.joinable())
108 statusThread_.join();
109 }
110
111 std::vector<Sp<Value>>
getLocal(const InfoHash & k,const Value::Filter & filter) const112 DhtProxyClient::getLocal(const InfoHash& k, const Value::Filter& filter) const {
113 std::lock_guard<std::mutex> lock(searchLock_);
114 auto s = searches_.find(k);
115 if (s == searches_.end())
116 return {};
117 return s->second.ops.get(filter);
118 }
119
120 Sp<Value>
getLocalById(const InfoHash & k,Value::Id id) const121 DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const {
122 std::lock_guard<std::mutex> lock(searchLock_);
123 auto s = searches_.find(k);
124 if (s == searches_.end())
125 return {};
126 return s->second.ops.get(id);
127 }
128
129 void
cancelAllOperations()130 DhtProxyClient::cancelAllOperations()
131 {
132 std::lock_guard<std::mutex> lock(lockOperations_);
133 auto operation = operations_.begin();
134 while (operation != operations_.end()) {
135 if (operation->thread.joinable()) {
136 // Close connection to stop operation?
137 if (operation->req) {
138 try {
139 restbed::Http::close(operation->req);
140 } catch (const std::exception& e) {
141 DHT_LOG.w("Error closing socket: %s", e.what());
142 }
143 operation->req.reset();
144 }
145 operation->thread.join();
146 operation = operations_.erase(operation);
147 } else {
148 ++operation;
149 }
150 }
151 }
152
153 void
cancelAllListeners()154 DhtProxyClient::cancelAllListeners()
155 {
156 std::lock_guard<std::mutex> lock(searchLock_);
157 DHT_LOG.w("Cancelling all listeners for %zu searches", searches_.size());
158 for (auto& s: searches_) {
159 s.second.ops.cancelAll([&](size_t token){
160 auto l = s.second.listeners.find(token);
161 if (l == s.second.listeners.end())
162 return;
163 if (l->second.thread.joinable()) {
164 // Close connection to stop listener?
165 l->second.state->cancel = true;
166 if (l->second.req) {
167 try {
168 restbed::Http::close(l->second.req);
169 } catch (const std::exception& e) {
170 DHT_LOG.w("Error closing socket: %s", e.what());
171 }
172 l->second.req.reset();
173 }
174 l->second.thread.join();
175 }
176 s.second.listeners.erase(token);
177 });
178 }
179 }
180
181 void
shutdown(ShutdownCallback cb)182 DhtProxyClient::shutdown(ShutdownCallback cb)
183 {
184 cancelAllOperations();
185 cancelAllListeners();
186 if (cb)
187 cb();
188 }
189
190 NodeStatus
getStatus(sa_family_t af) const191 DhtProxyClient::getStatus(sa_family_t af) const
192 {
193 std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
194 switch (af)
195 {
196 case AF_INET:
197 return statusIpv4_;
198 case AF_INET6:
199 return statusIpv6_;
200 default:
201 return NodeStatus::Disconnected;
202 }
203 }
204
205 bool
isRunning(sa_family_t af) const206 DhtProxyClient::isRunning(sa_family_t af) const
207 {
208 std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
209 switch (af)
210 {
211 case AF_INET:
212 return statusIpv4_ != NodeStatus::Disconnected;
213 case AF_INET6:
214 return statusIpv6_ != NodeStatus::Disconnected;
215 default:
216 return false;
217 }
218 }
219
220 time_point
periodic(const uint8_t *,size_t,SockAddr)221 DhtProxyClient::periodic(const uint8_t*, size_t, SockAddr)
222 {
223 // Exec all currently stored callbacks
224 scheduler.syncTime();
225 decltype(callbacks_) callbacks;
226 {
227 std::lock_guard<std::mutex> lock(lockCallbacks);
228 callbacks = std::move(callbacks_);
229 }
230 for (auto& callback : callbacks)
231 callback();
232 callbacks.clear();
233
234 // Remove finished operations
235 {
236 std::lock_guard<std::mutex> lock(lockOperations_);
237 auto operation = operations_.begin();
238 while (operation != operations_.end()) {
239 if (*(operation->finished)) {
240 if (operation->thread.joinable()) {
241 // Close connection to stop operation?
242 if (operation->req) {
243 try {
244 restbed::Http::close(operation->req);
245 } catch (const std::exception& e) {
246 DHT_LOG.w("Error closing socket: %s", e.what());
247 }
248 operation->req.reset();
249 }
250 operation->thread.join();
251 }
252 operation = operations_.erase(operation);
253 } else {
254 ++operation;
255 }
256 }
257 }
258 return scheduler.run();
259 }
260
261 void
get(const InfoHash & key,GetCallback cb,DoneCallback donecb,Value::Filter && f,Where && w)262 DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Value::Filter&& f, Where&& w)
263 {
264 DHT_LOG.d(key, "[search %s]: get", key.to_c_str());
265 restbed::Uri uri(serverHost_ + "/" + key.toString());
266 auto req = std::make_shared<restbed::Request>(uri);
267 Value::Filter filter = w.empty() ? f : f.chain(w.getFilter());
268
269 auto finished = std::make_shared<std::atomic_bool>(false);
270 Operation o;
271 o.req = req;
272 o.finished = finished;
273 o.thread = std::thread([=](){
274 // Try to contact the proxy and set the status to connected when done.
275 // will change the connectivity status
276 struct GetState{ std::atomic_bool ok {true}; std::atomic_bool stop {false}; };
277 auto state = std::make_shared<GetState>();
278 try {
279 restbed::Http::async(req,
280 [=](const std::shared_ptr<restbed::Request>& req,
281 const std::shared_ptr<restbed::Response>& reply) {
282 auto code = reply->get_status_code();
283
284 if (code == 200) {
285 try {
286 while (restbed::Http::is_open(req) and not *finished and not state->stop) {
287 restbed::Http::fetch("\n", reply);
288 if (*finished or state->stop)
289 break;
290 std::string body;
291 reply->get_body(body);
292 reply->set_body(""); // Reset the body for the next fetch
293
294 std::string err;
295 Json::Value json;
296 Json::CharReaderBuilder rbuilder;
297 auto* char_data = reinterpret_cast<const char*>(&body[0]);
298 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
299 if (reader->parse(char_data, char_data + body.size(), &json, &err)) {
300 auto value = std::make_shared<Value>(json);
301 if ((not filter or filter(*value)) and cb) {
302 {
303 std::lock_guard<std::mutex> lock(lockCallbacks);
304 callbacks_.emplace_back([cb, value, state]() {
305 if (not state->stop and not cb({value}))
306 state->stop = true;
307 });
308 }
309 loopSignal_();
310 }
311 } else {
312 state->ok = false;
313 }
314 }
315 } catch (std::runtime_error& e) { }
316 } else {
317 state->ok = false;
318 }
319 }).wait();
320 } catch(const std::exception& e) {
321 state->ok = false;
322 }
323 if (donecb) {
324 {
325 std::lock_guard<std::mutex> lock(lockCallbacks);
326 callbacks_.emplace_back([=](){
327 donecb(state->ok, {});
328 state->stop = true;
329 });
330 }
331 loopSignal_();
332 }
333 if (!state->ok) {
334 // Connection failed, update connectivity
335 opFailed();
336 }
337 *finished = true;
338 });
339 {
340 std::lock_guard<std::mutex> lock(lockOperations_);
341 operations_.emplace_back(std::move(o));
342 }
343 }
344
345 void
put(const InfoHash & key,Sp<Value> val,DoneCallback cb,time_point created,bool permanent)346 DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent)
347 {
348 DHT_LOG.d(key, "[search %s]: put", key.to_c_str());
349 scheduler.syncTime();
350 if (not val) {
351 if (cb) cb(false, {});
352 return;
353 }
354 if (val->id == Value::INVALID_ID) {
355 crypto::random_device rdev;
356 std::uniform_int_distribution<Value::Id> rand_id {};
357 val->id = rand_id(rdev);
358 }
359 if (permanent) {
360 std::lock_guard<std::mutex> lock(searchLock_);
361 auto id = val->id;
362 auto& search = searches_[key];
363 auto nextRefresh = scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN;
364 auto ok = std::make_shared<std::atomic_bool>(false);
365 search.puts.erase(id);
366 search.puts.emplace(std::piecewise_construct,
367 std::forward_as_tuple(id),
368 std::forward_as_tuple(val, scheduler.add(nextRefresh, [this, key, id, ok]{
369 std::lock_guard<std::mutex> lock(searchLock_);
370 auto s = searches_.find(key);
371 if (s != searches_.end()) {
372 auto p = s->second.puts.find(id);
373 if (p != s->second.puts.end()) {
374 doPut(key, p->second.value,
375 [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){
376 *ok = result;
377 }, time_point::max(), true);
378 scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
379 }
380 }
381 }), ok));
382 }
383 doPut(key, val, std::move(cb), created, permanent);
384 }
385
386 void
doPut(const InfoHash & key,Sp<Value> val,DoneCallback cb,time_point,bool permanent)387 DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point /*created*/, bool permanent)
388 {
389 DHT_LOG.d(key, "[search %s] performing put of %s", key.to_c_str(), val->toString().c_str());
390 restbed::Uri uri(serverHost_ + "/" + key.toString());
391 auto req = std::make_shared<restbed::Request>(uri);
392 req->set_method("POST");
393
394 auto json = val->toJson();
395 if (permanent) {
396 if (deviceKey_.empty()) {
397 json["permanent"] = true;
398 } else {
399 #ifdef OPENDHT_PUSH_NOTIFICATIONS
400 Json::Value refresh;
401 getPushRequest(refresh);
402 json["permanent"] = refresh;
403 #else
404 json["permanent"] = true;
405 #endif
406 }
407 }
408 Json::StreamWriterBuilder wbuilder;
409 wbuilder["commentStyle"] = "None";
410 wbuilder["indentation"] = "";
411 auto body = Json::writeString(wbuilder, json) + "\n";
412 req->set_body(body);
413 req->set_header("Content-Length", std::to_string(body.size()));
414
415 auto finished = std::make_shared<std::atomic_bool>(false);
416 Operation o;
417 o.req = req;
418 o.finished = finished;
419 o.thread = std::thread([=](){
420 auto ok = std::make_shared<std::atomic_bool>(true);
421 try {
422 restbed::Http::async(req,
423 [ok](const std::shared_ptr<restbed::Request>& /*req*/,
424 const std::shared_ptr<restbed::Response>& reply) {
425 auto code = reply->get_status_code();
426
427 if (code == 200) {
428 restbed::Http::fetch("\n", reply);
429 std::string body;
430 reply->get_body(body);
431 reply->set_body(""); // Reset the body for the next fetch
432
433 try {
434 std::string err;
435 Json::Value json;
436 Json::CharReaderBuilder rbuilder;
437 auto* char_data = reinterpret_cast<const char*>(&body[0]);
438 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
439 if (not reader->parse(char_data, char_data + body.size(), &json, &err))
440 *ok = false;
441 } catch (...) {
442 *ok = false;
443 }
444 } else {
445 *ok = false;
446 }
447 }).wait();
448 } catch(const std::exception& e) {
449 *ok = false;
450 }
451 if (cb) {
452 {
453 std::lock_guard<std::mutex> lock(lockCallbacks);
454 callbacks_.emplace_back([=](){
455 cb(*ok, {});
456 });
457 }
458 loopSignal_();
459 }
460 if (!ok) {
461 // Connection failed, update connectivity
462 opFailed();
463 }
464 *finished = true;
465 });
466 {
467 std::lock_guard<std::mutex> lock(lockOperations_);
468 operations_.emplace_back(std::move(o));
469 }
470 }
471
472 /**
473 * Get data currently being put at the given hash.
474 */
475 std::vector<Sp<Value>>
getPut(const InfoHash & key) const476 DhtProxyClient::getPut(const InfoHash& key) const {
477 std::vector<Sp<Value>> ret;
478 auto search = searches_.find(key);
479 if (search != searches_.end()) {
480 ret.reserve(search->second.puts.size());
481 for (const auto& put : search->second.puts)
482 ret.emplace_back(put.second.value);
483 }
484 return ret;
485 }
486
487 /**
488 * Get data currently being put at the given hash with the given id.
489 */
490 Sp<Value>
getPut(const InfoHash & key,const Value::Id & id) const491 DhtProxyClient::getPut(const InfoHash& key, const Value::Id& id) const {
492 auto search = searches_.find(key);
493 if (search == searches_.end())
494 return {};
495 auto val = search->second.puts.find(id);
496 if (val == search->second.puts.end())
497 return {};
498 return val->second.value;
499 }
500
501 /**
502 * Stop any put/announce operation at the given location,
503 * for the value with the given id.
504 */
505 bool
cancelPut(const InfoHash & key,const Value::Id & id)506 DhtProxyClient::cancelPut(const InfoHash& key, const Value::Id& id)
507 {
508 auto search = searches_.find(key);
509 if (search == searches_.end())
510 return false;
511 DHT_LOG.d(key, "[search %s] cancel put", key.to_c_str());
512 return search->second.puts.erase(id) > 0;
513 }
514
515 NodeStats
getNodesStats(sa_family_t af) const516 DhtProxyClient::getNodesStats(sa_family_t af) const
517 {
518 return af == AF_INET ? stats4_ : stats6_;
519 }
520
521 void
getProxyInfos()522 DhtProxyClient::getProxyInfos()
523 {
524 DHT_LOG.d("Requesting proxy server node information");
525 std::lock_guard<std::mutex> l(statusLock_);
526
527 auto infoState = std::make_shared<InfoState>();
528 if (infoState_)
529 infoState_->cancel = true;
530 infoState_ = infoState;
531
532 {
533 std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
534 if (statusIpv4_ == NodeStatus::Disconnected)
535 statusIpv4_ = NodeStatus::Connecting;
536 if (statusIpv6_ == NodeStatus::Disconnected)
537 statusIpv6_ = NodeStatus::Connecting;
538 }
539
540 // A node can have a Ipv4 and a Ipv6. So, we need to retrieve all public ips
541 auto serverHost = serverHost_;
542
543 // Try to contact the proxy and set the status to connected when done.
544 // will change the connectivity status
545 if (statusThread_.joinable()) {
546 try {
547 statusThread_.detach();
548 statusThread_ = {};
549 } catch (const std::exception& e) {
550 DHT_LOG.e("Error detaching thread: %s", e.what());
551 }
552 }
553 statusThread_ = std::thread([this, serverHost, infoState]{
554 try {
555 auto endpointStr = serverHost;
556 auto protocol = std::string(proxy::HTTP_PROTO);
557 auto protocolIdx = serverHost.find("://");
558 if (protocolIdx != std::string::npos) {
559 protocol = endpointStr.substr(0, protocolIdx + 3);
560 endpointStr = endpointStr.substr(protocolIdx + 3);
561 }
562 auto hostAndService = splitPort(endpointStr);
563 auto resolved_proxies = SockAddr::resolve(hostAndService.first, hostAndService.second);
564 std::vector<std::future<Sp<restbed::Response>>> reqs;
565 reqs.reserve(resolved_proxies.size());
566 for (const auto& resolved_proxy: resolved_proxies) {
567 auto server = resolved_proxy.toString();
568 if (resolved_proxy.getFamily() == AF_INET6) {
569 // HACK restbed seems to not correctly handle directly http://[ipv6]
570 // See https://github.com/Corvusoft/restbed/issues/290.
571 server = endpointStr;
572 }
573 restbed::Uri uri(protocol + server + "/");
574 auto req = std::make_shared<restbed::Request>(uri);
575 if (infoState->cancel)
576 return;
577 reqs.emplace_back(restbed::Http::async(req,
578 [this, resolved_proxy, infoState](
579 const std::shared_ptr<restbed::Request>&,
580 const std::shared_ptr<restbed::Response>& reply)
581 {
582 auto code = reply->get_status_code();
583 Json::Value proxyInfos;
584 if (code == 200) {
585 restbed::Http::fetch("\n", reply);
586 auto& state = *infoState;
587 if (state.cancel) return;
588 std::string body;
589 reply->get_body(body);
590
591 std::string err;
592 Json::CharReaderBuilder rbuilder;
593 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
594 try {
595 reader->parse(body.data(), body.data() + body.size(), &proxyInfos, &err);
596 } catch (...) {
597 return;
598 }
599 auto family = resolved_proxy.getFamily();
600 if (family == AF_INET) state.ipv4++;
601 else if (family == AF_INET6) state.ipv6++;
602 if (not state.cancel)
603 onProxyInfos(proxyInfos, family);
604 }
605 }));
606 }
607 for (auto& r : reqs)
608 r.get();
609 reqs.clear();
610 } catch (const std::exception& e) {
611 DHT_LOG.e("Error sending proxy info request: %s", e.what());
612 }
613 const auto& state = *infoState;
614 if (state.cancel) return;
615 if (state.ipv4 == 0) onProxyInfos(Json::Value{}, AF_INET);
616 if (state.ipv6 == 0) onProxyInfos(Json::Value{}, AF_INET6);
617 });
618 }
619
620 void
onProxyInfos(const Json::Value & proxyInfos,sa_family_t family)621 DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, sa_family_t family)
622 {
623 if (isDestroying_)
624 return;
625 std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
626 auto oldStatus = std::max(statusIpv4_, statusIpv6_);
627 auto& status = family == AF_INET ? statusIpv4_ : statusIpv6_;
628 if (not proxyInfos.isMember("node_id")) {
629 DHT_LOG.e("Proxy info request failed for %s", family == AF_INET ? "IPv4" : "IPv6");
630 status = NodeStatus::Disconnected;
631 } else {
632 DHT_LOG.d("Got proxy reply for %s", family == AF_INET ? "IPv4" : "IPv6");
633 try {
634 myid = InfoHash(proxyInfos["node_id"].asString());
635 stats4_ = NodeStats(proxyInfos["ipv4"]);
636 stats6_ = NodeStats(proxyInfos["ipv6"]);
637 if (stats4_.good_nodes + stats6_.good_nodes)
638 status = NodeStatus::Connected;
639 else if (stats4_.dubious_nodes + stats6_.dubious_nodes)
640 status = NodeStatus::Connecting;
641 else
642 status = NodeStatus::Disconnected;
643
644 auto publicIp = parsePublicAddress(proxyInfos["public_ip"]);
645 auto publicFamily = publicIp.getFamily();
646 if (publicFamily == AF_INET)
647 publicAddressV4_ = publicIp;
648 else if (publicFamily == AF_INET6)
649 publicAddressV6_ = publicIp;
650 } catch (const std::exception& e) {
651 DHT_LOG.w("Error processing proxy infos: %s", e.what());
652 }
653 }
654
655 auto newStatus = std::max(statusIpv4_, statusIpv6_);
656 if (newStatus == NodeStatus::Connected) {
657 if (oldStatus == NodeStatus::Disconnected || oldStatus == NodeStatus::Connecting) {
658 scheduler.edit(listenerRestart, scheduler.time());
659 }
660 scheduler.edit(nextProxyConfirmation, scheduler.time() + std::chrono::minutes(15));
661 }
662 else if (newStatus == NodeStatus::Disconnected) {
663 scheduler.edit(nextProxyConfirmation, scheduler.time() + std::chrono::minutes(1));
664 }
665 loopSignal_();
666 }
667
668 SockAddr
parsePublicAddress(const Json::Value & val)669 DhtProxyClient::parsePublicAddress(const Json::Value& val)
670 {
671 auto public_ip = val.asString();
672 auto hostAndService = splitPort(public_ip);
673 auto sa = SockAddr::resolve(hostAndService.first);
674 if (sa.empty()) return {};
675 return sa.front().getMappedIPv4();
676 }
677
678 std::vector<SockAddr>
getPublicAddress(sa_family_t family)679 DhtProxyClient::getPublicAddress(sa_family_t family)
680 {
681 std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
682 std::vector<SockAddr> result;
683 if (publicAddressV6_ && family != AF_INET) result.emplace_back(publicAddressV6_);
684 if (publicAddressV4_ && family != AF_INET6) result.emplace_back(publicAddressV4_);
685 return result;
686 }
687
688 size_t
listen(const InfoHash & key,ValueCallback cb,Value::Filter filter,Where where)689 DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) {
690 DHT_LOG.d(key, "[search %s]: listen", key.to_c_str());
691 auto& search = searches_[key];
692 auto query = std::make_shared<Query>(Select{}, std::move(where));
693 auto token = search.ops.listen(cb, query, filter, [this, key](Sp<Query> /*q*/, ValueCallback cb, SyncCallback /*scb*/) -> size_t {
694 scheduler.syncTime();
695 restbed::Uri uri(serverHost_ + "/" + key.toString());
696 std::lock_guard<std::mutex> lock(searchLock_);
697 // Find search
698 auto search = searches_.find(key);
699 if (search == searches_.end()) {
700 DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str());
701 return 0;
702 }
703 DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe");
704
705 // Add listener
706 auto req = std::make_shared<restbed::Request>(uri);
707 auto token = ++listenerToken_;
708 auto l = search->second.listeners.find(token);
709 if (l == search->second.listeners.end()) {
710 l = search->second.listeners.emplace(std::piecewise_construct,
711 std::forward_as_tuple(token),
712 std::forward_as_tuple(std::move(cb), req)).first;
713 } else {
714 if (l->second.state)
715 l->second.state->cancel = true;
716 l->second.req = req;
717 }
718
719 // Add callback
720 auto state = std::make_shared<ListenState>();
721 l->second.state = state;
722 l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) {
723 if (state->cancel)
724 return false;
725 std::lock_guard<std::mutex> lock(searchLock_);
726 auto s = searches_.find(key);
727 if (s != searches_.end()) {
728 auto l = s->second.listeners.find(token);
729 if (l != s->second.listeners.end()) {
730 return l->second.cache.onValue(values, expired);
731 }
732 }
733 return false;
734 };
735
736 auto vcb = l->second.cb;
737
738 if (not deviceKey_.empty()) {
739 // Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason)
740 l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] {
741 if (state->cancel)
742 return;
743 std::lock_guard<std::mutex> lock(searchLock_);
744 auto s = searches_.find(key);
745 if (s != searches_.end()) {
746 auto l = s->second.listeners.find(token);
747 if (l != s->second.listeners.end()) {
748 resubscribe(key, l->second);
749 }
750 }
751 });
752 }
753
754 // Send listen to servers
755 l->second.thread = std::thread([this, req, vcb, state]() {
756 sendListen(req, vcb, state,
757 deviceKey_.empty() ? ListenMethod::LISTEN : ListenMethod::SUBSCRIBE);
758 });
759 return token;
760 });
761 return token;
762 }
763
764 bool
cancelListen(const InfoHash & key,size_t gtoken)765 DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) {
766 scheduler.syncTime();
767 DHT_LOG.d(key, "[search %s]: cancelListen %zu", key.to_c_str(), gtoken);
768 auto it = searches_.find(key);
769 if (it == searches_.end())
770 return false;
771 auto& ops = it->second.ops;
772 bool canceled = ops.cancelListen(gtoken, scheduler.time());
773 if (not it->second.opExpirationJob) {
774 it->second.opExpirationJob = scheduler.add(time_point::max(), [this,key](){
775 auto it = searches_.find(key);
776 if (it != searches_.end()) {
777 auto next = it->second.ops.expire(scheduler.time(), [this,key](size_t ltoken){
778 doCancelListen(key, ltoken);
779 });
780 if (next != time_point::max()) {
781 scheduler.edit(it->second.opExpirationJob, next);
782 }
783 }
784 });
785 }
786 scheduler.edit(it->second.opExpirationJob, ops.getExpiration());
787 loopSignal_();
788 return canceled;
789 }
790
sendListen(const std::shared_ptr<restbed::Request> & req,const ValueCallback & cb,const Sp<ListenState> & state,ListenMethod method)791 void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req,
792 const ValueCallback &cb,
793 const Sp<ListenState> &state,
794 ListenMethod method) {
795 auto settings = std::make_shared<restbed::Settings>();
796 if (method != ListenMethod::LISTEN) {
797 req->set_method("SUBSCRIBE");
798 } else {
799 std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
800 settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
801 req->set_method("LISTEN");
802 }
803 try {
804 #ifdef OPENDHT_PUSH_NOTIFICATIONS
805 if (method != ListenMethod::LISTEN)
806 fillBody(req, method == ListenMethod::RESUBSCRIBE);
807 #endif
808 restbed::Http::async(req,
809 [this, cb, state](const std::shared_ptr<restbed::Request>& req,
810 const std::shared_ptr<restbed::Response>& reply)
811 {
812 auto code = reply->get_status_code();
813 if (code == 200) {
814 try {
815 while (restbed::Http::is_open(req) and not state->cancel) {
816 restbed::Http::fetch("\n", reply);
817 if (state->cancel)
818 break;
819 std::string body;
820 reply->get_body(body);
821 reply->set_body(""); // Reset the body for the next fetch
822
823 Json::Value json;
824 std::string err;
825 Json::CharReaderBuilder rbuilder;
826 auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
827 if (reader->parse(body.data(), body.data() + body.size(), &json, &err)) {
828 if (json.size() == 0) {
829 // Empty value, it's the end
830 break;
831 }
832 auto expired = json.get("expired", Json::Value(false)).asBool();
833 auto value = std::make_shared<Value>(json);
834 if (cb) {
835 {
836 std::lock_guard<std::mutex> lock(lockCallbacks);
837 callbacks_.emplace_back([cb, value, state, expired]() {
838 if (not state->cancel and not cb({value}, expired))
839 state->cancel = true;
840 });
841 }
842 loopSignal_();
843 }
844 }
845 }
846 } catch (const std::exception& e) {
847 if (not state->cancel) {
848 DHT_LOG.w("Listen closed by the proxy server: %s", e.what());
849 state->ok = false;
850 }
851 }
852 } else {
853 state->ok = false;
854 }
855 }, settings).get();
856 } catch (const std::exception& e) {
857 state->ok = false;
858 }
859 auto& s = *state;
860 if (not s.ok and not s.cancel)
861 opFailed();
862 }
863
864 bool
doCancelListen(const InfoHash & key,size_t ltoken)865 DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
866 {
867 std::lock_guard<std::mutex> lock(searchLock_);
868
869 auto search = searches_.find(key);
870 if (search == searches_.end())
871 return false;
872
873 auto it = search->second.listeners.find(ltoken);
874 if (it == search->second.listeners.end())
875 return false;
876
877 DHT_LOG.d(key, "[search %s] cancel listen", key.to_c_str());
878
879 auto& listener = it->second;
880 listener.state->cancel = true;
881 if (not deviceKey_.empty()) {
882 // First, be sure to have a token
883 if (listener.thread.joinable()) {
884 listener.thread.join();
885 }
886 // UNSUBSCRIBE
887 restbed::Uri uri(serverHost_ + "/" + key.toString());
888 auto req = std::make_shared<restbed::Request>(uri);
889 req->set_method("UNSUBSCRIBE");
890 // fill request body
891 Json::Value body;
892 body["key"] = deviceKey_;
893 body["client_id"] = pushClientId_;
894 Json::StreamWriterBuilder wbuilder;
895 wbuilder["commentStyle"] = "None";
896 wbuilder["indentation"] = "";
897 auto content = Json::writeString(wbuilder, body) + "\n";
898 std::replace(content.begin(), content.end(), '\n', ' ');
899 req->set_body(content);
900 req->set_header("Content-Length", std::to_string(content.size()));
901 try {
902 restbed::Http::async(req, [](const std::shared_ptr<restbed::Request>&, const std::shared_ptr<restbed::Response>&){});
903 } catch (const std::exception& e) {
904 DHT_LOG.w(key, "[search %s] cancelListen: Http::async failed: %s", key.to_c_str(), e.what());
905 }
906 } else {
907 // Just stop the request
908 if (listener.thread.joinable()) {
909 // Close connection to stop listener
910 if (listener.req) {
911 try {
912 restbed::Http::close(listener.req);
913 } catch (const std::exception& e) {
914 DHT_LOG.w("Error closing socket: %s", e.what());
915 }
916 listener.req.reset();
917 }
918 listener.thread.join();
919 }
920 }
921 search->second.listeners.erase(it);
922 DHT_LOG.d(key, "[search %s] cancelListen: %zu listener remaining", key.to_c_str(), search->second.listeners.size());
923 if (search->second.listeners.empty()) {
924 searches_.erase(search);
925 }
926
927 return true;
928 }
929
930 void
opFailed()931 DhtProxyClient::opFailed()
932 {
933 DHT_LOG.e("Proxy request failed");
934 {
935 std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
936 statusIpv4_ = NodeStatus::Disconnected;
937 statusIpv6_ = NodeStatus::Disconnected;
938 }
939 getConnectivityStatus();
940 loopSignal_();
941 }
942
943 void
getConnectivityStatus()944 DhtProxyClient::getConnectivityStatus()
945 {
946 if (!isDestroying_) getProxyInfos();
947 }
948
949 void
restartListeners()950 DhtProxyClient::restartListeners()
951 {
952 if (isDestroying_) return;
953 std::lock_guard<std::mutex> lock(searchLock_);
954 DHT_LOG.d("Refresh permanent puts");
955 for (auto& search : searches_) {
956 for (auto& put : search.second.puts) {
957 if (!*put.second.ok) {
958 auto ok = put.second.ok;
959 doPut(search.first, put.second.value,
960 [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){
961 *ok = result;
962 }, time_point::max(), true);
963 scheduler.edit(put.second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
964 }
965 }
966 }
967 if (not deviceKey_.empty()) {
968 DHT_LOG.d("resubscribe due to a connectivity change");
969 // Connectivity changed, refresh all subscribe
970 for (auto& search : searches_)
971 for (auto& listener : search.second.listeners)
972 if (!listener.second.state->ok)
973 resubscribe(search.first, listener.second);
974 return;
975 }
976 DHT_LOG.d("Restarting listeners");
977 for (auto& search: searches_) {
978 for (auto& l: search.second.listeners) {
979 auto& listener = l.second;
980 if (auto state = listener.state)
981 state->cancel = true;
982 if (listener.req) {
983 try {
984 restbed::Http::close(listener.req);
985 } catch (const std::exception& e) {
986 DHT_LOG.w("Error closing socket: %s", e.what());
987 }
988 listener.req.reset();
989 }
990 }
991 }
992 for (auto& search: searches_) {
993 for (auto& l: search.second.listeners) {
994 auto& listener = l.second;
995 auto state = listener.state;
996 if (listener.thread.joinable()) {
997 listener.thread.join();
998 }
999 // Redo listen
1000 state->cancel = false;
1001 state->ok = true;
1002 auto cb = listener.cb;
1003 restbed::Uri uri(serverHost_ + "/" + search.first.toString());
1004 auto req = std::make_shared<restbed::Request>(uri);
1005 req->set_method("LISTEN");
1006 listener.req = req;
1007 listener.thread = std::thread([this, req, cb, state]() {
1008 sendListen(req, cb, state);
1009 });
1010 }
1011 }
1012 }
1013
1014 void
pushNotificationReceived(const std::map<std::string,std::string> & notification)1015 DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string>& notification)
1016 {
1017 #ifdef OPENDHT_PUSH_NOTIFICATIONS
1018 scheduler.syncTime();
1019 {
1020 // If a push notification is received, the proxy is up and running
1021 std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
1022 statusIpv4_ = NodeStatus::Connected;
1023 statusIpv6_ = NodeStatus::Connected;
1024 }
1025 try {
1026 std::lock_guard<std::mutex> lock(searchLock_);
1027 auto timeout = notification.find("timeout");
1028 if (timeout != notification.cend()) {
1029 InfoHash key(timeout->second);
1030 auto& search = searches_.at(key);
1031 auto vidIt = notification.find("vid");
1032 if (vidIt != notification.end()) {
1033 // Refresh put
1034 auto vid = std::stoull(vidIt->second);
1035 auto& put = search.puts.at(vid);
1036 scheduler.edit(put.refreshJob, scheduler.time());
1037 loopSignal_();
1038 } else {
1039 // Refresh listen
1040 for (auto& list : search.listeners)
1041 resubscribe(key, list.second);
1042 }
1043 } else {
1044 auto key = InfoHash(notification.at("key"));
1045 auto& search = searches_.at(key);
1046 for (auto& list : search.listeners) {
1047 if (list.second.state->cancel)
1048 continue;
1049 DHT_LOG.d(key, "[search %s] handling push notification", key.to_c_str());
1050 auto expired = notification.find("exp");
1051 auto token = list.first;
1052 auto state = list.second.state;
1053 if (expired == notification.end()) {
1054 auto cb = list.second.cb;
1055 auto oldValues = list.second.cache.getValues();
1056 get(key, [cb](const std::vector<Sp<Value>>& vals) {
1057 return cb(vals, false);
1058 }, [cb, oldValues](bool /*ok*/) {
1059 // Decrement old values refcount to expire values not present in the new list
1060 cb(oldValues, true);
1061 });
1062 } else {
1063 std::stringstream ss(expired->second);
1064 std::vector<Value::Id> ids;
1065 while(ss.good()){
1066 std::string substr;
1067 getline(ss, substr, ',');
1068 ids.emplace_back(std::stoull(substr));
1069 }
1070 {
1071 std::lock_guard<std::mutex> lock(lockCallbacks);
1072 callbacks_.emplace_back([this, key, token, state, ids]() {
1073 if (state->cancel) return;
1074 std::lock_guard<std::mutex> lock(searchLock_);
1075 auto s = searches_.find(key);
1076 if (s == searches_.end()) return;
1077 auto l = s->second.listeners.find(token);
1078 if (l == s->second.listeners.end()) return;
1079 if (not state->cancel and not l->second.cache.onValuesExpired(ids))
1080 state->cancel = true;
1081 });
1082 }
1083 loopSignal_();
1084 }
1085 }
1086 }
1087 } catch (const std::exception& e) {
1088 DHT_LOG.e("Error handling push notification: %s", e.what());
1089 }
1090 #else
1091 (void) notification;
1092 #endif
1093 }
1094
1095 void
resubscribe(const InfoHash & key,Listener & listener)1096 DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
1097 {
1098 #ifdef OPENDHT_PUSH_NOTIFICATIONS
1099 if (deviceKey_.empty()) return;
1100 scheduler.syncTime();
1101 DHT_LOG.d(key, "[search %s] resubscribe push listener", key.to_c_str());
1102 // Subscribe
1103 auto state = listener.state;
1104 if (listener.thread.joinable()) {
1105 state->cancel = true;
1106 if (listener.req) {
1107 try {
1108 restbed::Http::close(listener.req);
1109 } catch (const std::exception& e) {
1110 DHT_LOG.w("Error closing socket: %s", e.what());
1111 }
1112 listener.req.reset();
1113 }
1114 listener.thread.join();
1115 }
1116 state->cancel = false;
1117 state->ok = true;
1118 auto req = std::make_shared<restbed::Request>(restbed::Uri {serverHost_ + "/" + key.toString()});
1119 req->set_method("SUBSCRIBE");
1120 listener.req = req;
1121 scheduler.edit(listener.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
1122 auto vcb = listener.cb;
1123 listener.thread = std::thread([this, req, vcb, state]() {
1124 sendListen(req, vcb, state, ListenMethod::RESUBSCRIBE);
1125 });
1126 #else
1127 (void) key;
1128 (void) listener;
1129 #endif
1130 }
1131
1132 #ifdef OPENDHT_PUSH_NOTIFICATIONS
1133 void
getPushRequest(Json::Value & body) const1134 DhtProxyClient::getPushRequest(Json::Value& body) const
1135 {
1136 body["key"] = deviceKey_;
1137 body["client_id"] = pushClientId_;
1138 #ifdef __ANDROID__
1139 body["platform"] = "android";
1140 #endif
1141 #ifdef __APPLE__
1142 body["platform"] = "apple";
1143 #endif
1144 }
1145
1146 void
fillBody(std::shared_ptr<restbed::Request> req,bool resubscribe)1147 DhtProxyClient::fillBody(std::shared_ptr<restbed::Request> req, bool resubscribe)
1148 {
1149 // Fill body with
1150 // {
1151 // "key":"device_key",
1152 // }
1153 Json::Value body;
1154 getPushRequest(body);
1155 if (resubscribe) {
1156 // This is the first listen, we want to retrieve previous values.
1157 body["refresh"] = true;
1158 }
1159 Json::StreamWriterBuilder wbuilder;
1160 wbuilder["commentStyle"] = "None";
1161 wbuilder["indentation"] = "";
1162 auto content = Json::writeString(wbuilder, body) + "\n";
1163 std::replace(content.begin(), content.end(), '\n', ' ');
1164 req->set_body(content);
1165 req->set_header("Content-Length", std::to_string(content.size()));
1166 }
1167 #endif // OPENDHT_PUSH_NOTIFICATIONS
1168
1169 } // namespace dht
1170