1 /*
2  *  Copyright (C) 2014-2019 Savoir-faire Linux Inc.
3  *  Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  *           Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  *           Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; either version 3 of the License, or
10  *  (at your option) any later version.
11  *
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *  GNU General Public License for more details.
16  *
17  *  You should have received a copy of the GNU General Public License
18  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
19  */
20 
21 #include "dhtrunner.h"
22 #include "securedht.h"
23 #include "peer_discovery.h"
24 #include "network_utils.h"
25 
26 #ifdef OPENDHT_PROXY_CLIENT
27 #include "dht_proxy_client.h"
28 #endif
29 
30 #ifdef _WIN32
31 #include <cstring>
32 #define close(x) closesocket(x)
33 #define write(s, b, f) send(s, b, (int)strlen(b), 0)
34 #endif
35 
36 namespace dht {
37 
// Out-of-class definition of the static constexpr class member.
constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD;
// Capacity of the received-packet queue: once reached, the oldest queued packet
// is dropped to make room for the new one.
static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 16;
// Packets that stayed queued longer than this are discarded instead of being processed.
static constexpr std::chrono::milliseconds RX_QUEUE_MAX_DELAY(500);
// Service name passed to the peer discovery startDiscovery/startPublish calls.
static const std::string PEER_DISCOVERY_DHT_SERVICE = "dht";
42 
// Bookkeeping entry for one listen() operation, stored in listeners_ under a
// runner-level token.
struct DhtRunner::Listener {
    // Token returned by the DHT listen() call when the proxy is not in use (0 = none).
    size_t tokenClassicDht {0};
    // Token returned by the DHT listen() call when the proxy is in use (0 = none).
    size_t tokenProxyDht {0};
    // Wrapper around the user callback; cancels the listener when the user callback returns false.
    ValueCallback gcb;
    // Key being listened to.
    InfoHash hash {};
    // Value filter given to listen().
    Value::Filter f;
    // "Where" clause given to listen().
    Where w;
};
51 
// Payload exchanged through peer discovery: published by run() when
// config.peer_publish is set and consumed by the discovery callback.
struct NodeInsertionPack {
    // Node id of the publishing node.
    dht::InfoHash nodeId;
    // Port the publishing node's socket is bound to.
    in_port_t port;
    // Network id; the discovery callback ignores packs whose id differs from the local one.
    dht::NetId net;
    // msgpack (de)serialization of the three fields above.
    MSGPACK_DEFINE(nodeId, port, net)
};
58 
DhtRunner()59 DhtRunner::DhtRunner() : dht_()
60 #ifdef OPENDHT_PROXY_CLIENT
61 , dht_via_proxy_()
62 #endif //OPENDHT_PROXY_CLIENT
63 {
64 #ifdef _WIN32
65     WSADATA wsd;
66     if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
67         throw DhtException("Can't initialize Winsock2");
68 #endif
69 }
70 
/**
 * Stop the runner through join() and, on Windows, release Winsock.
 */
DhtRunner::~DhtRunner()
{
    join();
#ifdef _WIN32
    // Pairs with the WSAStartup() call made in the constructor.
    WSACleanup();
#endif
}
78 
79 void
run(in_port_t port,const Config & config,Context && context)80 DhtRunner::run(in_port_t port, const Config& config, Context&& context)
81 {
82     SockAddr sin4;
83     sin4.setFamily(AF_INET);
84     sin4.setPort(port);
85     SockAddr sin6;
86     sin6.setFamily(AF_INET6);
87     sin6.setPort(port);
88     run(sin4, sin6, config, std::move(context));
89 }
90 
91 void
run(const char * ip4,const char * ip6,const char * service,const Config & config,Context && context)92 DhtRunner::run(const char* ip4, const char* ip6, const char* service, const Config& config, Context&& context)
93 {
94     auto res4 = SockAddr::resolve(ip4, service);
95     auto res6 = SockAddr::resolve(ip6, service);
96     run(res4.empty() ? SockAddr() : res4.front(),
97         res6.empty() ? SockAddr() : res6.front(), config, std::move(context));
98 }
99 
100 void
run(const SockAddr & local4,const SockAddr & local6,const Config & config,Context && context)101 DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const Config& config, Context&& context)
102 {
103     if (not running) {
104         if (not context.sock)
105             context.sock.reset(new net::UdpSocket(local4, local6, context.logger ? *context.logger : Logger{}));
106         run(config, std::move(context));
107     }
108 }
109 
110 void
run(const Config & config,Context && context)111 DhtRunner::run(const Config& config, Context&& context)
112 {
113     std::lock_guard<std::mutex> lck(dht_mtx);
114     if (running)
115         return;
116 
117     context.sock->setOnReceive([&] (std::unique_ptr<net::ReceivedPacket>&& pkt) {
118         {
119             std::lock_guard<std::mutex> lck(sock_mtx);
120             if (rcv.size() >= RX_QUEUE_MAX_SIZE) {
121                 std::cerr << "Dropping packet: queue is full!" << std::endl;
122                 rcv.pop();
123             }
124             rcv.emplace(std::move(pkt));
125         }
126         cv.notify_all();
127     });
128 
129     auto dht = std::unique_ptr<DhtInterface>(new Dht(std::move(context.sock), SecureDht::getConfig(config.dht_config)));
130     dht_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht), config.dht_config));
131 
132 #ifdef OPENDHT_PROXY_CLIENT
133     config_ = config;
134 #endif
135     enableProxy(not config.proxy_server.empty());
136     if (context.logger and dht_via_proxy_) {
137         dht_via_proxy_->setLogger(*context.logger);
138     }
139     if (context.statusChangedCallback) {
140         statusCb = std::move(context.statusChangedCallback);
141     }
142     if (context.certificateStore) {
143         dht_->setLocalCertificateStore(std::move(context.certificateStore));
144         if (dht_via_proxy_)
145             dht_via_proxy_->setLocalCertificateStore(std::move(context.certificateStore));
146     }
147 
148     running = true;
149     if (not config.threaded)
150         return;
151     dht_thread = std::thread([this]() {
152         while (running) {
153             std::unique_lock<std::mutex> lk(dht_mtx);
154             time_point wakeup = loop_();
155 
156             auto hasJobToDo = [this]() {
157                 if (not running)
158                     return true;
159                 {
160                     std::lock_guard<std::mutex> lck(sock_mtx);
161                     if (not rcv.empty())
162                         return true;
163                 }
164                 {
165                     std::lock_guard<std::mutex> lck(storage_mtx);
166                     if (not pending_ops_prio.empty())
167                         return true;
168                     auto s = getStatus();
169                     if (not pending_ops.empty() and (s == NodeStatus::Connected or (s == NodeStatus::Disconnected and not bootstraping)))
170                         return true;
171                 }
172                 return false;
173             };
174             if (wakeup == time_point::max())
175                 cv.wait(lk, hasJobToDo);
176             else
177                 cv.wait_until(lk, wakeup, hasJobToDo);
178         }
179     });
180 
181     if (config.peer_discovery or config.peer_publish) {
182         peerDiscovery_ = context.peerDiscovery ?
183             std::move(context.peerDiscovery) :
184             std::make_shared<PeerDiscovery>();
185     }
186 
187     auto netId = config.dht_config.node_config.network;
188     if (config.peer_discovery) {
189         peerDiscovery_->startDiscovery<NodeInsertionPack>(PEER_DISCOVERY_DHT_SERVICE, [this, netId](NodeInsertionPack&& v, SockAddr&& addr){
190             addr.setPort(v.port);
191             if (v.nodeId != dht_->getNodeId() && netId == v.net){
192                 bootstrap(v.nodeId, addr);
193             }
194         });
195     }
196     if (config.peer_publish) {
197         msgpack::sbuffer sbuf_node;
198         NodeInsertionPack adc;
199         adc.net = netId;
200         adc.nodeId = dht_->getNodeId();
201         // IPv4
202         if (auto bound4 = dht_->getSocket()->getBound(AF_INET)) {
203             adc.port = bound4.getPort();
204             msgpack::pack(sbuf_node, adc);
205             peerDiscovery_->startPublish(AF_INET, PEER_DISCOVERY_DHT_SERVICE, sbuf_node);
206         }
207         // IPv6
208         if (auto bound6 = dht_->getSocket()->getBound(AF_INET6)) {
209             adc.port = bound6.getPort();
210             sbuf_node.clear();
211             msgpack::pack(sbuf_node, adc);
212             peerDiscovery_->startPublish(AF_INET6, PEER_DISCOVERY_DHT_SERVICE, sbuf_node);
213         }
214     }
215 }
216 
217 void
shutdown(ShutdownCallback cb)218 DhtRunner::shutdown(ShutdownCallback cb) {
219     if (not running) {
220         cb();
221         return;
222     }
223     std::lock_guard<std::mutex> lck(storage_mtx);
224     pending_ops_prio.emplace([=](SecureDht&) mutable {
225 #ifdef OPENDHT_PROXY_CLIENT
226         if (dht_via_proxy_)
227             dht_via_proxy_->shutdown(cb);
228 #endif
229         if (dht_)
230             dht_->shutdown(cb);
231     });
232     cv.notify_all();
233 }
234 
/**
 * Stop the runner and wait for its threads.
 *
 * Stops peer discovery, clears the running flag, wakes the waiting threads,
 * stops the socket, joins the DHT and bootstrap threads, discards the
 * pending operations and finally resets the DHT and the cached status.
 */
