1 /*
2 * Copyright (C) 2014-2019 Savoir-faire Linux Inc.
3 * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21 #include "dhtrunner.h"
22 #include "securedht.h"
23 #include "peer_discovery.h"
24 #include "network_utils.h"
25
26 #ifdef OPENDHT_PROXY_CLIENT
27 #include "dht_proxy_client.h"
28 #endif
29
30 #ifdef _WIN32
31 #include <cstring>
32 #define close(x) closesocket(x)
33 #define write(s, b, f) send(s, b, (int)strlen(b), 0)
34 #endif
35
36 namespace dht {
37
38 constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD;
39 static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 16;
40 static constexpr std::chrono::milliseconds RX_QUEUE_MAX_DELAY(500);
41 static const std::string PEER_DISCOVERY_DHT_SERVICE = "dht";
42
43 struct DhtRunner::Listener {
44 size_t tokenClassicDht {0};
45 size_t tokenProxyDht {0};
46 ValueCallback gcb;
47 InfoHash hash {};
48 Value::Filter f;
49 Where w;
50 };
51
52 struct NodeInsertionPack {
53 dht::InfoHash nodeId;
54 in_port_t port;
55 dht::NetId net;
56 MSGPACK_DEFINE(nodeId, port, net)
57 };
58
DhtRunner()59 DhtRunner::DhtRunner() : dht_()
60 #ifdef OPENDHT_PROXY_CLIENT
61 , dht_via_proxy_()
62 #endif //OPENDHT_PROXY_CLIENT
63 {
64 #ifdef _WIN32
65 WSADATA wsd;
66 if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
67 throw DhtException("Can't initialize Winsock2");
68 #endif
69 }
70
~DhtRunner()71 DhtRunner::~DhtRunner()
72 {
73 join();
74 #ifdef _WIN32
75 WSACleanup();
76 #endif
77 }
78
79 void
run(in_port_t port,const Config & config,Context && context)80 DhtRunner::run(in_port_t port, const Config& config, Context&& context)
81 {
82 SockAddr sin4;
83 sin4.setFamily(AF_INET);
84 sin4.setPort(port);
85 SockAddr sin6;
86 sin6.setFamily(AF_INET6);
87 sin6.setPort(port);
88 run(sin4, sin6, config, std::move(context));
89 }
90
91 void
run(const char * ip4,const char * ip6,const char * service,const Config & config,Context && context)92 DhtRunner::run(const char* ip4, const char* ip6, const char* service, const Config& config, Context&& context)
93 {
94 auto res4 = SockAddr::resolve(ip4, service);
95 auto res6 = SockAddr::resolve(ip6, service);
96 run(res4.empty() ? SockAddr() : res4.front(),
97 res6.empty() ? SockAddr() : res6.front(), config, std::move(context));
98 }
99
100 void
run(const SockAddr & local4,const SockAddr & local6,const Config & config,Context && context)101 DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const Config& config, Context&& context)
102 {
103 if (not running) {
104 if (not context.sock)
105 context.sock.reset(new net::UdpSocket(local4, local6, context.logger ? *context.logger : Logger{}));
106 run(config, std::move(context));
107 }
108 }
109
110 void
run(const Config & config,Context && context)111 DhtRunner::run(const Config& config, Context&& context)
112 {
113 std::lock_guard<std::mutex> lck(dht_mtx);
114 if (running)
115 return;
116
117 context.sock->setOnReceive([&] (std::unique_ptr<net::ReceivedPacket>&& pkt) {
118 {
119 std::lock_guard<std::mutex> lck(sock_mtx);
120 if (rcv.size() >= RX_QUEUE_MAX_SIZE) {
121 std::cerr << "Dropping packet: queue is full!" << std::endl;
122 rcv.pop();
123 }
124 rcv.emplace(std::move(pkt));
125 }
126 cv.notify_all();
127 });
128
129 auto dht = std::unique_ptr<DhtInterface>(new Dht(std::move(context.sock), SecureDht::getConfig(config.dht_config)));
130 dht_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht), config.dht_config));
131
132 #ifdef OPENDHT_PROXY_CLIENT
133 config_ = config;
134 #endif
135 enableProxy(not config.proxy_server.empty());
136 if (context.logger and dht_via_proxy_) {
137 dht_via_proxy_->setLogger(*context.logger);
138 }
139 if (context.statusChangedCallback) {
140 statusCb = std::move(context.statusChangedCallback);
141 }
142 if (context.certificateStore) {
143 dht_->setLocalCertificateStore(std::move(context.certificateStore));
144 if (dht_via_proxy_)
145 dht_via_proxy_->setLocalCertificateStore(std::move(context.certificateStore));
146 }
147
148 running = true;
149 if (not config.threaded)
150 return;
151 dht_thread = std::thread([this]() {
152 while (running) {
153 std::unique_lock<std::mutex> lk(dht_mtx);
154 time_point wakeup = loop_();
155
156 auto hasJobToDo = [this]() {
157 if (not running)
158 return true;
159 {
160 std::lock_guard<std::mutex> lck(sock_mtx);
161 if (not rcv.empty())
162 return true;
163 }
164 {
165 std::lock_guard<std::mutex> lck(storage_mtx);
166 if (not pending_ops_prio.empty())
167 return true;
168 auto s = getStatus();
169 if (not pending_ops.empty() and (s == NodeStatus::Connected or (s == NodeStatus::Disconnected and not bootstraping)))
170 return true;
171 }
172 return false;
173 };
174 if (wakeup == time_point::max())
175 cv.wait(lk, hasJobToDo);
176 else
177 cv.wait_until(lk, wakeup, hasJobToDo);
178 }
179 });
180
181 if (config.peer_discovery or config.peer_publish) {
182 peerDiscovery_ = context.peerDiscovery ?
183 std::move(context.peerDiscovery) :
184 std::make_shared<PeerDiscovery>();
185 }
186
187 auto netId = config.dht_config.node_config.network;
188 if (config.peer_discovery) {
189 peerDiscovery_->startDiscovery<NodeInsertionPack>(PEER_DISCOVERY_DHT_SERVICE, [this, netId](NodeInsertionPack&& v, SockAddr&& addr){
190 addr.setPort(v.port);
191 if (v.nodeId != dht_->getNodeId() && netId == v.net){
192 bootstrap(v.nodeId, addr);
193 }
194 });
195 }
196 if (config.peer_publish) {
197 msgpack::sbuffer sbuf_node;
198 NodeInsertionPack adc;
199 adc.net = netId;
200 adc.nodeId = dht_->getNodeId();
201 // IPv4
202 if (auto bound4 = dht_->getSocket()->getBound(AF_INET)) {
203 adc.port = bound4.getPort();
204 msgpack::pack(sbuf_node, adc);
205 peerDiscovery_->startPublish(AF_INET, PEER_DISCOVERY_DHT_SERVICE, sbuf_node);
206 }
207 // IPv6
208 if (auto bound6 = dht_->getSocket()->getBound(AF_INET6)) {
209 adc.port = bound6.getPort();
210 sbuf_node.clear();
211 msgpack::pack(sbuf_node, adc);
212 peerDiscovery_->startPublish(AF_INET6, PEER_DISCOVERY_DHT_SERVICE, sbuf_node);
213 }
214 }
215 }
216
217 void
shutdown(ShutdownCallback cb)218 DhtRunner::shutdown(ShutdownCallback cb) {
219 if (not running) {
220 cb();
221 return;
222 }
223 std::lock_guard<std::mutex> lck(storage_mtx);
224 pending_ops_prio.emplace([=](SecureDht&) mutable {
225 #ifdef OPENDHT_PROXY_CLIENT
226 if (dht_via_proxy_)
227 dht_via_proxy_->shutdown(cb);
228 #endif
229 if (dht_)
230 dht_->shutdown(cb);
231 });
232 cv.notify_all();
233 }
234
235 void
join()236 DhtRunner::join()
237 {
238 if (peerDiscovery_)
239 peerDiscovery_->stop();
240
241 {
242 std::lock_guard<std::mutex> lck(dht_mtx);
243 running = false;
244 cv.notify_all();
245 bootstrap_cv.notify_all();
246 if (dht_)
247 if (auto sock = dht_->getSocket())
248 sock->stop();
249 }
250
251 if (dht_thread.joinable())
252 dht_thread.join();
253
254 if (bootstrap_thread.joinable())
255 bootstrap_thread.join();
256
257 if (peerDiscovery_) {
258 peerDiscovery_->join();
259 }
260
261 {
262 std::lock_guard<std::mutex> lck(storage_mtx);
263 pending_ops = decltype(pending_ops)();
264 pending_ops_prio = decltype(pending_ops_prio)();
265 }
266 {
267 std::lock_guard<std::mutex> lck(dht_mtx);
268 resetDht();
269 status4 = NodeStatus::Disconnected;
270 status6 = NodeStatus::Disconnected;
271 }
272 }
273
274 SockAddr
getBound(sa_family_t af) const275 DhtRunner::getBound(sa_family_t af) const {
276 std::lock_guard<std::mutex> lck(dht_mtx);
277 if (dht_)
278 if (auto sock = dht_->getSocket())
279 return sock->getBound(af);
280 return SockAddr{};
281 }
282
283 void
dumpTables() const284 DhtRunner::dumpTables() const
285 {
286 std::lock_guard<std::mutex> lck(dht_mtx);
287 activeDht()->dumpTables();
288 }
289
290 InfoHash
getId() const291 DhtRunner::getId() const
292 {
293 if (auto dht = activeDht())
294 return dht->getId();
295 return {};
296 }
297
298 InfoHash
getNodeId() const299 DhtRunner::getNodeId() const
300 {
301 if (auto dht = activeDht())
302 return dht->getNodeId();
303 return {};
304 }
305
306
307 std::pair<size_t, size_t>
getStoreSize() const308 DhtRunner::getStoreSize() const {
309 std::lock_guard<std::mutex> lck(dht_mtx);
310 if (!dht_)
311 return {};
312 return dht_->getStoreSize();
313 }
314
315 void
setStorageLimit(size_t limit)316 DhtRunner::setStorageLimit(size_t limit) {
317 std::lock_guard<std::mutex> lck(dht_mtx);
318 if (!dht_)
319 throw std::runtime_error("dht is not running");
320 return dht_->setStorageLimit(limit);
321 }
322
323 std::vector<NodeExport>
exportNodes() const324 DhtRunner::exportNodes() const {
325 std::lock_guard<std::mutex> lck(dht_mtx);
326 if (!dht_)
327 return {};
328 return dht_->exportNodes();
329 }
330
331 std::vector<ValuesExport>
exportValues() const332 DhtRunner::exportValues() const {
333 std::lock_guard<std::mutex> lck(dht_mtx);
334 if (!dht_)
335 return {};
336 return dht_->exportValues();
337 }
338
339 void
setLogger(const Logger & logger)340 DhtRunner::setLogger(const Logger& logger) {
341 std::lock_guard<std::mutex> lck(dht_mtx);
342 if (dht_)
343 dht_->setLogger(logger);
344 #ifdef OPENDHT_PROXY_CLIENT
345 if (dht_via_proxy_)
346 dht_via_proxy_->setLogger(logger);
347 #endif
348 }
349
350 void
setLoggers(LogMethod error,LogMethod warn,LogMethod debug)351 DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) {
352 Logger logger {std::move(error), std::move(warn), std::move(debug)};
353 setLogger(logger);
354 }
355
356 void
setLogFilter(const InfoHash & f)357 DhtRunner::setLogFilter(const InfoHash& f) {
358 std::lock_guard<std::mutex> lck(dht_mtx);
359 activeDht()->setLogFilter(f);
360 if (dht_)
361 dht_->setLogFilter(f);
362 #ifdef OPENDHT_PROXY_CLIENT
363 if (dht_via_proxy_)
364 dht_via_proxy_->setLogFilter(f);
365 #endif
366 }
367
368 void
registerType(const ValueType & type)369 DhtRunner::registerType(const ValueType& type) {
370 std::lock_guard<std::mutex> lck(dht_mtx);
371 activeDht()->registerType(type);
372 }
373
374 void
importValues(const std::vector<ValuesExport> & values)375 DhtRunner::importValues(const std::vector<ValuesExport>& values) {
376 std::lock_guard<std::mutex> lck(dht_mtx);
377 dht_->importValues(values);
378 }
379
380 unsigned
getNodesStats(sa_family_t af,unsigned * good_return,unsigned * dubious_return,unsigned * cached_return,unsigned * incoming_return) const381 DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const
382 {
383 std::lock_guard<std::mutex> lck(dht_mtx);
384 const auto stats = activeDht()->getNodesStats(af);
385 if (good_return)
386 *good_return = stats.good_nodes;
387 if (dubious_return)
388 *dubious_return = stats.dubious_nodes;
389 if (cached_return)
390 *cached_return = stats.cached_nodes;
391 if (incoming_return)
392 *incoming_return = stats.incoming_nodes;
393 return stats.good_nodes + stats.dubious_nodes;
394 }
395
396 NodeStats
getNodesStats(sa_family_t af) const397 DhtRunner::getNodesStats(sa_family_t af) const
398 {
399 std::lock_guard<std::mutex> lck(dht_mtx);
400 return activeDht()->getNodesStats(af);
401 }
402
403 NodeInfo
getNodeInfo() const404 DhtRunner::getNodeInfo() const {
405 std::lock_guard<std::mutex> lck(dht_mtx);
406 NodeInfo info;
407 info.id = getId();
408 info.node_id = getNodeId();
409 info.ipv4 = dht_->getNodesStats(AF_INET);
410 info.ipv6 = dht_->getNodesStats(AF_INET6);
411 return info;
412 }
413
414 std::vector<unsigned>
getNodeMessageStats(bool in) const415 DhtRunner::getNodeMessageStats(bool in) const
416 {
417 std::lock_guard<std::mutex> lck(dht_mtx);
418 return activeDht()->getNodeMessageStats(in);
419 }
420
421 std::string
getStorageLog() const422 DhtRunner::getStorageLog() const
423 {
424 std::lock_guard<std::mutex> lck(dht_mtx);
425 return activeDht()->getStorageLog();
426 }
427 std::string
getStorageLog(const InfoHash & f) const428 DhtRunner::getStorageLog(const InfoHash& f) const
429 {
430 std::lock_guard<std::mutex> lck(dht_mtx);
431 return activeDht()->getStorageLog(f);
432 }
433 std::string
getRoutingTablesLog(sa_family_t af) const434 DhtRunner::getRoutingTablesLog(sa_family_t af) const
435 {
436 std::lock_guard<std::mutex> lck(dht_mtx);
437 return activeDht()->getRoutingTablesLog(af);
438 }
439 std::string
getSearchesLog(sa_family_t af) const440 DhtRunner::getSearchesLog(sa_family_t af) const
441 {
442 std::lock_guard<std::mutex> lck(dht_mtx);
443 return activeDht()->getSearchesLog(af);
444 }
445 std::string
getSearchLog(const InfoHash & f,sa_family_t af) const446 DhtRunner::getSearchLog(const InfoHash& f, sa_family_t af) const
447 {
448 std::lock_guard<std::mutex> lck(dht_mtx);
449 return activeDht()->getSearchLog(f, af);
450 }
451 std::vector<SockAddr>
getPublicAddress(sa_family_t af)452 DhtRunner::getPublicAddress(sa_family_t af)
453 {
454 std::lock_guard<std::mutex> lck(dht_mtx);
455 if (auto dht = activeDht())
456 return dht->getPublicAddress(af);
457 return {};
458 }
459 std::vector<std::string>
getPublicAddressStr(sa_family_t af)460 DhtRunner::getPublicAddressStr(sa_family_t af)
461 {
462 auto addrs = getPublicAddress(af);
463 std::vector<std::string> ret(addrs.size());
464 std::transform(addrs.begin(), addrs.end(), ret.begin(), [](const SockAddr& a) { return a.toString(); });
465 return ret;
466 }
467
468 void
registerCertificate(std::shared_ptr<crypto::Certificate> cert)469 DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) {
470 std::lock_guard<std::mutex> lck(dht_mtx);
471 activeDht()->registerCertificate(cert);
472 }
473 void
setLocalCertificateStore(CertificateStoreQuery && query_method)474 DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) {
475 std::lock_guard<std::mutex> lck(dht_mtx);
476 #ifdef OPENDHT_PROXY_CLIENT
477 if (dht_via_proxy_)
478 dht_via_proxy_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method));
479 #endif
480 if (dht_)
481 dht_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method));
482 }
483
484 time_point
loop_()485 DhtRunner::loop_()
486 {
487 auto dht = activeDht();
488 if (not dht)
489 return {};
490
491 decltype(pending_ops) ops {};
492 {
493 std::lock_guard<std::mutex> lck(storage_mtx);
494 auto s = getStatus();
495 ops = (pending_ops_prio.empty() && (s == NodeStatus::Connected or (s == NodeStatus::Disconnected and not bootstraping))) ?
496 std::move(pending_ops) : std::move(pending_ops_prio);
497 }
498 while (not ops.empty()) {
499 ops.front()(*dht);
500 ops.pop();
501 }
502
503 time_point wakeup {};
504 decltype(rcv) received {};
505 {
506 std::lock_guard<std::mutex> lck(sock_mtx);
507 // move to stack
508 received = std::move(rcv);
509 }
510
511 // Discard old packets
512 size_t dropped {0};
513 if (not received.empty()) {
514 auto now = clock::now();
515 while (not received.empty() and now - received.front()->received > RX_QUEUE_MAX_DELAY) {
516 received.pop();
517 dropped++;
518 }
519 }
520
521 // Handle packets
522 if (not received.empty()) {
523 while (not received.empty()) {
524 auto& pck = received.front();
525 if (clock::now() - pck->received > RX_QUEUE_MAX_DELAY)
526 dropped++;
527 else
528 wakeup = dht->periodic(pck->data.data(), pck->data.size(), std::move(pck->from));
529 received.pop();
530 }
531 } else {
532 // Or just run the scheduler
533 wakeup = dht->periodic(nullptr, 0, nullptr, 0);
534 }
535
536 if (dropped)
537 std::cerr << "Dropped " << dropped << " packets with high delay" << std::endl;
538
539 NodeStatus nstatus4 = dht->getStatus(AF_INET);
540 NodeStatus nstatus6 = dht->getStatus(AF_INET6);
541 if (nstatus4 != status4 || nstatus6 != status6) {
542 status4 = nstatus4;
543 status6 = nstatus6;
544 if (status4 == NodeStatus::Disconnected and status6 == NodeStatus::Disconnected) {
545 // We have lost connection with the DHT. Try to recover using bootstrap nodes.
546 std::unique_lock<std::mutex> lck(bootstrap_mtx);
547 bootstrap_nodes = bootstrap_nodes_all;
548 tryBootstrapContinuously();
549 } else {
550 std::unique_lock<std::mutex> lck(bootstrap_mtx);
551 bootstrap_nodes.clear();
552 }
553 if (statusCb)
554 statusCb(status4, status6);
555 }
556
557 return wakeup;
558 }
559
560 void
get(InfoHash hash,GetCallback vcb,DoneCallback dcb,Value::Filter f,Where w)561 DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f, Where w)
562 {
563 {
564 std::lock_guard<std::mutex> lck(storage_mtx);
565 pending_ops.emplace([=](SecureDht& dht) mutable {
566 dht.get(hash, std::move(vcb), std::move(dcb), std::move(f), std::move(w));
567 });
568 }
569 cv.notify_all();
570 }
571
572 void
get(const std::string & key,GetCallback vcb,DoneCallbackSimple dcb,Value::Filter f,Where w)573 DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb, Value::Filter f, Where w)
574 {
575 get(InfoHash::get(key), std::move(vcb), std::move(dcb), std::move(f), std::move(w));
576 }
577 void
query(const InfoHash & hash,QueryCallback cb,DoneCallback done_cb,Query q)578 DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Query q) {
579 {
580 std::lock_guard<std::mutex> lck(storage_mtx);
581 pending_ops.emplace([=](SecureDht& dht) mutable {
582 dht.query(hash, std::move(cb), std::move(done_cb), std::move(q));
583 });
584 }
585 cv.notify_all();
586 }
587
588 std::future<size_t>
listen(InfoHash hash,ValueCallback vcb,Value::Filter f,Where w)589 DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w)
590 {
591 auto ret_token = std::make_shared<std::promise<size_t>>();
592 {
593 std::lock_guard<std::mutex> lck(storage_mtx);
594 pending_ops.emplace([=](SecureDht& dht) mutable {
595 #ifdef OPENDHT_PROXY_CLIENT
596 auto tokenbGlobal = listener_token_++;
597 auto& listener = listeners_[tokenbGlobal];
598 listener.hash = hash;
599 listener.f = std::move(f);
600 listener.w = std::move(w);
601 listener.gcb = [hash,vcb,tokenbGlobal,this](const std::vector<Sp<Value>>& vals, bool expired) {
602 if (not vcb(vals, expired)) {
603 cancelListen(hash, tokenbGlobal);
604 return false;
605 }
606 return true;
607 };
608 if (auto token = dht.listen(hash, listener.gcb, listener.f, listener.w)) {
609 if (use_proxy) listener.tokenProxyDht = token;
610 else listener.tokenClassicDht = token;
611 }
612 ret_token->set_value(tokenbGlobal);
613 #else
614 ret_token->set_value(dht.listen(hash, std::move(vcb), std::move(f), std::move(w)));
615 #endif
616 });
617 }
618 cv.notify_all();
619 return ret_token->get_future();
620 }
621
622 std::future<size_t>
listen(const std::string & key,GetCallback vcb,Value::Filter f,Where w)623 DhtRunner::listen(const std::string& key, GetCallback vcb, Value::Filter f, Where w)
624 {
625 return listen(InfoHash::get(key), std::move(vcb), std::move(f), std::move(w));
626 }
627
628 void
cancelListen(InfoHash h,size_t token)629 DhtRunner::cancelListen(InfoHash h, size_t token)
630 {
631 {
632 std::lock_guard<std::mutex> lck(storage_mtx);
633 #ifdef OPENDHT_PROXY_CLIENT
634 pending_ops.emplace([=](SecureDht&) {
635 auto it = listeners_.find(token);
636 if (it == listeners_.end()) return;
637 if (it->second.tokenClassicDht)
638 dht_->cancelListen(h, it->second.tokenClassicDht);
639 if (it->second.tokenProxyDht and dht_via_proxy_)
640 dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
641 listeners_.erase(it);
642 });
643 #else
644 pending_ops.emplace([=](SecureDht& dht) {
645 dht.cancelListen(h, token);
646 });
647 #endif // OPENDHT_PROXY_CLIENT
648 }
649 cv.notify_all();
650 }
651
652 void
cancelListen(InfoHash h,std::shared_future<size_t> ftoken)653 DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
654 {
655 {
656 std::lock_guard<std::mutex> lck(storage_mtx);
657 #ifdef OPENDHT_PROXY_CLIENT
658 pending_ops.emplace([=](SecureDht&) {
659 auto it = listeners_.find(ftoken.get());
660 if (it == listeners_.end()) return;
661 if (it->second.tokenClassicDht)
662 dht_->cancelListen(h, it->second.tokenClassicDht);
663 if (it->second.tokenProxyDht and dht_via_proxy_)
664 dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
665 listeners_.erase(it);
666 });
667 #else
668 pending_ops.emplace([=](SecureDht& dht) {
669 dht.cancelListen(h, ftoken.get());
670 });
671 #endif // OPENDHT_PROXY_CLIENT
672 }
673 cv.notify_all();
674 }
675
676 void
put(InfoHash hash,Value && value,DoneCallback cb,time_point created,bool permanent)677 DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent)
678 {
679 {
680 std::lock_guard<std::mutex> lck(storage_mtx);
681 auto sv = std::make_shared<Value>(std::move(value));
682 pending_ops.emplace([=](SecureDht& dht) {
683 dht.put(hash, sv, cb, created, permanent);
684 });
685 }
686 cv.notify_all();
687 }
688
689 void
put(InfoHash hash,std::shared_ptr<Value> value,DoneCallback cb,time_point created,bool permanent)690 DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent)
691 {
692 {
693 std::lock_guard<std::mutex> lck(storage_mtx);
694 pending_ops.emplace([=](SecureDht& dht) {
695 dht.put(hash, value, cb, created, permanent);
696 });
697 }
698 cv.notify_all();
699 }
700
701 void
put(const std::string & key,Value && value,DoneCallbackSimple cb,time_point created,bool permanent)702 DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, time_point created, bool permanent)
703 {
704 put(InfoHash::get(key), std::forward<Value>(value), std::move(cb), created, permanent);
705 }
706
707 void
cancelPut(const InfoHash & h,const Value::Id & id)708 DhtRunner::cancelPut(const InfoHash& h , const Value::Id& id)
709 {
710 {
711 std::lock_guard<std::mutex> lck(storage_mtx);
712 pending_ops.emplace([=](SecureDht& dht) {
713 dht.cancelPut(h, id);
714 });
715 }
716 cv.notify_all();
717 }
718
719 void
putSigned(InfoHash hash,std::shared_ptr<Value> value,DoneCallback cb)720 DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb)
721 {
722 {
723 std::lock_guard<std::mutex> lck(storage_mtx);
724 pending_ops.emplace([=](SecureDht& dht) {
725 dht.putSigned(hash, value, cb);
726 });
727 }
728 cv.notify_all();
729 }
730
731 void
putSigned(InfoHash hash,Value && value,DoneCallback cb)732 DhtRunner::putSigned(InfoHash hash, Value&& value, DoneCallback cb)
733 {
734 putSigned(hash, std::make_shared<Value>(std::move(value)), std::move(cb));
735 }
736
737 void
putSigned(const std::string & key,Value && value,DoneCallbackSimple cb)738 DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb)
739 {
740 putSigned(InfoHash::get(key), std::forward<Value>(value), std::move(cb));
741 }
742
743 void
putEncrypted(InfoHash hash,InfoHash to,std::shared_ptr<Value> value,DoneCallback cb)744 DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb)
745 {
746 {
747 std::lock_guard<std::mutex> lck(storage_mtx);
748 pending_ops.emplace([=](SecureDht& dht) {
749 dht.putEncrypted(hash, to, value, cb);
750 });
751 }
752 cv.notify_all();
753 }
754
755 void
putEncrypted(InfoHash hash,InfoHash to,Value && value,DoneCallback cb)756 DhtRunner::putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb)
757 {
758 putEncrypted(hash, to, std::make_shared<Value>(std::move(value)), std::move(cb));
759 }
760
761 void
putEncrypted(const std::string & key,InfoHash to,Value && value,DoneCallback cb)762 DhtRunner::putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb)
763 {
764 putEncrypted(InfoHash::get(key), to, std::forward<Value>(value), std::move(cb));
765 }
766
767 void
tryBootstrapContinuously()768 DhtRunner::tryBootstrapContinuously()
769 {
770 if (bootstrap_thread.joinable()) {
771 if (bootstraping)
772 return; // already running
773 else
774 bootstrap_thread.join();
775 }
776 bootstraping = true;
777 bootstrap_thread = std::thread([this]() {
778 auto next = clock::now();
779 do {
780 decltype(bootstrap_nodes) nodes;
781 {
782 std::lock_guard<std::mutex> lck(bootstrap_mtx);
783 nodes = bootstrap_nodes;
784 }
785
786 next += BOOTSTRAP_PERIOD;
787 {
788 std::mutex mtx;
789 std::unique_lock<std::mutex> blck(mtx);
790 unsigned ping_count(0);
791 // Reverse: try last inserted bootstrap nodes first
792 for (auto it = nodes.rbegin(); it != nodes.rend(); it++) {
793 ++ping_count;
794 try {
795 bootstrap(SockAddr::resolve(it->first, it->second), [&](bool) {
796 if (not running)
797 return;
798 {
799 std::unique_lock<std::mutex> blck(mtx);
800 --ping_count;
801 }
802 bootstrap_cv.notify_all();
803 });
804 } catch (std::invalid_argument& e) {
805 --ping_count;
806 std::cerr << e.what() << std::endl;
807 }
808 }
809 // wait at least until the next BOOTSTRAP_PERIOD
810 bootstrap_cv.wait_until(blck, next, [&]() { return not running; });
811 // wait for bootstrap requests to end.
812 if (running)
813 bootstrap_cv.wait(blck, [&]() { return not running or ping_count == 0; });
814 }
815 // update state
816 {
817 std::lock_guard<std::mutex> lck(dht_mtx);
818 bootstraping = running and
819 status4 == NodeStatus::Disconnected and
820 status6 == NodeStatus::Disconnected;
821 }
822 } while (bootstraping);
823 });
824 }
825
826 void
bootstrap(const std::string & host,const std::string & service)827 DhtRunner::bootstrap(const std::string& host, const std::string& service)
828 {
829 std::lock_guard<std::mutex> lck(bootstrap_mtx);
830 bootstrap_nodes_all.emplace_back(host, service);
831 bootstrap_nodes.emplace_back(host, service);
832 tryBootstrapContinuously();
833 }
834
835 void
bootstrap(const std::string & hostService)836 DhtRunner::bootstrap(const std::string& hostService)
837 {
838 std::lock_guard<std::mutex> lck(bootstrap_mtx);
839 auto host_service = splitPort(hostService);
840 bootstrap_nodes_all.emplace_back(host_service.first, host_service.second);
841 bootstrap_nodes.emplace_back(std::move(host_service.first), std::move(host_service.second));
842 tryBootstrapContinuously();
843 }
844
845 void
clearBootstrap()846 DhtRunner::clearBootstrap()
847 {
848 std::lock_guard<std::mutex> lck(bootstrap_mtx);
849 bootstrap_nodes_all.clear();
850 }
851
852 void
bootstrap(std::vector<SockAddr> nodes,DoneCallbackSimple && cb)853 DhtRunner::bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple&& cb)
854 {
855 std::lock_guard<std::mutex> lck(storage_mtx);
856 pending_ops_prio.emplace([=](SecureDht& dht) mutable {
857 auto rem = cb ? std::make_shared<std::pair<size_t, bool>>(nodes.size(), false) : nullptr;
858 for (auto& node : nodes) {
859 if (node.getPort() == 0)
860 node.setPort(net::DHT_DEFAULT_PORT);
861 dht.pingNode(std::move(node), cb ? [rem,cb](bool ok) {
862 auto& r = *rem;
863 r.first--;
864 r.second |= ok;
865 if (not r.first)
866 cb(r.second);
867 } : DoneCallbackSimple{});
868 }
869 });
870 cv.notify_all();
871 }
872
873 void
bootstrap(const SockAddr & addr,DoneCallbackSimple && cb)874 DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb)
875 {
876 std::lock_guard<std::mutex> lck(storage_mtx);
877 pending_ops_prio.emplace([addr, cb](SecureDht& dht) mutable {
878 dht.pingNode(std::move(addr), std::move(cb));
879 });
880 cv.notify_all();
881 }
882
883 void
bootstrap(const InfoHash & id,const SockAddr & address)884 DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address)
885 {
886 {
887 std::unique_lock<std::mutex> lck(storage_mtx);
888 pending_ops_prio.emplace([id, address](SecureDht& dht) mutable {
889 dht.insertNode(id, address);
890 });
891 }
892 cv.notify_all();
893 }
894
895 void
bootstrap(const std::vector<NodeExport> & nodes)896 DhtRunner::bootstrap(const std::vector<NodeExport>& nodes)
897 {
898 {
899 std::lock_guard<std::mutex> lck(storage_mtx);
900 pending_ops_prio.emplace([=](SecureDht& dht) {
901 for (auto& node : nodes)
902 dht.insertNode(node);
903 });
904 }
905 cv.notify_all();
906 }
907
908 void
connectivityChanged()909 DhtRunner::connectivityChanged()
910 {
911 {
912 std::lock_guard<std::mutex> lck(storage_mtx);
913 pending_ops_prio.emplace([=](SecureDht& dht) {
914 dht.connectivityChanged();
915 });
916 }
917 cv.notify_all();
918 }
919
920 void
findCertificate(InfoHash hash,std::function<void (const std::shared_ptr<crypto::Certificate>)> cb)921 DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)> cb) {
922 {
923 std::lock_guard<std::mutex> lck(storage_mtx);
924 pending_ops.emplace([=](SecureDht& dht) {
925 dht.findCertificate(hash, cb);
926 });
927 }
928 cv.notify_all();
929 }
930
931 void
resetDht()932 DhtRunner::resetDht()
933 {
934 peerDiscovery_.reset();
935 #ifdef OPENDHT_PROXY_CLIENT
936 listeners_.clear();
937 dht_via_proxy_.reset();
938 #endif // OPENDHT_PROXY_CLIENT
939 dht_.reset();
940 }
941
942 SecureDht*
activeDht() const943 DhtRunner::activeDht() const
944 {
945 #ifdef OPENDHT_PROXY_CLIENT
946 return use_proxy? dht_via_proxy_.get() : dht_.get();
947 #else
948 return dht_.get();
949 #endif // OPENDHT_PROXY_CLIENT
950 }
951
952 void
setProxyServer(const std::string & proxy,const std::string & pushNodeId)953 DhtRunner::setProxyServer(const std::string& proxy, const std::string& pushNodeId)
954 {
955 #ifdef OPENDHT_PROXY_CLIENT
956 std::lock_guard<std::mutex> lck(dht_mtx);
957 if (config_.proxy_server == proxy and config_.push_node_id == pushNodeId)
958 return;
959 config_.proxy_server = proxy;
960 config_.push_node_id = pushNodeId;
961 enableProxy(use_proxy and not config_.proxy_server.empty());
962 #else
963 if (not proxy.empty())
964 std::cerr << "DHT proxy requested but OpenDHT built without proxy support." << std::endl;
965 #endif
966 }
967
968 void
enableProxy(bool proxify)969 DhtRunner::enableProxy(bool proxify)
970 {
971 #ifdef OPENDHT_PROXY_CLIENT
972 if (dht_via_proxy_) {
973 dht_via_proxy_->shutdown({});
974 }
975 if (proxify) {
976 // Init the proxy client
977 auto dht_via_proxy = std::unique_ptr<DhtInterface>(
978 new DhtProxyClient([this]{
979 if (config_.threaded) {
980 {
981 std::lock_guard<std::mutex> lck(storage_mtx);
982 pending_ops_prio.emplace([=](SecureDht&) mutable {});
983 }
984 cv.notify_all();
985 }
986 }, config_.proxy_server, config_.push_node_id)
987 );
988 dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config));
989 #ifdef OPENDHT_PUSH_NOTIFICATIONS
990 if (not pushToken_.empty())
991 dht_via_proxy_->setPushNotificationToken(pushToken_);
992 #endif
993 // add current listeners
994 for (auto& l: listeners_)
995 l.second.tokenProxyDht = dht_via_proxy_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w);
996 // and use it
997 use_proxy = proxify;
998 } else {
999 use_proxy = proxify;
1000 std::lock_guard<std::mutex> lck(storage_mtx);
1001 if (not listeners_.empty()) {
1002 pending_ops.emplace([this](SecureDht& /*dht*/) mutable {
1003 if (not dht_)
1004 return;
1005 for (auto& l : listeners_) {
1006 if (not l.second.tokenClassicDht) {
1007 l.second.tokenClassicDht = dht_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w);
1008 }
1009 }
1010 });
1011 }
1012 }
1013 #else
1014 if (proxify)
1015 std::cerr << "DHT proxy requested but OpenDHT built without proxy support." << std::endl;
1016 #endif
1017 }
1018
1019 void
forwardAllMessages(bool forward)1020 DhtRunner::forwardAllMessages(bool forward)
1021 {
1022 std::lock_guard<std::mutex> lck(dht_mtx);
1023 #ifdef OPENDHT_PROXY_SERVER
1024 #ifdef OPENDHT_PROXY_CLIENT
1025 if (dht_via_proxy_)
1026 dht_via_proxy_->forwardAllMessages(forward);
1027 #endif // OPENDHT_PROXY_CLIENT
1028 if (dht_)
1029 dht_->forwardAllMessages(forward);
1030 #else
1031 (void) forward;
1032 #endif // OPENDHT_PROXY_SERVER
1033 }
1034
1035 /**
1036 * Updates the push notification device token
1037 */
1038 void
setPushNotificationToken(const std::string & token)1039 DhtRunner::setPushNotificationToken(const std::string& token) {
1040 std::lock_guard<std::mutex> lck(dht_mtx);
1041 #if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS)
1042 pushToken_ = token;
1043 if (dht_via_proxy_)
1044 dht_via_proxy_->setPushNotificationToken(token);
1045 #else
1046 (void) token;
1047 #endif
1048 }
1049
1050 void
pushNotificationReceived(const std::map<std::string,std::string> & data)1051 DhtRunner::pushNotificationReceived(const std::map<std::string, std::string>& data)
1052 {
1053 #if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS)
1054 {
1055 std::lock_guard<std::mutex> lck(storage_mtx);
1056 pending_ops_prio.emplace([=](SecureDht&) {
1057 if (dht_via_proxy_)
1058 dht_via_proxy_->pushNotificationReceived(data);
1059 });
1060 }
1061 cv.notify_all();
1062 #else
1063 (void) data;
1064 #endif
1065 }
1066
1067 }
1068