1 /*
2  *  Copyright (C) 2016-2019 Savoir-faire Linux Inc.
3  *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4  *          Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "dht_proxy_client.h"
21 
22 #include "dhtrunner.h"
23 #include "op_cache.h"
24 #include "utils.h"
25 
26 #include <restbed>
27 #include <json/json.h>
28 
29 #include <chrono>
30 #include <vector>
31 
32 namespace dht {
33 
34 struct DhtProxyClient::InfoState {
35     std::atomic_uint ipv4 {0}, ipv6 {0};
36     std::atomic_bool cancel {false};
37 };
38 
39 struct DhtProxyClient::ListenState {
40     std::atomic_bool ok {true};
41     std::atomic_bool cancel {false};
42 };
43 
44 struct DhtProxyClient::Listener
45 {
46     OpValueCache cache;
47     ValueCallback cb;
48     Sp<restbed::Request> req;
49     std::thread thread;
50     unsigned callbackId;
51     Sp<ListenState> state;
52     Sp<Scheduler::Job> refreshJob;
Listenerdht::DhtProxyClient::Listener53     Listener(OpValueCache&& c, const Sp<restbed::Request>& r)
54         : cache(std::move(c)), req(r) {}
55 };
56 
57 struct PermanentPut {
58     Sp<Value> value;
59     Sp<Scheduler::Job> refreshJob;
60     Sp<std::atomic_bool> ok;
PermanentPutdht::PermanentPut61     PermanentPut(const Sp<Value>& v, Sp<Scheduler::Job>&& j, const Sp<std::atomic_bool>& o)
62         : value(v), refreshJob(std::move(j)), ok(o) {}
63 };
64 
65 struct DhtProxyClient::ProxySearch {
66     SearchCache ops {};
67     Sp<Scheduler::Job> opExpirationJob {};
68     std::map<size_t, Listener> listeners {};
69     std::map<Value::Id, PermanentPut> puts {};
70 };
71 
DhtProxyClient()72 DhtProxyClient::DhtProxyClient() {}
73 
DhtProxyClient(std::function<void ()> signal,const std::string & serverHost,const std::string & pushClientId,const Logger & l)74 DhtProxyClient::DhtProxyClient(std::function<void()> signal, const std::string& serverHost, const std::string& pushClientId, const Logger& l)
75 : DhtInterface(l), serverHost_(serverHost), pushClientId_(pushClientId), loopSignal_(signal)
76 {
77     if (serverHost_.find("://") == std::string::npos)
78         serverHost_ = proxy::HTTP_PROTO + serverHost_;
79     if (!serverHost_.empty())
80         startProxy();
81 }
82 
83 void
confirmProxy()84 DhtProxyClient::confirmProxy()
85 {
86     if (serverHost_.empty()) return;
87     getConnectivityStatus();
88 }
89 
90 void
startProxy()91 DhtProxyClient::startProxy()
92 {
93     if (serverHost_.empty()) return;
94     DHT_LOG.w("Staring proxy client to %s", serverHost_.c_str());
95     nextProxyConfirmation = scheduler.add(scheduler.time(), std::bind(&DhtProxyClient::confirmProxy, this));
96     listenerRestart = std::make_shared<Scheduler::Job>(std::bind(&DhtProxyClient::restartListeners, this));
97     loopSignal_();
98 }
99 
~DhtProxyClient()100 DhtProxyClient::~DhtProxyClient()
101 {
102     isDestroying_ = true;
103     cancelAllOperations();
104     cancelAllListeners();
105     if (infoState_)
106         infoState_->cancel = true;
107     if (statusThread_.joinable())
108         statusThread_.join();
109 }
110 
111 std::vector<Sp<Value>>
getLocal(const InfoHash & k,const Value::Filter & filter) const112 DhtProxyClient::getLocal(const InfoHash& k, const Value::Filter& filter) const {
113     std::lock_guard<std::mutex> lock(searchLock_);
114     auto s = searches_.find(k);
115     if (s == searches_.end())
116         return {};
117     return s->second.ops.get(filter);
118 }
119 
120 Sp<Value>
getLocalById(const InfoHash & k,Value::Id id) const121 DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const {
122     std::lock_guard<std::mutex> lock(searchLock_);
123     auto s = searches_.find(k);
124     if (s == searches_.end())
125         return {};
126     return s->second.ops.get(id);
127 }
128 
129 void
cancelAllOperations()130 DhtProxyClient::cancelAllOperations()
131 {
132     std::lock_guard<std::mutex> lock(lockOperations_);
133     auto operation = operations_.begin();
134     while (operation != operations_.end()) {
135         if (operation->thread.joinable()) {
136             // Close connection to stop operation?
137             if (operation->req) {
138                 try {
139                     restbed::Http::close(operation->req);
140                 } catch (const std::exception& e) {
141                     DHT_LOG.w("Error closing socket: %s", e.what());
142                 }
143                 operation->req.reset();
144             }
145             operation->thread.join();
146             operation = operations_.erase(operation);
147         } else {
148             ++operation;
149         }
150     }
151 }
152 
153 void
cancelAllListeners()154 DhtProxyClient::cancelAllListeners()
155 {
156     std::lock_guard<std::mutex> lock(searchLock_);
157     DHT_LOG.w("Cancelling all listeners for %zu searches", searches_.size());
158     for (auto& s: searches_) {
159         s.second.ops.cancelAll([&](size_t token){
160             auto l = s.second.listeners.find(token);
161             if (l == s.second.listeners.end())
162                 return;
163             if (l->second.thread.joinable()) {
164                 // Close connection to stop listener?
165                 l->second.state->cancel = true;
166                 if (l->second.req) {
167                     try {
168                         restbed::Http::close(l->second.req);
169                     } catch (const std::exception& e) {
170                         DHT_LOG.w("Error closing socket: %s", e.what());
171                     }
172                     l->second.req.reset();
173                 }
174                 l->second.thread.join();
175             }
176             s.second.listeners.erase(token);
177         });
178     }
179 }
180 
181 void
shutdown(ShutdownCallback cb)182 DhtProxyClient::shutdown(ShutdownCallback cb)
183 {
184     cancelAllOperations();
185     cancelAllListeners();
186     if (cb)
187         cb();
188 }
189 
190 NodeStatus
getStatus(sa_family_t af) const191 DhtProxyClient::getStatus(sa_family_t af) const
192 {
193     std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
194     switch (af)
195     {
196     case AF_INET:
197         return statusIpv4_;
198     case AF_INET6:
199         return statusIpv6_;
200     default:
201         return NodeStatus::Disconnected;
202     }
203 }
204 
205 bool
isRunning(sa_family_t af) const206 DhtProxyClient::isRunning(sa_family_t af) const
207 {
208     std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
209     switch (af)
210     {
211     case AF_INET:
212         return statusIpv4_ != NodeStatus::Disconnected;
213     case AF_INET6:
214         return statusIpv6_ != NodeStatus::Disconnected;
215     default:
216         return false;
217     }
218 }
219 
220 time_point
periodic(const uint8_t *,size_t,SockAddr)221 DhtProxyClient::periodic(const uint8_t*, size_t, SockAddr)
222 {
223     // Exec all currently stored callbacks
224     scheduler.syncTime();
225     decltype(callbacks_) callbacks;
226     {
227         std::lock_guard<std::mutex> lock(lockCallbacks);
228         callbacks = std::move(callbacks_);
229     }
230     for (auto& callback : callbacks)
231         callback();
232     callbacks.clear();
233 
234     // Remove finished operations
235     {
236         std::lock_guard<std::mutex> lock(lockOperations_);
237         auto operation = operations_.begin();
238         while (operation != operations_.end()) {
239             if (*(operation->finished)) {
240                 if (operation->thread.joinable()) {
241                     // Close connection to stop operation?
242                     if (operation->req) {
243                         try {
244                             restbed::Http::close(operation->req);
245                         } catch (const std::exception& e) {
246                             DHT_LOG.w("Error closing socket: %s", e.what());
247                         }
248                         operation->req.reset();
249                     }
250                     operation->thread.join();
251                 }
252                 operation = operations_.erase(operation);
253             } else {
254                 ++operation;
255             }
256         }
257     }
258     return scheduler.run();
259 }
260 
261 void
get(const InfoHash & key,GetCallback cb,DoneCallback donecb,Value::Filter && f,Where && w)262 DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Value::Filter&& f, Where&& w)
263 {
264     DHT_LOG.d(key, "[search %s]: get", key.to_c_str());
265     restbed::Uri uri(serverHost_ + "/" + key.toString());
266     auto req = std::make_shared<restbed::Request>(uri);
267     Value::Filter filter = w.empty() ? f : f.chain(w.getFilter());
268 
269     auto finished = std::make_shared<std::atomic_bool>(false);
270     Operation o;
271     o.req = req;
272     o.finished = finished;
273     o.thread = std::thread([=](){
274         // Try to contact the proxy and set the status to connected when done.
275         // will change the connectivity status
276         struct GetState{ std::atomic_bool ok {true}; std::atomic_bool stop {false}; };
277         auto state = std::make_shared<GetState>();
278         try {
279             restbed::Http::async(req,
280                 [=](const std::shared_ptr<restbed::Request>& req,
281                     const std::shared_ptr<restbed::Response>& reply) {
282                 auto code = reply->get_status_code();
283 
284                 if (code == 200) {
285                     try {
286                         while (restbed::Http::is_open(req) and not *finished and not state->stop) {
287                             restbed::Http::fetch("\n", reply);
288                             if (*finished or state->stop)
289                                 break;
290                             std::string body;
291                             reply->get_body(body);
292                             reply->set_body(""); // Reset the body for the next fetch
293 
294                             std::string err;
295                             Json::Value json;
296                             Json::CharReaderBuilder rbuilder;
297                             auto* char_data = reinterpret_cast<const char*>(&body[0]);
298                             auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
299                             if (reader->parse(char_data, char_data + body.size(), &json, &err)) {
300                                 auto value = std::make_shared<Value>(json);
301                                 if ((not filter or filter(*value)) and cb) {
302                                     {
303                                         std::lock_guard<std::mutex> lock(lockCallbacks);
304                                         callbacks_.emplace_back([cb, value, state]() {
305                                             if (not state->stop and not cb({value}))
306                                                 state->stop = true;
307                                         });
308                                     }
309                                     loopSignal_();
310                                 }
311                             } else {
312                                 state->ok = false;
313                             }
314                         }
315                     } catch (std::runtime_error& e) { }
316                 } else {
317                     state->ok = false;
318                 }
319             }).wait();
320         } catch(const std::exception& e) {
321             state->ok = false;
322         }
323         if (donecb) {
324             {
325                 std::lock_guard<std::mutex> lock(lockCallbacks);
326                 callbacks_.emplace_back([=](){
327                     donecb(state->ok, {});
328                     state->stop = true;
329                 });
330             }
331             loopSignal_();
332         }
333         if (!state->ok) {
334             // Connection failed, update connectivity
335             opFailed();
336         }
337         *finished = true;
338     });
339     {
340         std::lock_guard<std::mutex> lock(lockOperations_);
341         operations_.emplace_back(std::move(o));
342     }
343 }
344 
345 void
put(const InfoHash & key,Sp<Value> val,DoneCallback cb,time_point created,bool permanent)346 DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent)
347 {
348     DHT_LOG.d(key, "[search %s]: put", key.to_c_str());
349     scheduler.syncTime();
350     if (not val) {
351         if (cb) cb(false, {});
352         return;
353     }
354     if (val->id == Value::INVALID_ID) {
355         crypto::random_device rdev;
356         std::uniform_int_distribution<Value::Id> rand_id {};
357         val->id = rand_id(rdev);
358     }
359     if (permanent) {
360         std::lock_guard<std::mutex> lock(searchLock_);
361         auto id = val->id;
362         auto& search = searches_[key];
363         auto nextRefresh = scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN;
364         auto ok = std::make_shared<std::atomic_bool>(false);
365         search.puts.erase(id);
366         search.puts.emplace(std::piecewise_construct,
367             std::forward_as_tuple(id),
368             std::forward_as_tuple(val, scheduler.add(nextRefresh, [this, key, id, ok]{
369                 std::lock_guard<std::mutex> lock(searchLock_);
370                 auto s = searches_.find(key);
371                 if (s != searches_.end()) {
372                     auto p = s->second.puts.find(id);
373                     if (p != s->second.puts.end()) {
374                         doPut(key, p->second.value,
375                         [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){
376                             *ok = result;
377                         }, time_point::max(), true);
378                         scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
379                     }
380                 }
381             }), ok));
382     }
383     doPut(key, val, std::move(cb), created, permanent);
384 }
385 
386 void
doPut(const InfoHash & key,Sp<Value> val,DoneCallback cb,time_point,bool permanent)387 DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point /*created*/, bool permanent)
388 {
389     DHT_LOG.d(key, "[search %s] performing put of %s", key.to_c_str(), val->toString().c_str());
390     restbed::Uri uri(serverHost_ + "/" + key.toString());
391     auto req = std::make_shared<restbed::Request>(uri);
392     req->set_method("POST");
393 
394     auto json = val->toJson();
395     if (permanent) {
396         if (deviceKey_.empty()) {
397             json["permanent"] = true;
398         } else {
399 #ifdef OPENDHT_PUSH_NOTIFICATIONS
400             Json::Value refresh;
401             getPushRequest(refresh);
402             json["permanent"] = refresh;
403 #else
404             json["permanent"] = true;
405 #endif
406         }
407     }
408     Json::StreamWriterBuilder wbuilder;
409     wbuilder["commentStyle"] = "None";
410     wbuilder["indentation"] = "";
411     auto body = Json::writeString(wbuilder, json) + "\n";
412     req->set_body(body);
413     req->set_header("Content-Length", std::to_string(body.size()));
414 
415     auto finished = std::make_shared<std::atomic_bool>(false);
416     Operation o;
417     o.req = req;
418     o.finished = finished;
419     o.thread = std::thread([=](){
420         auto ok = std::make_shared<std::atomic_bool>(true);
421         try {
422             restbed::Http::async(req,
423                 [ok](const std::shared_ptr<restbed::Request>& /*req*/,
424                             const std::shared_ptr<restbed::Response>& reply) {
425                 auto code = reply->get_status_code();
426 
427                 if (code == 200) {
428                     restbed::Http::fetch("\n", reply);
429                     std::string body;
430                     reply->get_body(body);
431                     reply->set_body(""); // Reset the body for the next fetch
432 
433                     try {
434                         std::string err;
435                         Json::Value json;
436                         Json::CharReaderBuilder rbuilder;
437                         auto* char_data = reinterpret_cast<const char*>(&body[0]);
438                         auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
439                         if (not reader->parse(char_data, char_data + body.size(), &json, &err))
440                             *ok = false;
441                     } catch (...) {
442                         *ok = false;
443                     }
444                 } else {
445                     *ok = false;
446                 }
447             }).wait();
448         } catch(const std::exception& e) {
449             *ok = false;
450         }
451         if (cb) {
452             {
453                 std::lock_guard<std::mutex> lock(lockCallbacks);
454                 callbacks_.emplace_back([=](){
455                     cb(*ok, {});
456                 });
457             }
458             loopSignal_();
459         }
460         if (!ok) {
461             // Connection failed, update connectivity
462             opFailed();
463         }
464         *finished = true;
465     });
466     {
467         std::lock_guard<std::mutex> lock(lockOperations_);
468         operations_.emplace_back(std::move(o));
469     }
470 }
471 
472 /**
473  * Get data currently being put at the given hash.
474  */
475 std::vector<Sp<Value>>
getPut(const InfoHash & key) const476 DhtProxyClient::getPut(const InfoHash& key) const {
477     std::vector<Sp<Value>> ret;
478     auto search = searches_.find(key);
479     if (search != searches_.end()) {
480         ret.reserve(search->second.puts.size());
481         for (const auto& put : search->second.puts)
482             ret.emplace_back(put.second.value);
483     }
484     return ret;
485 }
486 
487 /**
488  * Get data currently being put at the given hash with the given id.
489  */
490 Sp<Value>
getPut(const InfoHash & key,const Value::Id & id) const491 DhtProxyClient::getPut(const InfoHash& key, const Value::Id& id) const {
492     auto search = searches_.find(key);
493     if (search == searches_.end())
494         return {};
495     auto val = search->second.puts.find(id);
496     if (val == search->second.puts.end())
497         return {};
498     return val->second.value;
499 }
500 
501 /**
502  * Stop any put/announce operation at the given location,
503  * for the value with the given id.
504  */
505 bool
cancelPut(const InfoHash & key,const Value::Id & id)506 DhtProxyClient::cancelPut(const InfoHash& key, const Value::Id& id)
507 {
508     auto search = searches_.find(key);
509     if (search == searches_.end())
510         return false;
511     DHT_LOG.d(key, "[search %s] cancel put", key.to_c_str());
512     return search->second.puts.erase(id) > 0;
513 }
514 
515 NodeStats
getNodesStats(sa_family_t af) const516 DhtProxyClient::getNodesStats(sa_family_t af) const
517 {
518     return af == AF_INET ? stats4_ : stats6_;
519 }
520 
521 void
getProxyInfos()522 DhtProxyClient::getProxyInfos()
523 {
524     DHT_LOG.d("Requesting proxy server node information");
525     std::lock_guard<std::mutex> l(statusLock_);
526 
527     auto infoState = std::make_shared<InfoState>();
528     if (infoState_)
529         infoState_->cancel = true;
530     infoState_ = infoState;
531 
532     {
533         std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
534         if (statusIpv4_ == NodeStatus::Disconnected)
535             statusIpv4_ = NodeStatus::Connecting;
536         if (statusIpv6_ == NodeStatus::Disconnected)
537             statusIpv6_ = NodeStatus::Connecting;
538     }
539 
540     // A node can have a Ipv4 and a Ipv6. So, we need to retrieve all public ips
541     auto serverHost = serverHost_;
542 
543     // Try to contact the proxy and set the status to connected when done.
544     // will change the connectivity status
545     if (statusThread_.joinable()) {
546         try {
547             statusThread_.detach();
548             statusThread_ = {};
549         } catch (const std::exception& e) {
550             DHT_LOG.e("Error detaching thread: %s", e.what());
551         }
552     }
553     statusThread_ = std::thread([this, serverHost, infoState]{
554         try {
555             auto endpointStr = serverHost;
556             auto protocol = std::string(proxy::HTTP_PROTO);
557             auto protocolIdx = serverHost.find("://");
558             if (protocolIdx != std::string::npos) {
559                 protocol = endpointStr.substr(0, protocolIdx + 3);
560                 endpointStr = endpointStr.substr(protocolIdx + 3);
561             }
562             auto hostAndService = splitPort(endpointStr);
563             auto resolved_proxies = SockAddr::resolve(hostAndService.first, hostAndService.second);
564             std::vector<std::future<Sp<restbed::Response>>> reqs;
565             reqs.reserve(resolved_proxies.size());
566             for (const auto& resolved_proxy: resolved_proxies) {
567                 auto server = resolved_proxy.toString();
568                 if (resolved_proxy.getFamily() == AF_INET6) {
569                     // HACK restbed seems to not correctly handle directly http://[ipv6]
570                     // See https://github.com/Corvusoft/restbed/issues/290.
571                     server = endpointStr;
572                 }
573                 restbed::Uri uri(protocol + server + "/");
574                 auto req = std::make_shared<restbed::Request>(uri);
575                 if (infoState->cancel)
576                     return;
577                 reqs.emplace_back(restbed::Http::async(req,
578                     [this, resolved_proxy, infoState](
579                                 const std::shared_ptr<restbed::Request>&,
580                                 const std::shared_ptr<restbed::Response>& reply)
581                 {
582                     auto code = reply->get_status_code();
583                     Json::Value proxyInfos;
584                     if (code == 200) {
585                         restbed::Http::fetch("\n", reply);
586                         auto& state = *infoState;
587                         if (state.cancel) return;
588                         std::string body;
589                         reply->get_body(body);
590 
591                         std::string err;
592                         Json::CharReaderBuilder rbuilder;
593                         auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
594                         try {
595                             reader->parse(body.data(), body.data() + body.size(), &proxyInfos, &err);
596                         } catch (...) {
597                             return;
598                         }
599                         auto family = resolved_proxy.getFamily();
600                         if      (family == AF_INET)  state.ipv4++;
601                         else if (family == AF_INET6) state.ipv6++;
602                         if (not state.cancel)
603                             onProxyInfos(proxyInfos, family);
604                     }
605                 }));
606             }
607             for (auto& r : reqs)
608                 r.get();
609             reqs.clear();
610         } catch (const std::exception& e) {
611             DHT_LOG.e("Error sending proxy info request: %s", e.what());
612         }
613         const auto& state = *infoState;
614         if (state.cancel) return;
615         if (state.ipv4 == 0) onProxyInfos(Json::Value{}, AF_INET);
616         if (state.ipv6 == 0) onProxyInfos(Json::Value{}, AF_INET6);
617     });
618 }
619 
620 void
onProxyInfos(const Json::Value & proxyInfos,sa_family_t family)621 DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, sa_family_t family)
622 {
623     if (isDestroying_)
624         return;
625     std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
626     auto oldStatus = std::max(statusIpv4_, statusIpv6_);
627     auto& status = family == AF_INET ? statusIpv4_ : statusIpv6_;
628     if (not proxyInfos.isMember("node_id")) {
629         DHT_LOG.e("Proxy info request failed for %s", family == AF_INET ? "IPv4" : "IPv6");
630         status = NodeStatus::Disconnected;
631     } else {
632         DHT_LOG.d("Got proxy reply for %s", family == AF_INET ? "IPv4" : "IPv6");
633         try {
634             myid = InfoHash(proxyInfos["node_id"].asString());
635             stats4_ = NodeStats(proxyInfos["ipv4"]);
636             stats6_ = NodeStats(proxyInfos["ipv6"]);
637             if (stats4_.good_nodes + stats6_.good_nodes)
638                 status = NodeStatus::Connected;
639             else if (stats4_.dubious_nodes + stats6_.dubious_nodes)
640                 status = NodeStatus::Connecting;
641             else
642                 status = NodeStatus::Disconnected;
643 
644             auto publicIp = parsePublicAddress(proxyInfos["public_ip"]);
645             auto publicFamily = publicIp.getFamily();
646             if (publicFamily == AF_INET)
647                 publicAddressV4_ = publicIp;
648             else if (publicFamily == AF_INET6)
649                 publicAddressV6_ = publicIp;
650         } catch (const std::exception& e) {
651             DHT_LOG.w("Error processing proxy infos: %s", e.what());
652         }
653     }
654 
655     auto newStatus = std::max(statusIpv4_, statusIpv6_);
656     if (newStatus == NodeStatus::Connected) {
657         if (oldStatus == NodeStatus::Disconnected || oldStatus == NodeStatus::Connecting) {
658             scheduler.edit(listenerRestart, scheduler.time());
659         }
660         scheduler.edit(nextProxyConfirmation, scheduler.time() + std::chrono::minutes(15));
661     }
662     else if (newStatus == NodeStatus::Disconnected) {
663         scheduler.edit(nextProxyConfirmation, scheduler.time() + std::chrono::minutes(1));
664     }
665     loopSignal_();
666 }
667 
668 SockAddr
parsePublicAddress(const Json::Value & val)669 DhtProxyClient::parsePublicAddress(const Json::Value& val)
670 {
671     auto public_ip = val.asString();
672     auto hostAndService = splitPort(public_ip);
673     auto sa = SockAddr::resolve(hostAndService.first);
674     if (sa.empty()) return {};
675     return sa.front().getMappedIPv4();
676 }
677 
678 std::vector<SockAddr>
getPublicAddress(sa_family_t family)679 DhtProxyClient::getPublicAddress(sa_family_t family)
680 {
681     std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
682     std::vector<SockAddr> result;
683     if (publicAddressV6_ && family != AF_INET) result.emplace_back(publicAddressV6_);
684     if (publicAddressV4_ && family != AF_INET6) result.emplace_back(publicAddressV4_);
685     return result;
686 }
687 
688 size_t
listen(const InfoHash & key,ValueCallback cb,Value::Filter filter,Where where)689 DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) {
690     DHT_LOG.d(key, "[search %s]: listen", key.to_c_str());
691     auto& search = searches_[key];
692     auto query = std::make_shared<Query>(Select{}, std::move(where));
693     auto token = search.ops.listen(cb, query, filter, [this, key](Sp<Query> /*q*/, ValueCallback cb, SyncCallback /*scb*/) -> size_t {
694         scheduler.syncTime();
695         restbed::Uri uri(serverHost_ + "/" + key.toString());
696         std::lock_guard<std::mutex> lock(searchLock_);
697         // Find search
698         auto search = searches_.find(key);
699         if (search == searches_.end()) {
700             DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str());
701             return 0;
702         }
703         DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe");
704 
705         // Add listener
706         auto req = std::make_shared<restbed::Request>(uri);
707         auto token = ++listenerToken_;
708         auto l = search->second.listeners.find(token);
709         if (l == search->second.listeners.end()) {
710             l = search->second.listeners.emplace(std::piecewise_construct,
711                     std::forward_as_tuple(token),
712                     std::forward_as_tuple(std::move(cb), req)).first;
713         } else {
714             if (l->second.state)
715                 l->second.state->cancel = true;
716             l->second.req = req;
717         }
718 
719         // Add callback
720         auto state = std::make_shared<ListenState>();
721         l->second.state = state;
722         l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) {
723             if (state->cancel)
724                 return false;
725             std::lock_guard<std::mutex> lock(searchLock_);
726             auto s = searches_.find(key);
727             if (s != searches_.end()) {
728                 auto l = s->second.listeners.find(token);
729                 if (l != s->second.listeners.end()) {
730                     return l->second.cache.onValue(values, expired);
731                 }
732             }
733             return false;
734         };
735 
736         auto vcb = l->second.cb;
737 
738         if (not deviceKey_.empty()) {
739             // Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason)
740             l->second.refreshJob = scheduler.add(scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN, [this, key, token, state] {
741                 if (state->cancel)
742                     return;
743                 std::lock_guard<std::mutex> lock(searchLock_);
744                 auto s = searches_.find(key);
745                 if (s != searches_.end()) {
746                     auto l = s->second.listeners.find(token);
747                     if (l != s->second.listeners.end()) {
748                         resubscribe(key, l->second);
749                     }
750                 }
751             });
752         }
753 
754         // Send listen to servers
755         l->second.thread = std::thread([this, req, vcb, state]() {
756             sendListen(req, vcb, state,
757                     deviceKey_.empty() ? ListenMethod::LISTEN : ListenMethod::SUBSCRIBE);
758         });
759         return token;
760     });
761     return token;
762 }
763 
764 bool
cancelListen(const InfoHash & key,size_t gtoken)765 DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) {
766     scheduler.syncTime();
767     DHT_LOG.d(key, "[search %s]: cancelListen %zu", key.to_c_str(), gtoken);
768     auto it = searches_.find(key);
769     if (it == searches_.end())
770         return false;
771     auto& ops = it->second.ops;
772     bool canceled = ops.cancelListen(gtoken, scheduler.time());
773     if (not it->second.opExpirationJob) {
774         it->second.opExpirationJob = scheduler.add(time_point::max(), [this,key](){
775             auto it = searches_.find(key);
776             if (it != searches_.end()) {
777                 auto next = it->second.ops.expire(scheduler.time(), [this,key](size_t ltoken){
778                     doCancelListen(key, ltoken);
779                 });
780                 if (next != time_point::max()) {
781                     scheduler.edit(it->second.opExpirationJob, next);
782                 }
783             }
784         });
785     }
786     scheduler.edit(it->second.opExpirationJob, ops.getExpiration());
787     loopSignal_();
788     return canceled;
789 }
790 
sendListen(const std::shared_ptr<restbed::Request> & req,const ValueCallback & cb,const Sp<ListenState> & state,ListenMethod method)791 void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req,
792                                 const ValueCallback &cb,
793                                 const Sp<ListenState> &state,
794                                 ListenMethod method) {
795     auto settings = std::make_shared<restbed::Settings>();
796     if (method != ListenMethod::LISTEN) {
797         req->set_method("SUBSCRIBE");
798     } else {
799         std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
800         settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
801         req->set_method("LISTEN");
802     }
803     try {
804 #ifdef OPENDHT_PUSH_NOTIFICATIONS
805         if (method != ListenMethod::LISTEN)
806         fillBody(req, method == ListenMethod::RESUBSCRIBE);
807     #endif
808         restbed::Http::async(req,
809               [this, cb, state](const std::shared_ptr<restbed::Request>& req,
810                                         const std::shared_ptr<restbed::Response>& reply)
811         {
812             auto code = reply->get_status_code();
813             if (code == 200) {
814                 try {
815                     while (restbed::Http::is_open(req) and not state->cancel) {
816                         restbed::Http::fetch("\n", reply);
817                         if (state->cancel)
818                             break;
819                         std::string body;
820                         reply->get_body(body);
821                         reply->set_body(""); // Reset the body for the next fetch
822 
823                         Json::Value json;
824                         std::string err;
825                         Json::CharReaderBuilder rbuilder;
826                         auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
827                         if (reader->parse(body.data(), body.data() + body.size(), &json, &err)) {
828                             if (json.size() == 0) {
829                                 // Empty value, it's the end
830                                 break;
831                             }
832                             auto expired = json.get("expired", Json::Value(false)).asBool();
833                             auto value = std::make_shared<Value>(json);
834                             if (cb) {
835                                 {
836                                     std::lock_guard<std::mutex> lock(lockCallbacks);
837                                     callbacks_.emplace_back([cb, value, state, expired]() {
838                                         if (not state->cancel and not cb({value}, expired))
839                                             state->cancel = true;
840                                     });
841                                 }
842                                 loopSignal_();
843                             }
844                         }
845                     }
846                 } catch (const std::exception& e) {
847                     if (not state->cancel) {
848                         DHT_LOG.w("Listen closed by the proxy server: %s", e.what());
849                         state->ok = false;
850                     }
851                 }
852             } else {
853                 state->ok = false;
854             }
855         }, settings).get();
856     } catch (const std::exception& e) {
857         state->ok = false;
858     }
859     auto& s = *state;
860     if (not s.ok and not s.cancel)
861         opFailed();
862 }
863 
864 bool
doCancelListen(const InfoHash & key,size_t ltoken)865 DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
866 {
867     std::lock_guard<std::mutex> lock(searchLock_);
868 
869     auto search = searches_.find(key);
870     if (search == searches_.end())
871         return false;
872 
873     auto it = search->second.listeners.find(ltoken);
874     if (it == search->second.listeners.end())
875         return false;
876 
877     DHT_LOG.d(key, "[search %s] cancel listen", key.to_c_str());
878 
879     auto& listener = it->second;
880     listener.state->cancel = true;
881     if (not deviceKey_.empty()) {
882         // First, be sure to have a token
883         if (listener.thread.joinable()) {
884             listener.thread.join();
885         }
886         // UNSUBSCRIBE
887         restbed::Uri uri(serverHost_ + "/" + key.toString());
888         auto req = std::make_shared<restbed::Request>(uri);
889         req->set_method("UNSUBSCRIBE");
890         // fill request body
891         Json::Value body;
892         body["key"] = deviceKey_;
893         body["client_id"] = pushClientId_;
894         Json::StreamWriterBuilder wbuilder;
895         wbuilder["commentStyle"] = "None";
896         wbuilder["indentation"] = "";
897         auto content = Json::writeString(wbuilder, body) + "\n";
898         std::replace(content.begin(), content.end(), '\n', ' ');
899         req->set_body(content);
900         req->set_header("Content-Length", std::to_string(content.size()));
901         try {
902             restbed::Http::async(req, [](const std::shared_ptr<restbed::Request>&, const std::shared_ptr<restbed::Response>&){});
903         } catch (const std::exception& e) {
904             DHT_LOG.w(key, "[search %s] cancelListen: Http::async failed: %s", key.to_c_str(), e.what());
905         }
906     } else {
907         // Just stop the request
908         if (listener.thread.joinable()) {
909             // Close connection to stop listener
910             if (listener.req) {
911                 try {
912                     restbed::Http::close(listener.req);
913                 } catch (const std::exception& e) {
914                     DHT_LOG.w("Error closing socket: %s", e.what());
915                 }
916                 listener.req.reset();
917             }
918             listener.thread.join();
919         }
920     }
921     search->second.listeners.erase(it);
922     DHT_LOG.d(key, "[search %s] cancelListen: %zu listener remaining", key.to_c_str(), search->second.listeners.size());
923     if (search->second.listeners.empty()) {
924         searches_.erase(search);
925     }
926 
927     return true;
928 }
929 
930 void
opFailed()931 DhtProxyClient::opFailed()
932 {
933     DHT_LOG.e("Proxy request failed");
934     {
935         std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
936         statusIpv4_ = NodeStatus::Disconnected;
937         statusIpv6_ = NodeStatus::Disconnected;
938     }
939     getConnectivityStatus();
940     loopSignal_();
941 }
942 
943 void
getConnectivityStatus()944 DhtProxyClient::getConnectivityStatus()
945 {
946     if (!isDestroying_) getProxyInfos();
947 }
948 
949 void
restartListeners()950 DhtProxyClient::restartListeners()
951 {
952     if (isDestroying_) return;
953     std::lock_guard<std::mutex> lock(searchLock_);
954     DHT_LOG.d("Refresh permanent puts");
955     for (auto& search : searches_) {
956         for (auto& put : search.second.puts) {
957             if (!*put.second.ok) {
958                 auto ok = put.second.ok;
959                 doPut(search.first, put.second.value,
960                 [ok](bool result, const std::vector<std::shared_ptr<dht::Node> >&){
961                     *ok = result;
962                 }, time_point::max(), true);
963                 scheduler.edit(put.second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
964             }
965         }
966     }
967     if (not deviceKey_.empty()) {
968         DHT_LOG.d("resubscribe due to a connectivity change");
969         // Connectivity changed, refresh all subscribe
970         for (auto& search : searches_)
971             for (auto& listener : search.second.listeners)
972                 if (!listener.second.state->ok)
973                     resubscribe(search.first, listener.second);
974         return;
975     }
976     DHT_LOG.d("Restarting listeners");
977     for (auto& search: searches_) {
978         for (auto& l: search.second.listeners) {
979             auto& listener = l.second;
980             if (auto state = listener.state)
981                 state->cancel = true;
982             if (listener.req) {
983                 try {
984                     restbed::Http::close(listener.req);
985                 } catch (const std::exception& e) {
986                     DHT_LOG.w("Error closing socket: %s", e.what());
987                 }
988                 listener.req.reset();
989             }
990         }
991     }
992     for (auto& search: searches_) {
993         for (auto& l: search.second.listeners) {
994             auto& listener = l.second;
995             auto state = listener.state;
996             if (listener.thread.joinable()) {
997                 listener.thread.join();
998             }
999             // Redo listen
1000             state->cancel = false;
1001             state->ok = true;
1002             auto cb = listener.cb;
1003             restbed::Uri uri(serverHost_ + "/" + search.first.toString());
1004             auto req = std::make_shared<restbed::Request>(uri);
1005             req->set_method("LISTEN");
1006             listener.req = req;
1007             listener.thread = std::thread([this, req, cb, state]() {
1008                 sendListen(req, cb, state);
1009             });
1010         }
1011     }
1012 }
1013 
1014 void
pushNotificationReceived(const std::map<std::string,std::string> & notification)1015 DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string>& notification)
1016 {
1017 #ifdef OPENDHT_PUSH_NOTIFICATIONS
1018     scheduler.syncTime();
1019     {
1020         // If a push notification is received, the proxy is up and running
1021         std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
1022         statusIpv4_ = NodeStatus::Connected;
1023         statusIpv6_ = NodeStatus::Connected;
1024     }
1025     try {
1026         std::lock_guard<std::mutex> lock(searchLock_);
1027         auto timeout = notification.find("timeout");
1028         if (timeout != notification.cend()) {
1029             InfoHash key(timeout->second);
1030             auto& search = searches_.at(key);
1031             auto vidIt = notification.find("vid");
1032             if (vidIt != notification.end()) {
1033                 // Refresh put
1034                 auto vid = std::stoull(vidIt->second);
1035                 auto& put = search.puts.at(vid);
1036                 scheduler.edit(put.refreshJob, scheduler.time());
1037                 loopSignal_();
1038             } else {
1039                 // Refresh listen
1040                 for (auto& list : search.listeners)
1041                     resubscribe(key, list.second);
1042             }
1043         } else {
1044             auto key = InfoHash(notification.at("key"));
1045             auto& search = searches_.at(key);
1046             for (auto& list : search.listeners) {
1047                 if (list.second.state->cancel)
1048                     continue;
1049                 DHT_LOG.d(key, "[search %s] handling push notification", key.to_c_str());
1050                 auto expired = notification.find("exp");
1051                 auto token = list.first;
1052                 auto state = list.second.state;
1053                 if (expired == notification.end()) {
1054                     auto cb = list.second.cb;
1055                     auto oldValues = list.second.cache.getValues();
1056                     get(key, [cb](const std::vector<Sp<Value>>& vals) {
1057                         return cb(vals, false);
1058                     }, [cb, oldValues](bool /*ok*/) {
1059                         // Decrement old values refcount to expire values not present in the new list
1060                         cb(oldValues, true);
1061                     });
1062                 } else {
1063                     std::stringstream ss(expired->second);
1064                     std::vector<Value::Id> ids;
1065                     while(ss.good()){
1066                         std::string substr;
1067                         getline(ss, substr, ',');
1068                         ids.emplace_back(std::stoull(substr));
1069                     }
1070                     {
1071                         std::lock_guard<std::mutex> lock(lockCallbacks);
1072                         callbacks_.emplace_back([this, key, token, state, ids]() {
1073                             if (state->cancel) return;
1074                             std::lock_guard<std::mutex> lock(searchLock_);
1075                             auto s = searches_.find(key);
1076                             if (s == searches_.end()) return;
1077                             auto l = s->second.listeners.find(token);
1078                             if (l == s->second.listeners.end()) return;
1079                             if (not state->cancel and not l->second.cache.onValuesExpired(ids))
1080                                 state->cancel = true;
1081                         });
1082                     }
1083                     loopSignal_();
1084                 }
1085             }
1086         }
1087     } catch (const std::exception& e) {
1088         DHT_LOG.e("Error handling push notification: %s", e.what());
1089     }
1090 #else
1091     (void) notification;
1092 #endif
1093 }
1094 
1095 void
resubscribe(const InfoHash & key,Listener & listener)1096 DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
1097 {
1098 #ifdef OPENDHT_PUSH_NOTIFICATIONS
1099     if (deviceKey_.empty()) return;
1100     scheduler.syncTime();
1101     DHT_LOG.d(key, "[search %s] resubscribe push listener", key.to_c_str());
1102     // Subscribe
1103     auto state = listener.state;
1104     if (listener.thread.joinable()) {
1105         state->cancel = true;
1106         if (listener.req) {
1107             try {
1108                 restbed::Http::close(listener.req);
1109             } catch (const std::exception& e) {
1110                 DHT_LOG.w("Error closing socket: %s", e.what());
1111             }
1112             listener.req.reset();
1113         }
1114         listener.thread.join();
1115     }
1116     state->cancel = false;
1117     state->ok = true;
1118     auto req = std::make_shared<restbed::Request>(restbed::Uri {serverHost_ + "/" + key.toString()});
1119     req->set_method("SUBSCRIBE");
1120     listener.req = req;
1121     scheduler.edit(listener.refreshJob, scheduler.time() + proxy::OP_TIMEOUT - proxy::OP_MARGIN);
1122     auto vcb = listener.cb;
1123     listener.thread = std::thread([this, req, vcb, state]() {
1124         sendListen(req, vcb, state, ListenMethod::RESUBSCRIBE);
1125     });
1126 #else
1127     (void) key;
1128     (void) listener;
1129 #endif
1130 }
1131 
1132 #ifdef OPENDHT_PUSH_NOTIFICATIONS
1133 void
getPushRequest(Json::Value & body) const1134 DhtProxyClient::getPushRequest(Json::Value& body) const
1135 {
1136     body["key"] = deviceKey_;
1137     body["client_id"] = pushClientId_;
1138 #ifdef __ANDROID__
1139     body["platform"] = "android";
1140 #endif
1141 #ifdef __APPLE__
1142     body["platform"] = "apple";
1143 #endif
1144 }
1145 
1146 void
fillBody(std::shared_ptr<restbed::Request> req,bool resubscribe)1147 DhtProxyClient::fillBody(std::shared_ptr<restbed::Request> req, bool resubscribe)
1148 {
1149     // Fill body with
1150     // {
1151     //   "key":"device_key",
1152     // }
1153     Json::Value body;
1154     getPushRequest(body);
1155     if (resubscribe) {
1156         // This is the first listen, we want to retrieve previous values.
1157         body["refresh"] = true;
1158     }
1159     Json::StreamWriterBuilder wbuilder;
1160     wbuilder["commentStyle"] = "None";
1161     wbuilder["indentation"] = "";
1162     auto content = Json::writeString(wbuilder, body) + "\n";
1163     std::replace(content.begin(), content.end(), '\n', ' ');
1164     req->set_body(content);
1165     req->set_header("Content-Length", std::to_string(content.size()));
1166 }
1167 #endif // OPENDHT_PUSH_NOTIFICATIONS
1168 
1169 } // namespace dht
1170