void
DhtRunner::join()
{
    if (peerDiscovery_)
        peerDiscovery_->stop();

    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        running = false;
        // Wake both worker threads so they can observe running == false.
        cv.notify_all();
        bootstrap_cv.notify_all();
        if (dht_)
            if (auto sock = dht_->getSocket())
                sock->stop();
    }

    // dht_mtx is released above before joining: the DHT thread locks it in its loop.
    if (dht_thread.joinable())
        dht_thread.join();

    if (bootstrap_thread.joinable())
        bootstrap_thread.join();

    if (peerDiscovery_) {
        peerDiscovery_->join();
    }

    {
        // Drop operations that were queued but never executed.
        std::lock_guard<std::mutex> lck(storage_mtx);
        pending_ops = decltype(pending_ops)();
        pending_ops_prio = decltype(pending_ops_prio)();
    }
    {
        std::lock_guard<std::mutex> lck(dht_mtx);
        resetDht();
        status4 = NodeStatus::Disconnected;
        status6 = NodeStatus::Disconnected;
    }
}
273 
274 SockAddr
getBound(sa_family_t af) const275 DhtRunner::getBound(sa_family_t af) const {
276     std::lock_guard<std::mutex> lck(dht_mtx);
277     if (dht_)
278         if (auto sock = dht_->getSocket())
279             return sock->getBound(af);
280     return SockAddr{};
281 }
282 
283 void
dumpTables() const284 DhtRunner::dumpTables() const
285 {
286     std::lock_guard<std::mutex> lck(dht_mtx);
287     activeDht()->dumpTables();
288 }
289 
290 InfoHash
getId() const291 DhtRunner::getId() const
292 {
293     if (auto dht = activeDht())
294         return dht->getId();
295     return {};
296 }
297 
298 InfoHash
getNodeId() const299 DhtRunner::getNodeId() const
300 {
301     if (auto dht = activeDht())
302         return dht->getNodeId();
303     return {};
304 }
305 
306 
307 std::pair<size_t, size_t>
getStoreSize() const308 DhtRunner::getStoreSize() const {
309     std::lock_guard<std::mutex> lck(dht_mtx);
310     if (!dht_)
311         return {};
312     return dht_->getStoreSize();
313 }
314 
315 void
setStorageLimit(size_t limit)316 DhtRunner::setStorageLimit(size_t limit) {
317     std::lock_guard<std::mutex> lck(dht_mtx);
318     if (!dht_)
319         throw std::runtime_error("dht is not running");
320     return dht_->setStorageLimit(limit);
321 }
322 
323 std::vector<NodeExport>
exportNodes() const324 DhtRunner::exportNodes() const {
325     std::lock_guard<std::mutex> lck(dht_mtx);
326     if (!dht_)
327         return {};
328     return dht_->exportNodes();
329 }
330 
331 std::vector<ValuesExport>
exportValues() const332 DhtRunner::exportValues() const {
333     std::lock_guard<std::mutex> lck(dht_mtx);
334     if (!dht_)
335         return {};
336     return dht_->exportValues();
337 }
338 
339 void
setLogger(const Logger & logger)340 DhtRunner::setLogger(const Logger& logger) {
341     std::lock_guard<std::mutex> lck(dht_mtx);
342     if (dht_)
343         dht_->setLogger(logger);
344 #ifdef OPENDHT_PROXY_CLIENT
345     if (dht_via_proxy_)
346         dht_via_proxy_->setLogger(logger);
347 #endif
348 }
349 
350 void
setLoggers(LogMethod error,LogMethod warn,LogMethod debug)351 DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) {
352     Logger logger {std::move(error), std::move(warn), std::move(debug)};
353     setLogger(logger);
354 }
355 
356 void
setLogFilter(const InfoHash & f)357 DhtRunner::setLogFilter(const InfoHash& f) {
358     std::lock_guard<std::mutex> lck(dht_mtx);
359     activeDht()->setLogFilter(f);
360     if (dht_)
361         dht_->setLogFilter(f);
362 #ifdef OPENDHT_PROXY_CLIENT
363     if (dht_via_proxy_)
364         dht_via_proxy_->setLogFilter(f);
365 #endif
366 }
367 
368 void
registerType(const ValueType & type)369 DhtRunner::registerType(const ValueType& type) {
370     std::lock_guard<std::mutex> lck(dht_mtx);
371     activeDht()->registerType(type);
372 }
373 
374 void
importValues(const std::vector<ValuesExport> & values)375 DhtRunner::importValues(const std::vector<ValuesExport>& values) {
376     std::lock_guard<std::mutex> lck(dht_mtx);
377     dht_->importValues(values);
378 }
379 
380 unsigned
getNodesStats(sa_family_t af,unsigned * good_return,unsigned * dubious_return,unsigned * cached_return,unsigned * incoming_return) const381 DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const
382 {
383     std::lock_guard<std::mutex> lck(dht_mtx);
384     const auto stats = activeDht()->getNodesStats(af);
385     if (good_return)
386         *good_return = stats.good_nodes;
387     if (dubious_return)
388         *dubious_return = stats.dubious_nodes;
389     if (cached_return)
390         *cached_return = stats.cached_nodes;
391     if (incoming_return)
392         *incoming_return = stats.incoming_nodes;
393     return stats.good_nodes + stats.dubious_nodes;
394 }
395 
396 NodeStats
getNodesStats(sa_family_t af) const397 DhtRunner::getNodesStats(sa_family_t af) const
398 {
399     std::lock_guard<std::mutex> lck(dht_mtx);
400     return activeDht()->getNodesStats(af);
401 }
402 
403 NodeInfo
getNodeInfo() const404 DhtRunner::getNodeInfo() const {
405     std::lock_guard<std::mutex> lck(dht_mtx);
406     NodeInfo info;
407     info.id = getId();
408     info.node_id = getNodeId();
409     info.ipv4 = dht_->getNodesStats(AF_INET);
410     info.ipv6 = dht_->getNodesStats(AF_INET6);
411     return info;
412 }
413 
414 std::vector<unsigned>
getNodeMessageStats(bool in) const415 DhtRunner::getNodeMessageStats(bool in) const
416 {
417     std::lock_guard<std::mutex> lck(dht_mtx);
418     return activeDht()->getNodeMessageStats(in);
419 }
420 
421 std::string
getStorageLog() const422 DhtRunner::getStorageLog() const
423 {
424     std::lock_guard<std::mutex> lck(dht_mtx);
425     return activeDht()->getStorageLog();
426 }
427 std::string
getStorageLog(const InfoHash & f) const428 DhtRunner::getStorageLog(const InfoHash& f) const
429 {
430     std::lock_guard<std::mutex> lck(dht_mtx);
431     return activeDht()->getStorageLog(f);
432 }
433 std::string
getRoutingTablesLog(sa_family_t af) const434 DhtRunner::getRoutingTablesLog(sa_family_t af) const
435 {
436     std::lock_guard<std::mutex> lck(dht_mtx);
437     return activeDht()->getRoutingTablesLog(af);
438 }
439 std::string
getSearchesLog(sa_family_t af) const440 DhtRunner::getSearchesLog(sa_family_t af) const
441 {
442     std::lock_guard<std::mutex> lck(dht_mtx);
443     return activeDht()->getSearchesLog(af);
444 }
445 std::string
getSearchLog(const InfoHash & f,sa_family_t af) const446 DhtRunner::getSearchLog(const InfoHash& f, sa_family_t af) const
447 {
448     std::lock_guard<std::mutex> lck(dht_mtx);
449     return activeDht()->getSearchLog(f, af);
450 }
451 std::vector<SockAddr>
getPublicAddress(sa_family_t af)452 DhtRunner::getPublicAddress(sa_family_t af)
453 {
454     std::lock_guard<std::mutex> lck(dht_mtx);
455     if (auto dht = activeDht())
456         return dht->getPublicAddress(af);
457     return {};
458 }
459 std::vector<std::string>
getPublicAddressStr(sa_family_t af)460 DhtRunner::getPublicAddressStr(sa_family_t af)
461 {
462     auto addrs = getPublicAddress(af);
463     std::vector<std::string> ret(addrs.size());
464     std::transform(addrs.begin(), addrs.end(), ret.begin(), [](const SockAddr& a) { return a.toString(); });
465     return ret;
466 }
467 
468 void
registerCertificate(std::shared_ptr<crypto::Certificate> cert)469 DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) {
470     std::lock_guard<std::mutex> lck(dht_mtx);
471     activeDht()->registerCertificate(cert);
472 }
473 void
setLocalCertificateStore(CertificateStoreQuery && query_method)474 DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) {
475     std::lock_guard<std::mutex> lck(dht_mtx);
476 #ifdef OPENDHT_PROXY_CLIENT
477     if (dht_via_proxy_)
478         dht_via_proxy_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method));
479 #endif
480     if (dht_)
481         dht_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method));
482 }
483 
/**
 * Run one processing iteration on the active DHT: execute queued
 * operations, process received packets (or just run the scheduler), and
 * react to connectivity status changes.
 *
 * @return the time point returned by the last periodic() call, or a
 *         default-constructed time_point when there is no active DHT or
 *         when every packet of this iteration was dropped.
 */
time_point
DhtRunner::loop_()
{
    auto dht = activeDht();
    if (not dht)
        return {};

    decltype(pending_ops) ops {};
    {
        std::lock_guard<std::mutex> lck(storage_mtx);
        auto s = getStatus();
        // Normal operations are taken only when no priority operation is queued and the
        // node is connected, or disconnected without a bootstrap in progress.
        // Otherwise the priority queue is taken.
        ops = (pending_ops_prio.empty() && (s == NodeStatus::Connected or (s == NodeStatus::Disconnected and not bootstraping))) ?
               std::move(pending_ops) : std::move(pending_ops_prio);
    }
    // Operations run outside of storage_mtx, so new ones can be queued meanwhile.
    while (not ops.empty()) {
        ops.front()(*dht);
        ops.pop();
    }

    time_point wakeup {};
    decltype(rcv) received {};
    {
        std::lock_guard<std::mutex> lck(sock_mtx);
        // move to stack
        received = std::move(rcv);
    }

    // Discard old packets
    size_t dropped {0};
    if (not received.empty()) {
        auto now = clock::now();
        while (not received.empty() and now - received.front()->received > RX_QUEUE_MAX_DELAY) {
            received.pop();
            dropped++;
        }
    }

    // Handle packets
    if (not received.empty()) {
        while (not received.empty()) {
            auto& pck = received.front();
            // The delay is checked again: handling earlier packets takes time.
            if (clock::now() - pck->received > RX_QUEUE_MAX_DELAY)
                dropped++;
            else
                wakeup = dht->periodic(pck->data.data(), pck->data.size(), std::move(pck->from));
            received.pop();
        }
    } else {
        // Or just run the scheduler
        wakeup = dht->periodic(nullptr, 0, nullptr, 0);
    }

    if (dropped)
        std::cerr << "Dropped " << dropped << " packets with high delay" << std::endl;

    NodeStatus nstatus4 = dht->getStatus(AF_INET);
    NodeStatus nstatus6 = dht->getStatus(AF_INET6);
    if (nstatus4 != status4 || nstatus6 != status6) {
        status4 = nstatus4;
        status6 = nstatus6;
        if (status4 == NodeStatus::Disconnected and status6 == NodeStatus::Disconnected) {
            // We have lost connection with the DHT.  Try to recover using bootstrap nodes.
            std::unique_lock<std::mutex> lck(bootstrap_mtx);
            bootstrap_nodes = bootstrap_nodes_all;
            tryBootstrapContinuously();
        } else {
            // At least one family is no longer disconnected: clear the list of nodes to retry.
            std::unique_lock<std::mutex> lck(bootstrap_mtx);
            bootstrap_nodes.clear();
        }
        // Notify the user-provided status callback, if any.
        if (statusCb)
            statusCb(status4, status6);
    }

    return wakeup;
}
559 
560 void
get(InfoHash hash,GetCallback vcb,DoneCallback dcb,Value::Filter f,Where w)561 DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f, Where w)
562 {
563     {
564         std::lock_guard<std::mutex> lck(storage_mtx);
565         pending_ops.emplace([=](SecureDht& dht) mutable {
566             dht.get(hash, std::move(vcb), std::move(dcb), std::move(f), std::move(w));
567         });
568     }
569     cv.notify_all();
570 }
571 
572 void
get(const std::string & key,GetCallback vcb,DoneCallbackSimple dcb,Value::Filter f,Where w)573 DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb, Value::Filter f, Where w)
574 {
575     get(InfoHash::get(key), std::move(vcb), std::move(dcb), std::move(f), std::move(w));
576 }
577 void
query(const InfoHash & hash,QueryCallback cb,DoneCallback done_cb,Query q)578 DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Query q) {
579     {
580         std::lock_guard<std::mutex> lck(storage_mtx);
581         pending_ops.emplace([=](SecureDht& dht) mutable {
582             dht.query(hash, std::move(cb), std::move(done_cb), std::move(q));
583         });
584     }
585     cv.notify_all();
586 }
587 
588 std::future<size_t>
listen(InfoHash hash,ValueCallback vcb,Value::Filter f,Where w)589 DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w)
590 {
591     auto ret_token = std::make_shared<std::promise<size_t>>();
592     {
593         std::lock_guard<std::mutex> lck(storage_mtx);
594         pending_ops.emplace([=](SecureDht& dht) mutable {
595 #ifdef OPENDHT_PROXY_CLIENT
596             auto tokenbGlobal = listener_token_++;
597             auto& listener = listeners_[tokenbGlobal];
598             listener.hash = hash;
599             listener.f = std::move(f);
600             listener.w = std::move(w);
601             listener.gcb = [hash,vcb,tokenbGlobal,this](const std::vector<Sp<Value>>& vals, bool expired) {
602                 if (not vcb(vals, expired)) {
603                     cancelListen(hash, tokenbGlobal);
604                     return false;
605                 }
606                 return true;
607             };
608             if (auto token = dht.listen(hash, listener.gcb, listener.f, listener.w)) {
609                 if (use_proxy)  listener.tokenProxyDht = token;
610                 else            listener.tokenClassicDht = token;
611             }
612             ret_token->set_value(tokenbGlobal);
613 #else
614             ret_token->set_value(dht.listen(hash, std::move(vcb), std::move(f), std::move(w)));
615 #endif
616         });
617     }
618     cv.notify_all();
619     return ret_token->get_future();
620 }
621 
622 std::future<size_t>
listen(const std::string & key,GetCallback vcb,Value::Filter f,Where w)623 DhtRunner::listen(const std::string& key, GetCallback vcb, Value::Filter f, Where w)
624 {
625     return listen(InfoHash::get(key), std::move(vcb), std::move(f), std::move(w));
626 }
627 
628 void
cancelListen(InfoHash h,size_t token)629 DhtRunner::cancelListen(InfoHash h, size_t token)
630 {
631     {
632         std::lock_guard<std::mutex> lck(storage_mtx);
633 #ifdef OPENDHT_PROXY_CLIENT
634         pending_ops.emplace([=](SecureDht&) {
635             auto it = listeners_.find(token);
636             if (it == listeners_.end()) return;
637             if (it->second.tokenClassicDht)
638                 dht_->cancelListen(h, it->second.tokenClassicDht);
639             if (it->second.tokenProxyDht and dht_via_proxy_)
640                 dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
641             listeners_.erase(it);
642         });
643 #else
644         pending_ops.emplace([=](SecureDht& dht) {
645             dht.cancelListen(h, token);
646         });
647 #endif // OPENDHT_PROXY_CLIENT
648     }
649     cv.notify_all();
650 }
651 
652 void
cancelListen(InfoHash h,std::shared_future<size_t> ftoken)653 DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
654 {
655     {
656         std::lock_guard<std::mutex> lck(storage_mtx);
657 #ifdef OPENDHT_PROXY_CLIENT
658         pending_ops.emplace([=](SecureDht&) {
659             auto it = listeners_.find(ftoken.get());
660             if (it == listeners_.end()) return;
661             if (it->second.tokenClassicDht)
662                 dht_->cancelListen(h, it->second.tokenClassicDht);
663             if (it->second.tokenProxyDht and dht_via_proxy_)
664                 dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
665             listeners_.erase(it);
666         });
667 #else
668         pending_ops.emplace([=](SecureDht& dht) {
669             dht.cancelListen(h, ftoken.get());
670         });
671 #endif // OPENDHT_PROXY_CLIENT
672     }
673     cv.notify_all();
674 }
675 
676 void
put(InfoHash hash,Value && value,DoneCallback cb,time_point created,bool permanent)677 DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent)
678 {
679     {
680         std::lock_guard<std::mutex> lck(storage_mtx);
681         auto sv = std::make_shared<Value>(std::move(value));
682         pending_ops.emplace([=](SecureDht& dht) {
683             dht.put(hash, sv, cb, created, permanent);
684         });
685     }
686     cv.notify_all();
687 }
688 
689 void
put(InfoHash hash,std::shared_ptr<Value> value,DoneCallback cb,time_point created,bool permanent)690 DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent)
691 {
692     {
693         std::lock_guard<std::mutex> lck(storage_mtx);
694         pending_ops.emplace([=](SecureDht& dht) {
695             dht.put(hash, value, cb, created, permanent);
696         });
697     }
698     cv.notify_all();
699 }
700 
701 void
put(const std::string & key,Value && value,DoneCallbackSimple cb,time_point created,bool permanent)702 DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, time_point created, bool permanent)
703 {
704     put(InfoHash::get(key), std::forward<Value>(value), std::move(cb), created, permanent);
705 }
706 
707 void
cancelPut(const InfoHash & h,const Value::Id & id)708 DhtRunner::cancelPut(const InfoHash& h , const Value::Id& id)
709 {
710     {
711         std::lock_guard<std::mutex> lck(storage_mtx);
712         pending_ops.emplace([=](SecureDht& dht) {
713             dht.cancelPut(h, id);
714         });
715     }
716     cv.notify_all();
717 }
718 
719 void
putSigned(InfoHash hash,std::shared_ptr<Value> value,DoneCallback cb)720 DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb)
721 {
722     {
723         std::lock_guard<std::mutex> lck(storage_mtx);
724         pending_ops.emplace([=](SecureDht& dht) {
725             dht.putSigned(hash, value, cb);
726         });
727     }
728     cv.notify_all();
729 }
730 
731 void
putSigned(InfoHash hash,Value && value,DoneCallback cb)732 DhtRunner::putSigned(InfoHash hash, Value&& value, DoneCallback cb)
733 {
734     putSigned(hash, std::make_shared<Value>(std::move(value)), std::move(cb));
735 }
736 
737 void
putSigned(const std::string & key,Value && value,DoneCallbackSimple cb)738 DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb)
739 {
740     putSigned(InfoHash::get(key), std::forward<Value>(value), std::move(cb));
741 }
742 
743 void
putEncrypted(InfoHash hash,InfoHash to,std::shared_ptr<Value> value,DoneCallback cb)744 DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb)
745 {
746     {
747         std::lock_guard<std::mutex> lck(storage_mtx);
748         pending_ops.emplace([=](SecureDht& dht) {
749             dht.putEncrypted(hash, to, value, cb);
750         });
751     }
752     cv.notify_all();
753 }
754 
755 void
putEncrypted(InfoHash hash,InfoHash to,Value && value,DoneCallback cb)756 DhtRunner::putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb)
757 {
758     putEncrypted(hash, to, std::make_shared<Value>(std::move(value)), std::move(cb));
759 }
760 
761 void
putEncrypted(const std::string & key,InfoHash to,Value && value,DoneCallback cb)762 DhtRunner::putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb)
763 {
764     putEncrypted(InfoHash::get(key), to, std::forward<Value>(value), std::move(cb));
765 }
766 
/**
 * Start the bootstrap thread unless one is already bootstrapping.
 *
 * The thread repeatedly resolves and pings the nodes of bootstrap_nodes,
 * waits for at least BOOTSTRAP_PERIOD and for the pings to complete, and
 * keeps looping while the runner is running and both IPv4 and IPv6 are
 * disconnected.
 */
void
DhtRunner::tryBootstrapContinuously()
{
    if (bootstrap_thread.joinable()) {
        if (bootstraping)
            return; // already running
        else
            // A previous bootstrap thread is no longer bootstrapping: join it before starting a new one.
            bootstrap_thread.join();
    }
    bootstraping = true;
    bootstrap_thread = std::thread([this]() {
        auto next = clock::now();
        do {
            // Work on a snapshot so bootstrap_mtx is not held while pinging.
            decltype(bootstrap_nodes) nodes;
            {
                std::lock_guard<std::mutex> lck(bootstrap_mtx);
                nodes = bootstrap_nodes;
            }

            next += BOOTSTRAP_PERIOD;
            {
                // Local mutex protecting ping_count, shared with the ping callbacks below.
                std::mutex mtx;
                std::unique_lock<std::mutex> blck(mtx);
                unsigned ping_count(0);
                // Reverse: try last inserted bootstrap nodes first
                for (auto it = nodes.rbegin(); it != nodes.rend(); it++) {
                    ++ping_count;
                    try {
                        bootstrap(SockAddr::resolve(it->first, it->second), [&](bool) {
                            // After shutdown, do not touch the captured locals.
                            if (not running)
                                return;
                            {
                                std::unique_lock<std::mutex> blck(mtx);
                                --ping_count;
                            }
                            bootstrap_cv.notify_all();
                        });
                    } catch (std::invalid_argument& e) {
                        // No ping was queued for this node: undo the count.
                        --ping_count;
                        std::cerr << e.what() << std::endl;
                    }
                }
                // wait at least until the next BOOTSTRAP_PERIOD
                bootstrap_cv.wait_until(blck, next, [&]() { return not running; });
                // wait for bootstrap requests to end.
                if (running)
                   bootstrap_cv.wait(blck, [&]() { return not running or ping_count == 0; });
            }
            // update state
            {
                std::lock_guard<std::mutex> lck(dht_mtx);
                bootstraping = running and
                               status4 == NodeStatus::Disconnected and
                               status6 == NodeStatus::Disconnected;
            }
        } while (bootstraping);
    });
}
825 
826 void
bootstrap(const std::string & host,const std::string & service)827 DhtRunner::bootstrap(const std::string& host, const std::string& service)
828 {
829     std::lock_guard<std::mutex> lck(bootstrap_mtx);
830     bootstrap_nodes_all.emplace_back(host, service);
831     bootstrap_nodes.emplace_back(host, service);
832     tryBootstrapContinuously();
833 }
834 
835 void
bootstrap(const std::string & hostService)836 DhtRunner::bootstrap(const std::string& hostService)
837 {
838     std::lock_guard<std::mutex> lck(bootstrap_mtx);
839     auto host_service = splitPort(hostService);
840     bootstrap_nodes_all.emplace_back(host_service.first, host_service.second);
841     bootstrap_nodes.emplace_back(std::move(host_service.first), std::move(host_service.second));
842     tryBootstrapContinuously();
843 }
844 
845 void
clearBootstrap()846 DhtRunner::clearBootstrap()
847 {
848     std::lock_guard<std::mutex> lck(bootstrap_mtx);
849     bootstrap_nodes_all.clear();
850 }
851 
852 void
bootstrap(std::vector<SockAddr> nodes,DoneCallbackSimple && cb)853 DhtRunner::bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple&& cb)
854 {
855     std::lock_guard<std::mutex> lck(storage_mtx);
856     pending_ops_prio.emplace([=](SecureDht& dht) mutable {
857         auto rem = cb ? std::make_shared<std::pair<size_t, bool>>(nodes.size(), false) : nullptr;
858         for (auto& node : nodes) {
859             if (node.getPort() == 0)
860                 node.setPort(net::DHT_DEFAULT_PORT);
861             dht.pingNode(std::move(node), cb ? [rem,cb](bool ok) {
862                 auto& r = *rem;
863                 r.first--;
864                 r.second |= ok;
865                 if (not r.first)
866                     cb(r.second);
867             } : DoneCallbackSimple{});
868         }
869     });
870     cv.notify_all();
871 }
872 
873 void
bootstrap(const SockAddr & addr,DoneCallbackSimple && cb)874 DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb)
875 {
876     std::lock_guard<std::mutex> lck(storage_mtx);
877     pending_ops_prio.emplace([addr, cb](SecureDht& dht) mutable {
878         dht.pingNode(std::move(addr), std::move(cb));
879     });
880     cv.notify_all();
881 }
882 
883 void
bootstrap(const InfoHash & id,const SockAddr & address)884 DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address)
885 {
886     {
887         std::unique_lock<std::mutex> lck(storage_mtx);
888         pending_ops_prio.emplace([id, address](SecureDht& dht) mutable {
889             dht.insertNode(id, address);
890         });
891     }
892     cv.notify_all();
893 }
894 
895 void
bootstrap(const std::vector<NodeExport> & nodes)896 DhtRunner::bootstrap(const std::vector<NodeExport>& nodes)
897 {
898     {
899         std::lock_guard<std::mutex> lck(storage_mtx);
900         pending_ops_prio.emplace([=](SecureDht& dht) {
901             for (auto& node : nodes)
902                 dht.insertNode(node);
903         });
904     }
905     cv.notify_all();
906 }
907 
908 void
connectivityChanged()909 DhtRunner::connectivityChanged()
910 {
911     {
912         std::lock_guard<std::mutex> lck(storage_mtx);
913         pending_ops_prio.emplace([=](SecureDht& dht) {
914             dht.connectivityChanged();
915         });
916     }
917     cv.notify_all();
918 }
919 
920 void
findCertificate(InfoHash hash,std::function<void (const std::shared_ptr<crypto::Certificate>)> cb)921 DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)> cb) {
922     {
923         std::lock_guard<std::mutex> lck(storage_mtx);
924         pending_ops.emplace([=](SecureDht& dht) {
925             dht.findCertificate(hash, cb);
926         });
927     }
928     cv.notify_all();
929 }
930 
931 void
resetDht()932 DhtRunner::resetDht()
933 {
934     peerDiscovery_.reset();
935 #ifdef OPENDHT_PROXY_CLIENT
936     listeners_.clear();
937     dht_via_proxy_.reset();
938 #endif // OPENDHT_PROXY_CLIENT
939     dht_.reset();
940 }
941 
942 SecureDht*
activeDht() const943 DhtRunner::activeDht() const
944 {
945 #ifdef OPENDHT_PROXY_CLIENT
946     return use_proxy? dht_via_proxy_.get() : dht_.get();
947 #else
948     return dht_.get();
949 #endif // OPENDHT_PROXY_CLIENT
950 }
951 
952 void
setProxyServer(const std::string & proxy,const std::string & pushNodeId)953 DhtRunner::setProxyServer(const std::string& proxy, const std::string& pushNodeId)
954 {
955 #ifdef OPENDHT_PROXY_CLIENT
956     std::lock_guard<std::mutex> lck(dht_mtx);
957     if (config_.proxy_server == proxy and config_.push_node_id == pushNodeId)
958         return;
959     config_.proxy_server = proxy;
960     config_.push_node_id = pushNodeId;
961     enableProxy(use_proxy and not config_.proxy_server.empty());
962 #else
963     if (not proxy.empty())
964         std::cerr << "DHT proxy requested but OpenDHT built without proxy support." << std::endl;
965 #endif
966 }
967 
968 void
enableProxy(bool proxify)969 DhtRunner::enableProxy(bool proxify)
970 {
971 #ifdef OPENDHT_PROXY_CLIENT
972     if (dht_via_proxy_) {
973         dht_via_proxy_->shutdown({});
974     }
975     if (proxify) {
976         // Init the proxy client
977         auto dht_via_proxy = std::unique_ptr<DhtInterface>(
978             new DhtProxyClient([this]{
979                 if (config_.threaded) {
980                     {
981                         std::lock_guard<std::mutex> lck(storage_mtx);
982                         pending_ops_prio.emplace([=](SecureDht&) mutable {});
983                     }
984                     cv.notify_all();
985                 }
986             }, config_.proxy_server, config_.push_node_id)
987         );
988         dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config));
989 #ifdef OPENDHT_PUSH_NOTIFICATIONS
990         if (not pushToken_.empty())
991             dht_via_proxy_->setPushNotificationToken(pushToken_);
992 #endif
993         // add current listeners
994         for (auto& l: listeners_)
995             l.second.tokenProxyDht = dht_via_proxy_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w);
996         // and use it
997         use_proxy = proxify;
998     } else {
999         use_proxy = proxify;
1000         std::lock_guard<std::mutex> lck(storage_mtx);
1001         if (not listeners_.empty()) {
1002             pending_ops.emplace([this](SecureDht& /*dht*/) mutable {
1003                 if (not dht_)
1004                     return;
1005                 for (auto& l : listeners_) {
1006                     if (not l.second.tokenClassicDht) {
1007                         l.second.tokenClassicDht = dht_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w);
1008                     }
1009                 }
1010             });
1011         }
1012     }
1013 #else
1014     if (proxify)
1015         std::cerr << "DHT proxy requested but OpenDHT built without proxy support." << std::endl;
1016 #endif
1017 }
1018 
1019 void
forwardAllMessages(bool forward)1020 DhtRunner::forwardAllMessages(bool forward)
1021 {
1022     std::lock_guard<std::mutex> lck(dht_mtx);
1023 #ifdef OPENDHT_PROXY_SERVER
1024 #ifdef OPENDHT_PROXY_CLIENT
1025     if (dht_via_proxy_)
1026         dht_via_proxy_->forwardAllMessages(forward);
1027 #endif // OPENDHT_PROXY_CLIENT
1028     if (dht_)
1029         dht_->forwardAllMessages(forward);
1030 #else
1031     (void) forward;
1032 #endif // OPENDHT_PROXY_SERVER
1033 }
1034 
1035 /**
1036  * Updates the push notification device token
1037  */
1038 void
setPushNotificationToken(const std::string & token)1039 DhtRunner::setPushNotificationToken(const std::string& token) {
1040     std::lock_guard<std::mutex> lck(dht_mtx);
1041 #if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS)
1042     pushToken_ = token;
1043     if (dht_via_proxy_)
1044         dht_via_proxy_->setPushNotificationToken(token);
1045 #else
1046     (void) token;
1047 #endif
1048 }
1049 
1050 void
pushNotificationReceived(const std::map<std::string,std::string> & data)1051 DhtRunner::pushNotificationReceived(const std::map<std::string, std::string>& data)
1052 {
1053 #if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS)
1054     {
1055         std::lock_guard<std::mutex> lck(storage_mtx);
1056         pending_ops_prio.emplace([=](SecureDht&) {
1057             if (dht_via_proxy_)
1058                 dht_via_proxy_->pushNotificationReceived(data);
1059         });
1060     }
1061     cv.notify_all();
1062 #else
1063     (void) data;
1064 #endif
1065 }
1066 
1067 }
1068