1 /*
2  *  Copyright (C) 2014-2019 Savoir-faire Linux Inc.
3  *  Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  *              Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 
21 #include "dht.h"
22 #include "rng.h"
23 #include "search.h"
24 #include "storage.h"
25 #include "request.h"
26 
27 #include <msgpack.hpp>
28 
29 #include <algorithm>
30 #include <random>
31 #include <sstream>
32 #include <fstream>
33 
34 namespace dht {
35 
36 using namespace std::placeholders;
37 
38 constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
39 constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME;
40 constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME;
41 constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN;
42 
43 NodeStatus
getStatus(sa_family_t af) const44 Dht::getStatus(sa_family_t af) const
45 {
46     const auto& stats = getNodesStats(af);
47     if (stats.good_nodes)
48         return NodeStatus::Connected;
49     auto& ping = af == AF_INET ? pending_pings4 : pending_pings6;
50     if (ping or stats.getKnownNodes())
51         return NodeStatus::Connecting;
52     return NodeStatus::Disconnected;
53 }
54 
55 void
shutdown(ShutdownCallback cb)56 Dht::shutdown(ShutdownCallback cb)
57 {
58     if (not persistPath.empty())
59         saveState(persistPath);
60 
61     if (not maintain_storage) {
62         if (cb) cb();
63         return;
64     }
65 
66     // Last store maintenance
67     scheduler.syncTime();
68     auto remaining = std::make_shared<int>(0);
69     auto str_donecb = [=](bool, const std::vector<Sp<Node>>&) {
70         --*remaining;
71         DHT_LOG.w("shuting down node: %u ops remaining", *remaining);
72         if (!*remaining && cb) { cb(); }
73     };
74 
75     for (auto& str : store)
76         *remaining += maintainStorage(str, true, str_donecb);
77 
78     if (!*remaining) {
79         DHT_LOG.w("shuting down node: %u ops remaining", *remaining);
80         if (cb)
81             cb();
82     }
83 }
84 
85 bool
isRunning(sa_family_t af) const86 Dht::isRunning(sa_family_t af) const { return network_engine.isRunning(af); }
87 
88 /* Every bucket contains an unordered list of nodes. */
89 const Sp<Node>
findNode(const InfoHash & id,sa_family_t af) const90 Dht::findNode(const InfoHash& id, sa_family_t af) const
91 {
92     if (const Bucket* b = findBucket(id, af))
93         for (const auto& n : b->nodes)
94             if (n->id == id) return n;
95     return {};
96 }
97 
98 /* Every bucket caches the address of a likely node.  Ping it. */
99 void
sendCachedPing(Bucket & b)100 Dht::sendCachedPing(Bucket& b)
101 {
102     if (b.cached)
103         DHT_LOG.d(b.cached->id, "[node %s] sending ping to cached node", b.cached->toString().c_str());
104     b.sendCachedPing(network_engine);
105 }
106 
107 std::vector<SockAddr>
getPublicAddress(sa_family_t family)108 Dht::getPublicAddress(sa_family_t family)
109 {
110     std::sort(reported_addr.begin(), reported_addr.end(), [](const ReportedAddr& a, const ReportedAddr& b) {
111         return a.first > b.first;
112     });
113     std::vector<SockAddr> ret;
114     ret.reserve(!family ? reported_addr.size() : reported_addr.size()/2);
115     for (const auto& addr : reported_addr)
116         if (!family || family == addr.second.getFamily())
117             ret.emplace_back(addr.second);
118     return ret;
119 }
120 
121 bool
trySearchInsert(const Sp<Node> & node)122 Dht::trySearchInsert(const Sp<Node>& node)
123 {
124     const auto& now = scheduler.time();
125     if (not node) return false;
126 
127     auto& srs = searches(node->getFamily());
128     auto closest = srs.lower_bound(node->id);
129     bool inserted {false};
130 
131     // insert forward
132     for (auto it = closest; it != srs.end(); it++) {
133         auto& s = *it->second;
134         if (s.insertNode(node, now)) {
135             inserted = true;
136             scheduler.edit(s.nextSearchStep, now);
137         } else if (not s.expired and not s.done)
138             break;
139     }
140     // insert backward
141     for (auto it = closest; it != srs.begin();) {
142         --it;
143         auto& s = *it->second;
144         if (s.insertNode(node, now)) {
145             inserted = true;
146             scheduler.edit(s.nextSearchStep, now);
147         } else if (not s.expired and not s.done)
148             break;
149     }
150     return inserted;
151 }
152 
153 void
reportedAddr(const SockAddr & addr)154 Dht::reportedAddr(const SockAddr& addr)
155 {
156     auto it = std::find_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& a){
157         return a.second == addr;
158     });
159     if (it == reported_addr.end()) {
160         if (reported_addr.size() < 32)
161             reported_addr.emplace_back(1, addr);
162     } else
163         it->first++;
164 }
165 
166 /* We just learnt about a node, not necessarily a new one.  Confirm is 1 if
167    the node sent a message, 2 if it sent us a reply. */
168 void
onNewNode(const Sp<Node> & node,int confirm)169 Dht::onNewNode(const Sp<Node>& node, int confirm)
170 {
171     const auto& now = scheduler.time();
172     auto& b = buckets(node->getFamily());
173     auto wasEmpty = confirm < 2 && b.grow_time < now - std::chrono::minutes(5);
174     if (b.onNewNode(node, confirm, now, myid, network_engine) or confirm) {
175         trySearchInsert(node);
176         if (wasEmpty) {
177             scheduler.edit(nextNodesConfirmation, now + std::chrono::seconds(1));
178         }
179     }
180 }
181 
182 /* Called periodically to purge known-bad nodes.  Note that we're very
183    conservative here: broken nodes in the table don't do much harm, we'll
184    recover as soon as we find better ones. */
185 void
expireBuckets(RoutingTable & list)186 Dht::expireBuckets(RoutingTable& list)
187 {
188     for (auto& b : list) {
189         bool changed = false;
190         b.nodes.remove_if([&changed](const Sp<Node>& n) {
191             if (n->isExpired()) {
192                 changed = true;
193                 return true;
194             }
195             return false;
196         });
197         if (changed)
198             sendCachedPing(b);
199     }
200 }
201 
202 void
expireSearches()203 Dht::expireSearches()
204 {
205     auto t = scheduler.time() - SEARCH_EXPIRE_TIME;
206     auto expired = [&](std::pair<const InfoHash, Sp<Search>>& srp) {
207         auto& sr = *srp.second;
208         auto b = sr.callbacks.empty() && sr.announce.empty() && sr.listeners.empty() && sr.step_time < t;
209         if (b) {
210             DHT_LOG.d(srp.first, "[search %s] removing search", srp.first.toString().c_str());
211             sr.clear();
212             return b;
213         } else { return false; }
214     };
215     erase_if(searches4, expired);
216     erase_if(searches6, expired);
217 }
218 
219 void
searchNodeGetDone(const net::Request & req,net::RequestAnswer && answer,std::weak_ptr<Search> ws,Sp<Query> query)220 Dht::searchNodeGetDone(const net::Request& req,
221         net::RequestAnswer&& answer,
222         std::weak_ptr<Search> ws,
223         Sp<Query> query)
224 {
225     const auto& now = scheduler.time();
226     if (auto sr = ws.lock()) {
227         sr->insertNode(req.node, now, answer.ntoken);
228         if (auto srn = sr->getNode(req.node)) {
229             /* all other get requests which are satisfied by this answer
230                should not be sent anymore */
231             for (auto& g : sr->callbacks) {
232                 auto& q = g.second.query;
233                 if (q->isSatisfiedBy(*query) and q != query) {
234                     auto dummy_req = std::make_shared<net::Request>();
235                     dummy_req->cancel();
236                     srn->getStatus[q] = std::move(dummy_req);
237                 }
238             }
239             auto syncTime = srn->getSyncTime(scheduler.time());
240             if (srn->syncJob)
241                 scheduler.edit(srn->syncJob, syncTime);
242             else
243                 srn->syncJob = scheduler.add(syncTime, std::bind(&Dht::searchStep, this, sr));
244         }
245         onGetValuesDone(req.node, answer, sr, query);
246     }
247 }
248 
249 void
searchNodeGetExpired(const net::Request & status,bool over,std::weak_ptr<Search> ws,Sp<Query> query)250 Dht::searchNodeGetExpired(const net::Request& status,
251         bool over,
252         std::weak_ptr<Search> ws,
253         Sp<Query> query)
254 {
255     if (auto sr = ws.lock()) {
256         if (auto srn = sr->getNode(status.node)) {
257             srn->candidate = not over;
258             if (over)
259                 srn->getStatus.erase(query);
260         }
261         scheduler.edit(sr->nextSearchStep, scheduler.time());
262     }
263 }
264 
paginate(std::weak_ptr<Search> ws,Sp<Query> query,SearchNode * n)265 void Dht::paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n) {
266     auto sr = ws.lock();
267     if (not sr) return;
268     auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {});
269     auto onSelectDone = [this,ws,query](const net::Request& status,
270                                         net::RequestAnswer&& answer) mutable {
271         // Retrieve search
272         auto sr = ws.lock();
273         if (not sr) return;
274         const auto& id = sr->id;
275         // Retrieve search node
276         auto sn = sr->getNode(status.node);
277         if (not sn) return;
278         // backward compatibility
279         if (answer.fields.empty()) {
280             searchNodeGetDone(status, std::move(answer), ws, query);
281             return;
282         }
283         for (const auto& fvi : answer.fields) {
284             try {
285                 auto vid = fvi->index.at(Value::Field::Id).getInt();
286                 if (vid == Value::INVALID_ID) continue;
287                 auto query_for_vid = std::make_shared<Query>(Select {}, Where {}.id(vid));
288                 sn->pagination_queries[query].push_back(query_for_vid);
289                 DHT_LOG.d(id, sn->node->id, "[search %s] [node %s] sending %s",
290                         id.toString().c_str(), sn->node->toString().c_str(), query_for_vid->toString().c_str());
291                 sn->getStatus[query_for_vid] = network_engine.sendGetValues(status.node,
292                         id,
293                         *query_for_vid,
294                         -1,
295                         std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
296                         std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query_for_vid)
297                         );
298             } catch (const std::out_of_range&) {
299                 DHT_LOG.e(id, sn->node->id, "[search %s] [node %s] received non-id field in response to "\
300                         "'SELECT id' request...",
301                         id.toString().c_str(), sn->node->toString().c_str());
302             }
303         }
304     };
305     /* add pagination query key for tracking ongoing requests. */
306     n->pagination_queries[query].push_back(select_q);
307 
308     DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending %s",
309             sr->id.toString().c_str(), n->node->toString().c_str(), select_q->toString().c_str());
310     n->getStatus[select_q] = network_engine.sendGetValues(n->node,
311             sr->id,
312             *select_q,
313             -1,
314             onSelectDone,
315             std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, select_q)
316             );
317 }
318 
319 Dht::SearchNode*
searchSendGetValues(Sp<Search> sr,SearchNode * pn,bool update)320 Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update)
321 {
322     if (sr->done or sr->currentlySolicitedNodeCount() >= MAX_REQUESTED_SEARCH_NODES)
323         return nullptr;
324 
325     const auto& now = scheduler.time();
326 
327     std::weak_ptr<Search> ws = sr;
328     auto cb = sr->callbacks.begin();
329     static const auto ANY_QUERY = std::make_shared<Query>(Select {}, Where {}, true);
330     do { /* for all requests to send */
331         SearchNode* n = nullptr;
332         auto& query = sr->callbacks.empty() ? ANY_QUERY : cb->second.query;
333         const time_point up = (not sr->callbacks.empty() and update)
334                                 ? sr->getLastGetTime(*query)
335                                 : time_point::min();
336 
337         if (pn and pn->canGet(now, up, query)) {
338             n = pn;
339         } else {
340             for (auto& sn : sr->nodes) {
341                 if (sn.canGet(now, up, query)) {
342                     n = &sn;
343                     break;
344                 }
345             }
346         }
347 
348         if (sr->callbacks.empty()) { /* 'find_node' request */
349             if (not n)
350                 return nullptr;
351 
352             /*DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'",
353                     sr->id.toString().c_str(), n->node->toString().c_str());*/
354             n->getStatus[query] = network_engine.sendFindNode(n->node,
355                     sr->id,
356                     -1,
357                     std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
358                     std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
359 
360         } else { /* 'get' request */
361             if (not n)
362                 continue;
363 
364             if (query and not query->select.getSelection().empty()) {
365                 /* The request contains a select. No need to paginate... */
366                 /*DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending 'get'",
367                         sr->id.toString().c_str(), n->node->toString().c_str());*/
368                 n->getStatus[query] = network_engine.sendGetValues(n->node,
369                         sr->id,
370                         *query,
371                         -1,
372                         std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
373                         std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
374             } else
375                 paginate(ws, query, n);
376         }
377 
378         /* We only try to send one request. return. */
379         return n;
380 
381     } while (++cb != sr->callbacks.end());
382 
383     /* no request were sent */
384     return nullptr;
385 }
386 
searchSendAnnounceValue(const Sp<Search> & sr)387 void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
388     if (sr->announce.empty())
389         return;
390     unsigned i = 0;
391     std::weak_ptr<Search> ws = sr;
392 
393     auto onDone = [this,ws](const net::Request& req, net::RequestAnswer&& answer)
394     { /* when put done */
395         if (auto sr = ws.lock()) {
396             onAnnounceDone(req.node, answer, sr);
397             searchStep(sr);
398         }
399     };
400 
401     auto onExpired = [this,ws](const net::Request&, bool over)
402     { /* when put expired */
403         if (over)
404             if (auto sr = ws.lock())
405                 scheduler.edit(sr->nextSearchStep, scheduler.time());
406     };
407 
408     auto onSelectDone =
409     [this,ws,onDone,onExpired](const net::Request& req, net::RequestAnswer&& answer) mutable
410     { /* on probing done */
411         auto sr = ws.lock();
412         if (not sr) return;
413         const auto& now = scheduler.time();
414         sr->insertNode(req.node, scheduler.time(), answer.ntoken);
415         auto sn = sr->getNode(req.node);
416         if (not sn) return;
417 
418         if (not sn->isSynced(now)) {
419             /* Search is now unsynced. Let's call searchStep to sync again. */
420             scheduler.edit(sr->nextSearchStep, now);
421             return;
422         }
423         for (auto& a : sr->announce) {
424             if (sn->getAnnounceTime(a.value->id) > now)
425                 continue;
426             bool hasValue {false};
427             uint16_t seq_no = 0;
428             try {
429                 const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(),
430                         [&a](const Sp<FieldValueIndex>& i){
431                             return i->index.at(Value::Field::Id).getInt() == a.value->id;
432                         });
433                 if (f != answer.fields.cend() and *f) {
434                     hasValue = true;
435                     seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt());
436                 }
437             } catch (std::out_of_range&) { }
438 
439             auto next_refresh_time = now + getType(a.value->type).expiration;
440             /* only put the value if the node doesn't already have it */
441             if (not hasValue or seq_no < a.value->seq) {
442                 DHT_LOG.d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)",
443                         sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
444                 auto created = a.permanent ? time_point::max() : a.created;
445                 sn->acked[a.value->id] = {
446                     network_engine.sendAnnounceValue(sn->node, sr->id, a.value, created, sn->token, onDone, onExpired),
447                     next_refresh_time
448                 };
449             } else if (hasValue and a.permanent) {
450                 DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)",
451                         sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
452                 sn->acked[a.value->id] = {
453                     network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone, onExpired),
454                     next_refresh_time
455                 };
456             } else {
457                 DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.",
458                         sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
459                 auto ack_req = std::make_shared<net::Request>(net::Request::State::COMPLETED);
460                 ack_req->reply_time = now;
461                 sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time);
462 
463                 /* step to clear announces */
464                 scheduler.edit(sr->nextSearchStep, now);
465             }
466             if (a.permanent) {
467                 scheduler.add(next_refresh_time - REANNOUNCE_MARGIN, [this,ws] {
468                     if (auto sr = ws.lock()) {
469                         searchStep(sr);
470                     }
471                 });
472             }
473         }
474     };
475 
476     Sp<Query> probe_query {};
477     const auto& now = scheduler.time();
478     for (auto& n : sr->nodes) {
479         if (not n.isSynced(now))
480             continue;
481 
482         const auto& gs = n.probe_query ? n.getStatus.find(n.probe_query) : n.getStatus.cend();
483         if (gs != n.getStatus.cend() and gs->second and gs->second->pending()) {
484             continue;
485         }
486 
487         bool sendQuery = false;
488         for (auto& a : sr->announce) {
489             if (n.getAnnounceTime(a.value->id) <= now) {
490                 if (a.permanent) {
491                     sendQuery = true;
492                 } else {
493                     DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending 'put' (vid: %d)",
494                             sr->id.toString().c_str(), n.node->toString().c_str(), a.value->id);
495                     n.acked[a.value->id] = {
496                         network_engine.sendAnnounceValue(n.node, sr->id, a.value, a.created, n.token, onDone, onExpired),
497                         now + getType(a.value->type).expiration
498                     };
499                 }
500             }
501         }
502 
503         if (sendQuery) {
504             if (not probe_query)
505                 probe_query = std::make_shared<Query>(Select {}.field(Value::Field::Id).field(Value::Field::SeqNum));
506             DHT_LOG.d(sr->id, n.node->id, "[search %s] [node %s] sending %s",
507                     sr->id.toString().c_str(), n.node->toString().c_str(), probe_query->toString().c_str());
508             n.probe_query = probe_query;
509             n.getStatus[probe_query] = network_engine.sendGetValues(n.node,
510                     sr->id,
511                     *probe_query,
512                     -1,
513                     onSelectDone,
514                     std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, probe_query));
515         }
516         if (not n.candidate and ++i == TARGET_NODES)
517             break;
518     }
519 }
520 
521 void
searchSynchedNodeListen(const Sp<Search> & sr,SearchNode & n)522 Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n)
523 {
524     std::weak_ptr<Search> ws = sr;
525     for (const auto& l : sr->listeners) {
526         const auto& query = l.second.query;
527         auto list_token = l.first;
528         if (n.getListenTime(query) > scheduler.time())
529             continue;
530         // DHT_LOG.d(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'",
531         //        sr->id.toString().c_str(), n.node->toString().c_str());
532 
533         auto r = n.listenStatus.find(query);
534         if (r == n.listenStatus.end()) {
535             r = n.listenStatus.emplace(query, SearchNode::CachedListenStatus{
536                 [ws,list_token](const std::vector<Sp<Value>>& values, bool expired){
537                     if (auto sr = ws.lock()) {
538                         auto l = sr->listeners.find(list_token);
539                         if (l != sr->listeners.end()) {
540                             l->second.get_cb(values, expired);
541                         }
542                     }
543                 }, [ws,list_token] (ListenSyncStatus status) {
544                     if (auto sr = ws.lock()) {
545                         auto l = sr->listeners.find(list_token);
546                         if (l != sr->listeners.end()) {
547                             l->second.sync_cb(status);
548                         }
549                     }
550                 }
551             }).first;
552             auto node = n.node;
553             r->second.cacheExpirationJob = scheduler.add(time_point::max(), [this,ws,query,node]{
554                 if (auto sr = ws.lock()) {
555                     if (auto sn = sr->getNode(node)) {
556                         sn->expireValues(query, scheduler);
557                     }
558                 }
559             });
560         }
561         auto prev_req = r != n.listenStatus.end() ? r->second.req : nullptr;
562         auto new_req = network_engine.sendListen(n.node, sr->id, *query, n.token, prev_req,
563             [this,ws,query](const net::Request& req, net::RequestAnswer&& answer) mutable
564             { /* on done */
565                 if (auto sr = ws.lock()) {
566                     scheduler.edit(sr->nextSearchStep, scheduler.time());
567                     if (auto sn = sr->getNode(req.node)) {
568                         scheduler.add(sn->getListenTime(query), std::bind(&Dht::searchStep, this, sr));
569                         sn->onListenSynced(query);
570                     }
571                     onListenDone(req.node, answer, sr);
572                 }
573             },
574             [this,ws,query](const net::Request& req, bool over) mutable
575             { /* on request expired */
576                 if (auto sr = ws.lock()) {
577                     scheduler.edit(sr->nextSearchStep, scheduler.time());
578                     if (over)
579                         if (auto sn = sr->getNode(req.node))
580                             sn->listenStatus.erase(query);
581                 }
582             },
583             [this,ws,query](const Sp<Node>& node, net::RequestAnswer&& answer) mutable
584             { /* on new values */
585                 if (auto sr = ws.lock()) {
586                     scheduler.edit(sr->nextSearchStep, scheduler.time());
587                     sr->insertNode(node, scheduler.time(), answer.ntoken);
588                     if (auto sn = sr->getNode(node)) {
589                         sn->onValues(query, std::move(answer), types, scheduler);
590                     }
591                 }
592             }
593         );
594         // Here the request may have failed and the CachedListenStatus removed
595         r = n.listenStatus.find(query);
596         if (r != n.listenStatus.end()) {
597             r->second.req = new_req;
598         }
599     }
600 }
601 
602 /* When a search is in progress, we periodically call search_step to send
603    further requests. */
604 void
searchStep(Sp<Search> sr)605 Dht::searchStep(Sp<Search> sr)
606 {
607     if (not sr or sr->expired or sr->done) return;
608 
609     const auto& now = scheduler.time();
610     /*if (auto req_count = sr->currentlySolicitedNodeCount())
611         DHT_LOG.d(sr->id, "[search %s IPv%c] step (%d requests)",
612                 sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', req_count);*/
613     sr->step_time = now;
614 
615     if (sr->refill_time + Node::NODE_EXPIRE_TIME < now and sr->nodes.size()-sr->getNumberOfBadNodes() < SEARCH_NODES)
616         refill(*sr);
617 
618     /* Check if the first TARGET_NODES (8) live nodes have replied. */
619     if (sr->isSynced(now)) {
620         if (not (sr->callbacks.empty() and sr->announce.empty())) {
621             // search is synced but some (newer) get operations are not complete
622             // Call callbacks when done
623             std::vector<Get> completed_gets;
624             for (auto b = sr->callbacks.begin(); b != sr->callbacks.end();) {
625                 if (sr->isDone(b->second)) {
626                     sr->setDone(b->second);
627                     completed_gets.emplace_back(std::move(b->second));
628                     b = sr->callbacks.erase(b);
629                 }
630                 else
631                     ++b;
632             }
633             // clear corresponding queries
634             for (const auto& get : completed_gets)
635                 for (auto& sn : sr->nodes) {
636                     sn.getStatus.erase(get.query);
637                     sn.pagination_queries.erase(get.query);
638                 }
639 
640             /* clearing callbacks for announced values */
641             sr->checkAnnounced();
642 
643             if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
644                 sr->setDone();
645         }
646 
647         // true if this node is part of the target nodes cluter.
648         /*bool in = sr->id.xorCmp(myid, sr->nodes.back().node->id) < 0;
649 
650         DHT_LOG_DBG("[search %s IPv%c] synced%s",
651                 sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', in ? ", in" : "");*/
652 
653         if (not sr->listeners.empty()) {
654             unsigned i = 0;
655             for (auto& n : sr->nodes) {
656                 if (not n.isSynced(now))
657                     continue;
658                 searchSynchedNodeListen(sr, n);
659                 if (not n.candidate and ++i == LISTEN_NODES)
660                     break;
661             }
662         }
663 
664         // Announce requests
665         searchSendAnnounceValue(sr);
666 
667         if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
668             sr->setDone();
669     }
670 
671     while (sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES and searchSendGetValues(sr));
672 
673     if (sr->getNumberOfConsecutiveBadNodes() >= std::min(sr->nodes.size(),
674                                                              static_cast<size_t>(SEARCH_MAX_BAD_NODES)))
675     {
676         DHT_LOG.w(sr->id, "[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
677         sr->expire();
678         connectivityChanged(sr->af);
679     }
680 
681     /* dumpSearch(*sr, std::cout); */
682 }
683 
refill(Dht::Search & sr)684 unsigned Dht::refill(Dht::Search& sr) {
685     const auto& now = scheduler.time();
686     sr.refill_time = now;
687     /* we search for up to SEARCH_NODES good nodes. */
688     auto cached_nodes = network_engine.getCachedNodes(sr.id, sr.af, SEARCH_NODES);
689 
690     if (cached_nodes.empty()) {
691         DHT_LOG.e(sr.id, "[search %s IPv%c] no nodes from cache while refilling search",
692                 sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6');
693         return 0;
694     }
695 
696     unsigned inserted = 0;
697     for (auto& i : cached_nodes) {
698         /* try to insert the nodes. Search::insertNode will know how many to insert. */
699         if (sr.insertNode(i, now))
700             ++inserted;
701     }
702     DHT_LOG.d(sr.id, "[search %s IPv%c] refilled search with %u nodes from node cache",
703             sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', inserted);
704     return inserted;
705 }
706 
707 
708 /* Start a search. */
709 Sp<Dht::Search>
search(const InfoHash & id,sa_family_t af,GetCallback gcb,QueryCallback qcb,DoneCallback dcb,Value::Filter f,const Sp<Query> & q)710 Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback qcb, DoneCallback dcb, Value::Filter f, const Sp<Query>& q)
711 {
712     if (!isRunning(af)) {
713         DHT_LOG.e(id, "[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
714         if (dcb)
715             dcb(false, {});
716         return {};
717     }
718 
719     auto& srs = searches(af);
720     const auto& srp = srs.find(id);
721     Sp<Search> sr {};
722 
723     if (srp != srs.end()) {
724         sr = srp->second;
725         sr->done = false;
726         sr->expired = false;
727     } else {
728         if (searches4.size() + searches6.size() < MAX_SEARCHES) {
729             sr = std::make_shared<Search>();
730             srs.emplace(id, sr);
731         } else {
732             for (auto it = srs.begin(); it!=srs.end();) {
733                 auto& s = *it->second;
734                 if ((s.done or s.expired) and s.announce.empty() and s.listeners.empty()) {
735                     sr = it->second;
736                     break;
737                 }
738             }
739             if (not sr) {
740                 DHT_LOG.e(id, "[search %s IPv%c] maximum number of searches reached !", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
741                 return {};
742             }
743         }
744         sr->af = af;
745         sr->tid = search_id++;
746         sr->step_time = time_point::min();
747         sr->id = id;
748         sr->done = false;
749         sr->expired = false;
750         sr->nodes.clear();
751         sr->nodes.reserve(SEARCH_NODES+1);
752         sr->nextSearchStep = scheduler.add(time_point::max(), std::bind(&Dht::searchStep, this, sr));
753         DHT_LOG.w(id, "[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
754         if (search_id == 0)
755             search_id++;
756     }
757 
758     sr->get(f, q, qcb, gcb, dcb, scheduler);
759     refill(*sr);
760 
761     return sr;
762 }
763 
764 void
announce(const InfoHash & id,sa_family_t af,Sp<Value> value,DoneCallback callback,time_point created,bool permanent)765 Dht::announce(const InfoHash& id,
766         sa_family_t af,
767         Sp<Value> value,
768         DoneCallback callback,
769         time_point created,
770         bool permanent)
771 {
772     auto& srs = searches(af);
773     auto srp = srs.find(id);
774     auto sr = srp == srs.end() ? search(id, af) : srp->second;
775     if (!sr) {
776         if (callback)
777             callback(false, {});
778         return;
779     }
780     sr->done = false;
781     sr->expired = false;
782     auto a_sr = std::find_if(sr->announce.begin(), sr->announce.end(), [&](const Announce& a){
783         return a.value->id == value->id;
784     });
785     if (a_sr == sr->announce.end()) {
786         sr->announce.emplace_back(Announce {permanent, value, created, callback});
787         for (auto& n : sr->nodes) {
788             n.probe_query.reset();
789             n.acked[value->id].first.reset();
790         }
791     } else {
792         a_sr->permanent = permanent;
793         a_sr->created = created;
794         if (a_sr->value != value) {
795             a_sr->value = value;
796             for (auto& n : sr->nodes) {
797                 n.acked[value->id].first.reset();
798                 n.probe_query.reset();
799             }
800         }
801         if (sr->isAnnounced(value->id)) {
802             if (a_sr->callback)
803                 a_sr->callback(true, {});
804             a_sr->callback = {};
805             if (callback)
806                 callback(true, {});
807             return;
808         } else {
809             if (a_sr->callback)
810                 a_sr->callback(false, {});
811             a_sr->callback = callback;
812         }
813     }
814     scheduler.edit(sr->nextSearchStep, scheduler.time());
815 }
816 
817 size_t
listenTo(const InfoHash & id,sa_family_t af,ValueCallback cb,Value::Filter f,const Sp<Query> & q)818 Dht::listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f, const Sp<Query>& q)
819 {
820     if (!isRunning(af))
821         return 0;
822        // DHT_LOG_ERR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(), (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now()));
823 
824     //DHT_LOG_WARN("listenTo %s", id.toString().c_str());
825     auto& srs = searches(af);
826     auto srp = srs.find(id);
827     Sp<Search> sr = (srp == srs.end()) ? search(id, af) : srp->second;
828     if (!sr)
829         throw DhtException("Can't create search");
830     DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
831     return sr->listen(cb, f, q, scheduler);
832 }
833 
834 size_t
listen(const InfoHash & id,ValueCallback cb,Value::Filter f,Where where)835 Dht::listen(const InfoHash& id, ValueCallback cb, Value::Filter f, Where where)
836 {
837     scheduler.syncTime();
838 
839     auto token = ++listener_token;
840     auto gcb = OpValueCache::cacheCallback(std::move(cb), [this, id, token]{
841         cancelListen(id, token);
842     });
843 
844     auto query = std::make_shared<Query>(Select{}, std::move(where));
845     auto filter = f.chain(query->where.getFilter());
846     auto st = store.find(id);
847     if (st == store.end() && store.size() < MAX_HASHES)
848         st = store.emplace(id, scheduler.time() + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME).first;
849 
850     size_t tokenlocal = 0;
851     if (st != store.end()) {
852         tokenlocal = st->second.listen(gcb, filter, query);
853         if (tokenlocal == 0)
854             return 0;
855     }
856 
857     auto token4 = Dht::listenTo(id, AF_INET, gcb, filter, query);
858     auto token6 = token4 == 0 ? 0 : Dht::listenTo(id, AF_INET6, gcb, filter, query);
859     if (token6 == 0 && st != store.end()) {
860         st->second.cancelListen(tokenlocal);
861         return 0;
862     }
863 
864     listeners.emplace(token, std::make_tuple(tokenlocal, token4, token6));
865     return token;
866 }
867 
868 bool
cancelListen(const InfoHash & id,size_t token)869 Dht::cancelListen(const InfoHash& id, size_t token)
870 {
871     scheduler.syncTime();
872 
873     auto it = listeners.find(token);
874     if (it == listeners.end()) {
875         DHT_LOG.w(id, "Listen token not found: %d", token);
876         return false;
877     }
878     DHT_LOG.d(id, "cancelListen %s with token %d", id.toString().c_str(), token);
879     if (auto tokenlocal = std::get<0>(it->second)) {
880         auto st = store.find(id);
881         if (st != store.end())
882             st->second.cancelListen(tokenlocal);
883     }
884     auto searches_cancel_listen = [this,&id](std::map<InfoHash, Sp<Search>>& srs, size_t token) {
885         if (token) {
886             auto srp = srs.find(id);
887             if (srp != srs.end())
888                 srp->second->cancelListen(token, scheduler);
889         }
890     };
891     searches_cancel_listen(searches4, std::get<1>(it->second));
892     searches_cancel_listen(searches6, std::get<2>(it->second));
893     listeners.erase(it);
894     return true;
895 }
896 
/* Completion state of an operation performed over both IPv4 and IPv6:
 * one aggregate status plus one status per address family. */
struct OpStatus {
    /* Whether a (sub-)operation has finished, and whether it succeeded. */
    struct Status {
        bool done {false};
        bool ok {false};
        Status(bool is_done = false, bool is_ok = false) : done(is_done), ok(is_ok) {}
    };
    Status status;   // aggregate result
    Status status4;  // IPv4 part
    Status status6;  // IPv6 part
};
907 
908 template <typename T>
909 struct GetStatus : public OpStatus {
910     T values;
911     std::vector<Sp<Node>> nodes;
912 };
913 
914 void
put(const InfoHash & id,Sp<Value> val,DoneCallback callback,time_point created,bool permanent)915 Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point created, bool permanent)
916 {
917     if (not val) {
918         if (callback)
919             callback(false, {});
920         return;
921     }
922     if (val->id == Value::INVALID_ID) {
923         crypto::random_device rdev;
924         std::uniform_int_distribution<Value::Id> rand_id {};
925         val->id = rand_id(rdev);
926     }
927     scheduler.syncTime();
928     const auto& now = scheduler.time();
929     created = std::min(now, created);
930     storageStore(id, val, created, {}, permanent);
931 
932     DHT_LOG.d(id, "put: adding %s -> %s", id.toString().c_str(), val->toString().c_str());
933 
934     auto op = std::make_shared<OpStatus>();
935     auto donecb = [callback](const std::vector<Sp<Node>>& nodes, OpStatus& op) {
936         // Callback as soon as the value is announced on one of the available networks
937         if (callback and not op.status.done and (op.status4.done && op.status6.done)) {
938             callback(op.status4.ok or op.status6.ok, nodes);
939             op.status.done = true;
940         }
941     };
942     announce(id, AF_INET, val, [=](bool ok4, const std::vector<Sp<Node>>& nodes) {
943         DHT_LOG.d(id, "Announce done IPv4 %d", ok4);
944         auto& o = *op;
945         o.status4 = {true, ok4};
946         donecb(nodes, o);
947     }, created, permanent);
948     announce(id, AF_INET6, val, [=](bool ok6, const std::vector<Sp<Node>>& nodes) {
949         DHT_LOG.d(id, "Announce done IPv6 %d", ok6);
950         auto& o = *op;
951         o.status6 = {true, ok6};
952         donecb(nodes, o);
953     }, created, permanent);
954 }
955 
956 template <typename T>
doneCallbackWrapper(DoneCallback dcb,const std::vector<Sp<Node>> & nodes,GetStatus<T> & op)957 void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, GetStatus<T>& op) {
958     if (op.status.done)
959         return;
960     op.nodes.insert(op.nodes.end(), nodes.begin(), nodes.end());
961     if (op.status.ok or (op.status4.done and op.status6.done)) {
962         bool ok = op.status.ok or op.status4.ok or op.status6.ok;
963         op.status.done = true;
964         if (dcb)
965             dcb(ok, op.nodes);
966     }
967 }
968 
969 template <typename T, typename St, typename Cb, typename Av, typename Cv>
callbackWrapper(Cb get_cb,DoneCallback done_cb,const std::vector<Sp<T>> & values,Av add_values,Cv cache_values,GetStatus<St> & op)970 bool callbackWrapper(Cb get_cb, DoneCallback done_cb, const std::vector<Sp<T>>& values,
971     Av add_values, Cv cache_values, GetStatus<St>& op)
972 {
973     if (op.status.done)
974         return false;
975     auto newvals = add_values(values);
976     if (not newvals.empty()) {
977         op.status.ok = !get_cb(newvals);
978         cache_values(newvals);
979     }
980     doneCallbackWrapper(done_cb, {}, op);
981     return !op.status.ok;
982 }
983 
984 void
get(const InfoHash & id,GetCallback getcb,DoneCallback donecb,Value::Filter && filter,Where && where)985 Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filter&& filter, Where&& where)
986 {
987     scheduler.syncTime();
988 
989     auto op = std::make_shared<GetStatus<std::map<Value::Id, Sp<Value>>>>();
990     auto gcb = [getcb, donecb, op](const std::vector<Sp<Value>>& vals) {
991         auto& o = *op;
992         return callbackWrapper(getcb, donecb, vals, [&o](const std::vector<Sp<Value>>& values) {
993             std::vector<Sp<Value>> newvals {};
994             for (const auto& v : values) {
995                 auto it = o.values.find(v->id);
996                 if (it == o.values.cend() or (it->second != v && !(*it->second == *v))) {
997                     newvals.push_back(v);
998                 }
999             }
1000             return newvals;
1001         }, [&o](const std::vector<Sp<Value>>& newvals) {
1002             for (const auto& v : newvals)
1003                 o.values[v->id] = v;
1004         }, o);
1005     };
1006 
1007     auto q = std::make_shared<Query>(Select {}, std::move(where));
1008     auto f = filter.chain(q->where.getFilter());
1009 
1010     /* Try to answer this search locally. */
1011     gcb(getLocal(id, f));
1012 
1013     Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
1014         //DHT_LOG_WARN("DHT done IPv4");
1015         op->status4 = {true, ok};
1016         doneCallbackWrapper(donecb, nodes, *op);
1017     }, f, q);
1018     Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
1019         //DHT_LOG_WARN("DHT done IPv6");
1020         op->status6 = {true, ok};
1021         doneCallbackWrapper(donecb, nodes, *op);
1022     }, f, q);
1023 }
1024 
query(const InfoHash & id,QueryCallback cb,DoneCallback done_cb,Query && q)1025 void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Query&& q)
1026 {
1027     scheduler.syncTime();
1028     auto op = std::make_shared<GetStatus<std::vector<Sp<FieldValueIndex>>>>();
1029     auto f = q.where.getFilter();
1030     auto qcb = [cb, done_cb, op](const std::vector<Sp<FieldValueIndex>>& fields){
1031         auto& o = *op;
1032         return callbackWrapper(cb, done_cb, fields, [&](const std::vector<Sp<FieldValueIndex>>& fields) {
1033             std::vector<Sp<FieldValueIndex>> newvals {};
1034             for (const auto& f : fields) {
1035                 auto it = std::find_if(o.values.cbegin(), o.values.cend(),
1036                     [&](const Sp<FieldValueIndex>& sf) {
1037                         return sf == f or f->containedIn(*sf);
1038                     });
1039                 if (it == o.values.cend()) {
1040                     auto lesser = std::find_if(o.values.begin(), o.values.end(),
1041                         [&](const Sp<FieldValueIndex>& sf) {
1042                             return sf->containedIn(*f);
1043                         });
1044                     if (lesser != o.values.end())
1045                         o.values.erase(lesser);
1046                     newvals.push_back(f);
1047                 }
1048             }
1049             return newvals;
1050         }, [&](const std::vector<Sp<FieldValueIndex>>& fields){
1051             o.values.insert(o.values.end(), fields.begin(), fields.end());
1052         }, o);
1053     };
1054 
1055     /* Try to answer this search locally. */
1056     auto values = getLocal(id, f);
1057     std::vector<Sp<FieldValueIndex>> local_fields(values.size());
1058     std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const Sp<Value>& v) {
1059         return std::make_shared<FieldValueIndex>(*v, q.select);
1060     });
1061     qcb(local_fields);
1062 
1063     auto sq = std::make_shared<Query>(std::move(q));
1064     Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
1065         //DHT_LOG_WARN("DHT done IPv4");
1066         op->status4 = {true, ok};
1067         doneCallbackWrapper(done_cb, nodes, *op);
1068     }, f, sq);
1069     Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
1070         //DHT_LOG_WARN("DHT done IPv6");
1071         op->status6 = {true, ok};
1072         doneCallbackWrapper(done_cb, nodes, *op);
1073     }, f, sq);
1074 }
1075 
1076 std::vector<Sp<Value>>
getLocal(const InfoHash & id,const Value::Filter & f) const1077 Dht::getLocal(const InfoHash& id, const Value::Filter& f) const
1078 {
1079     auto s = store.find(id);
1080     if (s == store.end()) return {};
1081     return s->second.get(f);
1082 }
1083 
1084 Sp<Value>
getLocalById(const InfoHash & id,Value::Id vid) const1085 Dht::getLocalById(const InfoHash& id, Value::Id vid) const
1086 {
1087     auto s = store.find(id);
1088     if (s != store.end())
1089         return s->second.getById(vid);
1090     return {};
1091 }
1092 
1093 std::vector<Sp<Value>>
getPut(const InfoHash & id) const1094 Dht::getPut(const InfoHash& id) const
1095 {
1096     std::vector<Sp<Value>> ret;
1097     auto find_values = [&](const std::map<InfoHash, Sp<Search>>& srs) {
1098         auto srp = srs.find(id);
1099         if (srp == srs.end()) return;
1100         auto vals = srp->second->getPut();
1101         ret.insert(ret.end(), vals.begin(), vals.end());
1102     };
1103     find_values(searches4);
1104     find_values(searches6);
1105     return ret;
1106 }
1107 
1108 Sp<Value>
getPut(const InfoHash & id,const Value::Id & vid) const1109 Dht::getPut(const InfoHash& id, const Value::Id& vid) const
1110 {
1111     auto find_value = [&](const std::map<InfoHash, Sp<Search>>& srs) {
1112         auto srp = srs.find(id);
1113         return (srp != srs.end()) ? srp->second->getPut(vid) : Sp<Value> {};
1114     };
1115     if (auto v4 = find_value(searches4))
1116         return v4;
1117     if (auto v6 = find_value(searches6))
1118         return v6;
1119     return {};
1120 }
1121 
1122 bool
cancelPut(const InfoHash & id,const Value::Id & vid)1123 Dht::cancelPut(const InfoHash& id, const Value::Id& vid)
1124 {
1125     bool canceled {false};
1126     auto sr_cancel_put = [&](std::map<InfoHash, Sp<Search>>& srs) {
1127         auto srp = srs.find(id);
1128         return (srp != srs.end()) ? srp->second->cancelPut(vid) : false;
1129     };
1130     canceled |= sr_cancel_put(searches4);
1131     canceled |= sr_cancel_put(searches6);
1132     if (canceled)
1133         storageErase(id, vid);
1134     return canceled;
1135 }
1136 
1137 // Storage
1138 
1139 void
storageChanged(const InfoHash & id,Storage & st,ValueStorage & v,bool newValue)1140 Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newValue)
1141 {
1142     if (newValue) {
1143         if (not st.local_listeners.empty()) {
1144             DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size());
1145             std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs;
1146             cbs.reserve(st.local_listeners.size());
1147             for (const auto& l : st.local_listeners) {
1148                 std::vector<Sp<Value>> vals;
1149                 if (not l.second.filter or l.second.filter(*v.data))
1150                     vals.push_back(v.data);
1151                 if (not vals.empty()) {
1152                     DHT_LOG.d(id, "[store %s] sending update local listener with token %lu",
1153                             id.toString().c_str(),
1154                             l.first);
1155                     cbs.emplace_back(l.second.get_cb, std::move(vals));
1156                 }
1157             }
1158             // listeners are copied: they may be deleted by the callback
1159             for (auto& cb : cbs)
1160                 cb.first(cb.second, false);
1161         }
1162     }
1163 
1164     if (not st.listeners.empty()) {
1165         DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
1166         for (const auto& node_listeners : st.listeners) {
1167             for (const auto& l : node_listeners.second) {
1168                 auto f = l.second.query.where.getFilter();
1169                 if (f and not f(*v.data))
1170                     continue;
1171                 DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending update",
1172                         id.toString().c_str(),
1173                         node_listeners.first->toString().c_str());
1174                 std::vector<Sp<Value>> vals {};
1175                 vals.push_back(v.data);
1176                 Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
1177                 network_engine.tellListener(node_listeners.first, l.first, id, 0, ntoken, {}, {},
1178                         std::move(vals), l.second.query);
1179             }
1180         }
1181     }
1182 }
1183 
1184 bool
storageStore(const InfoHash & id,const Sp<Value> & value,time_point created,const SockAddr & sa,bool permanent)1185 Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr& sa, bool permanent)
1186 {
1187     const auto& now = scheduler.time();
1188     created = std::min(created, now);
1189     auto expiration = permanent ? time_point::max() : created + getType(value->type).expiration;
1190     if (expiration < now)
1191         return false;
1192 
1193     auto st = store.find(id);
1194     if (st == store.end()) {
1195         if (store.size() >= MAX_HASHES)
1196             return false;
1197         auto st_i = store.emplace(id, now);
1198         st = st_i.first;
1199         if (maintain_storage and st_i.second)
1200             scheduler.add(st->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
1201     }
1202 
1203     StorageBucket* store_bucket {nullptr};
1204     if (sa)
1205         store_bucket = &store_quota[sa];
1206 
1207     auto store = st->second.store(id, value, created, expiration, store_bucket);
1208     if (auto vs = store.first) {
1209         total_store_size += store.second.size_diff;
1210         total_values += store.second.values_diff;
1211         if (not permanent) {
1212             scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
1213         }
1214         if (total_store_size > max_store_size) {
1215             expireStore();
1216         }
1217         storageChanged(id, st->second, *vs, store.second.values_diff > 0);
1218     }
1219 
1220     return std::get<0>(store);
1221 }
1222 
1223 bool
storageErase(const InfoHash & id,Value::Id vid)1224 Dht::storageErase(const InfoHash& id, Value::Id vid)
1225 {
1226     auto st = store.find(id);
1227     if (st == store.end())
1228         return false;
1229     auto ret = st->second.remove(id, vid);
1230     total_store_size += ret.size_diff;
1231     total_values += ret.values_diff;
1232     return ret.values_diff;
1233 }
1234 
1235 void
storageAddListener(const InfoHash & id,const Sp<Node> & node,size_t socket_id,Query && query)1236 Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_id, Query&& query)
1237 {
1238     const auto& now = scheduler.time();
1239     auto st = store.find(id);
1240     if (st == store.end()) {
1241         if (store.size() >= MAX_HASHES)
1242             return;
1243         st = store.emplace(id, now).first;
1244     }
1245     auto& node_listeners = st->second.listeners[node];
1246     auto l = node_listeners.find(socket_id);
1247     if (l == node_listeners.end()) {
1248         auto vals = st->second.get(query.where.getFilter());
1249         if (not vals.empty()) {
1250             network_engine.tellListener(node, socket_id, id, WANT4 | WANT6, makeToken(node->getAddr(), false),
1251                     buckets4.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES),
1252                     std::move(vals), query);
1253         }
1254         node_listeners.emplace(socket_id, Listener {now, std::forward<Query>(query)});
1255     }
1256     else
1257         l->second.refresh(now, std::forward<Query>(query));
1258 }
1259 
1260 void
expireStore(decltype(store)::iterator i)1261 Dht::expireStore(decltype(store)::iterator i)
1262 {
1263     const auto& id = i->first;
1264     auto& st = i->second;
1265     auto stats = st.expire(id, scheduler.time());
1266     total_store_size += stats.first;
1267     total_values -= stats.second.size();
1268     if (not stats.second.empty()) {
1269         DHT_LOG.d(id, "[store %s] discarded %ld expired values (%ld bytes)",
1270             id.toString().c_str(), stats.second.size(), -stats.first);
1271 
1272         if (not st.listeners.empty()) {
1273             DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
1274 
1275             std::vector<Value::Id> ids;
1276             ids.reserve(stats.second.size());
1277             for (const auto& v : stats.second)
1278                 ids.emplace_back(v->id);
1279 
1280             for (const auto& node_listeners : st.listeners) {
1281                 for (const auto& l : node_listeners.second) {
1282                     DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending expired",
1283                             id.toString().c_str(),
1284                             node_listeners.first->toString().c_str());
1285                     Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
1286                     network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids);
1287                 }
1288             }
1289         }
1290         for (const auto& local_listeners : st.local_listeners) {
1291             local_listeners.second.get_cb(stats.second, true);
1292         }
1293     }
1294 }
1295 
1296 void
expireStorage(InfoHash h)1297 Dht::expireStorage(InfoHash h)
1298 {
1299     auto i = store.find(h);
1300     if (i != store.end())
1301         expireStore(i);
1302 }
1303 
1304 void
expireStore()1305 Dht::expireStore()
1306 {
1307     // removing expired values
1308     for (auto i = store.begin(); i != store.end();) {
1309         expireStore(i);
1310 
1311         if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) {
1312             DHT_LOG.d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str());
1313             i = store.erase(i);
1314         }
1315         else
1316             ++i;
1317     }
1318 
1319     // remove more values if storage limit is exceeded
1320     while (total_store_size > max_store_size) {
1321         // find IP using the most storage
1322         if (store_quota.empty()) {
1323             DHT_LOG.w("No space left: local data consumes all the quota!");
1324             break;
1325         }
1326         auto largest = store_quota.begin();
1327         for (auto it = ++largest; it != store_quota.end(); ++it) {
1328             if (it->second.size() > largest->second.size())
1329                 largest = it;
1330         }
1331         DHT_LOG.w("No space left: discarding value of largest consumer %s", largest->first.toString().c_str());
1332         while (true) {
1333             auto exp_value = largest->second.getOldest();
1334             auto storage = store.find(exp_value.first);
1335             if (storage != store.end()) {
1336                 auto ret = storage->second.remove(exp_value.first, exp_value.second);
1337                 total_store_size += ret.size_diff;
1338                 total_values += ret.values_diff;
1339                 DHT_LOG.w("Discarded %ld bytes, still %ld used", largest->first.toString().c_str(), total_store_size);
1340                 if (ret.values_diff)
1341                     break;
1342             }
1343         }
1344     }
1345 
1346     // remove unused quota entires
1347     for (auto i = store_quota.begin(); i != store_quota.end();) {
1348         if (i->second.size() == 0)
1349             i = store_quota.erase(i);
1350         else
1351             ++i;
1352     }
1353 }
1354 
1355 void
connectivityChanged(sa_family_t af)1356 Dht::connectivityChanged(sa_family_t af)
1357 {
1358     const auto& now = scheduler.time();
1359     scheduler.edit(nextNodesConfirmation, now);
1360     buckets(af).connectivityChanged(now);
1361     network_engine.connectivityChanged(af);
1362     reported_addr.erase(std::remove_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& addr){
1363         return addr.second.getFamily() == af;
1364     }), reported_addr.end());
1365 }
1366 
1367 void
rotateSecrets()1368 Dht::rotateSecrets()
1369 {
1370     oldsecret = secret;
1371     {
1372         crypto::random_device rdev;
1373         secret = std::uniform_int_distribution<uint64_t>{}(rdev);
1374     }
1375     uniform_duration_distribution<> time_dist(std::chrono::minutes(15), std::chrono::minutes(45));
1376     auto rotate_secrets_time = scheduler.time() + time_dist(rd);
1377     scheduler.add(rotate_secrets_time, std::bind(&Dht::rotateSecrets, this));
1378 }
1379 
1380 Blob
makeToken(const SockAddr & addr,bool old) const1381 Dht::makeToken(const SockAddr& addr, bool old) const
1382 {
1383     const void *ip;
1384     size_t iplen;
1385     in_port_t port;
1386 
1387     auto family = addr.getFamily();
1388     if (family == AF_INET) {
1389         const auto& sin = addr.getIPv4();
1390         ip = &sin.sin_addr;
1391         iplen = 4;
1392         port = sin.sin_port;
1393     } else if (family == AF_INET6) {
1394         const auto& sin6 = addr.getIPv6();
1395         ip = &sin6.sin6_addr;
1396         iplen = 16;
1397         port = sin6.sin6_port;
1398     } else {
1399         return {};
1400     }
1401 
1402     const auto& c1 = old ? oldsecret : secret;
1403     Blob data;
1404     data.reserve(sizeof(secret)+sizeof(in_port_t)+iplen);
1405     data.insert(data.end(), (uint8_t*)&c1, ((uint8_t*)&c1) + sizeof(c1));
1406     data.insert(data.end(), (uint8_t*)ip, (uint8_t*)ip+iplen);
1407     data.insert(data.end(), (uint8_t*)&port, ((uint8_t*)&port)+sizeof(in_port_t));
1408     return crypto::hash(data, TOKEN_SIZE);
1409 }
1410 
1411 bool
tokenMatch(const Blob & token,const SockAddr & addr) const1412 Dht::tokenMatch(const Blob& token, const SockAddr& addr) const
1413 {
1414     if (not addr or token.size() != TOKEN_SIZE)
1415         return false;
1416     if (token == makeToken(addr, false))
1417         return true;
1418     if (token == makeToken(addr, true))
1419         return true;
1420     return false;
1421 }
1422 
1423 NodeStats
getNodesStats(sa_family_t af) const1424 Dht::getNodesStats(sa_family_t af) const
1425 {
1426     NodeStats stats {};
1427     const auto& now = scheduler.time();
1428     const auto& bcks = buckets(af);
1429     for (const auto& b : bcks) {
1430         for (auto& n : b.nodes) {
1431             if (n->isGood(now)) {
1432                 stats.good_nodes++;
1433                 if (n->isIncoming())
1434                     stats.incoming_nodes++;
1435             } else if (not n->isExpired())
1436                 stats.dubious_nodes++;
1437         }
1438         if (b.cached)
1439             stats.cached_nodes++;
1440     }
1441     stats.table_depth = bcks.depth(bcks.findBucket(myid));
1442     return stats;
1443 }
1444 
1445 void
dumpBucket(const Bucket & b,std::ostream & out) const1446 Dht::dumpBucket(const Bucket& b, std::ostream& out) const
1447 {
1448     const auto& now = scheduler.time();
1449     using namespace std::chrono;
1450     out << b.first << " count " << b.nodes.size() << " age " << duration_cast<seconds>(now - b.time).count() << " sec";
1451     if (b.cached)
1452         out << " (cached)";
1453     out  << std::endl;
1454     for (auto& n : b.nodes) {
1455         out << "    Node " << n->toString();
1456         const auto& t = n->getTime();
1457         const auto& r = n->getReplyTime();
1458         if (t != r)
1459             out << " age " << duration_cast<seconds>(now - t).count() << ", reply: " << duration_cast<seconds>(now - r).count();
1460         else
1461             out << " age " << duration_cast<seconds>(now - t).count();
1462         if (n->isExpired())
1463             out << " [expired]";
1464         else if (n->isGood(now))
1465             out << " [good]";
1466         out << std::endl;
1467     }
1468 }
1469 
1470 void
dumpSearch(const Search & sr,std::ostream & out) const1471 Dht::dumpSearch(const Search& sr, std::ostream& out) const
1472 {
1473     const auto& now = scheduler.time();
1474     using namespace std::chrono;
1475     out << std::endl << "Search IPv" << (sr.af == AF_INET6 ? '6' : '4') << ' ' << sr.id << " gets: " << sr.callbacks.size();
1476     out << ", age: " << duration_cast<seconds>(now - sr.step_time).count() << " s";
1477     if (sr.done)
1478         out << " [done]";
1479     if (sr.expired)
1480         out << " [expired]";
1481     bool synced = sr.isSynced(now);
1482     out << (synced ? " [synced]" : " [not synced]");
1483     if (synced && sr.isListening(now))
1484         out << " [listening]";
1485     out << std::endl;
1486 
1487     /*printing the queries*/
1488     if (sr.callbacks.size() + sr.listeners.size() > 0)
1489         out << "Queries:" << std::endl;
1490     for (const auto& cb : sr.callbacks) {
1491         out << *cb.second.query << std::endl;
1492     }
1493     for (const auto& l : sr.listeners) {
1494         out << *l.second.query << std::endl;
1495     }
1496 
1497     for (const auto& a : sr.announce) {
1498         bool announced = sr.isAnnounced(a.value->id);
1499         out << "Announcement: " << *a.value << (announced ? " [announced]" : "") << std::endl;
1500     }
1501 
1502     out << " Common bits    InfoHash                       Conn. Get   Ops  IP" << std::endl;
1503     unsigned i = 0;
1504     auto last_get = sr.getLastGetTime();
1505     for (const auto& n : sr.nodes) {
1506         i++;
1507         out << std::setfill (' ') << std::setw(3) << InfoHash::commonBits(sr.id, n.node->id) << ' ' << n.node->id;
1508         out << ' ' << (findNode(n.node->id, sr.af) ? '*' : ' ');
1509         out << " [";
1510         if (auto pendingCount = n.node->getPendingMessageCount())
1511             out << pendingCount;
1512         else
1513             out << ' ';
1514         out << (n.node->isExpired() ? 'x' : ' ') << "]";
1515 
1516         // Get status
1517         {
1518             char g_i = n.pending(n.getStatus) ? (n.candidate ? 'c' : 'f') : ' ';
1519             char s_i = n.isSynced(now) ? (n.last_get_reply > last_get ? 'u' : 's') : '-';
1520             out << " [" << s_i << g_i << "] ";
1521         }
1522 
1523         // Listen status
1524         if (not sr.listeners.empty()) {
1525             if (n.listenStatus.empty())
1526                 out << "    ";
1527             else
1528                 out << "["
1529                     << (n.isListening(now) ? 'l' : (n.pending(n.listenStatus) ? 'f' : ' ')) << "] ";
1530         }
1531 
1532         // Announce status
1533         if (not sr.announce.empty()) {
1534             if (n.acked.empty()) {
1535                 out << "   ";
1536                 for (size_t a=0; a < sr.announce.size(); a++)
1537                     out << ' ';
1538             } else {
1539                 out << "[";
1540                 for (const auto& a : sr.announce) {
1541                     auto ack = n.acked.find(a.value->id);
1542                     if (ack == n.acked.end() or not ack->second.first) {
1543                         out << ' ';
1544                     } else {
1545                         out << ack->second.first->getStateChar();
1546                     }
1547                 }
1548                 out << "] ";
1549             }
1550         }
1551         out << n.node->getAddrStr() << std::endl;
1552     }
1553 }
1554 
1555 void
dumpTables() const1556 Dht::dumpTables() const
1557 {
1558     std::stringstream out;
1559     out << "My id " << myid << std::endl;
1560 
1561     out << "Buckets IPv4 :" << std::endl;
1562     for (const auto& b : buckets4)
1563         dumpBucket(b, out);
1564     out << "Buckets IPv6 :" << std::endl;
1565     for (const auto& b : buckets6)
1566         dumpBucket(b, out);
1567 
1568     auto dump_searches = [&](std::map<InfoHash, Sp<Search>> srs) {
1569         for (auto& srp : srs)
1570             dumpSearch(*srp.second, out);
1571     };
1572     dump_searches(searches4);
1573     dump_searches(searches6);
1574     out << std::endl;
1575 
1576     out << getStorageLog() << std::endl;
1577 
1578     DHT_LOG.d("%s", out.str().c_str());
1579 }
1580 
1581 std::string
getStorageLog() const1582 Dht::getStorageLog() const
1583 {
1584     std::stringstream out;
1585     for (const auto& s : store)
1586         out << printStorageLog(s);
1587     out << std::endl << std::endl;
1588     std::multimap<size_t, const SockAddr*> q_map;
1589     for (const auto& ip : store_quota)
1590         if (ip.second.size())
1591             q_map.emplace(ip.second.size(), &ip.first);
1592     for (auto ip = q_map.rbegin(); ip != q_map.rend(); ++ip)
1593         out << "IP " << ip->second->toString() << " uses " << ip->first << " bytes" << std::endl;
1594     out << std::endl;
1595     out << "Total " << store.size() << " storages, " << total_values << " values (";
1596     if (total_store_size < 1024)
1597         out << total_store_size << " bytes)";
1598     else
1599         out << (total_store_size/1024) << " / " << (max_store_size/1024) << " KB)";
1600     out << std::endl;
1601     return out.str();
1602 }
1603 
1604 std::string
getStorageLog(const InfoHash & h) const1605 Dht::getStorageLog(const InfoHash& h) const
1606 {
1607     auto s = store.find(h);
1608     if (s == store.end()) {
1609         std::stringstream out;
1610         out << "Storage " << h << " empty" << std::endl;
1611         return out.str();
1612     }
1613     return printStorageLog(*s);
1614 }
1615 
1616 std::string
printStorageLog(const decltype(store)::value_type & s) const1617 Dht::printStorageLog(const decltype(store)::value_type& s) const
1618 {
1619     std::stringstream out;
1620     using namespace std::chrono;
1621     const auto& st = s.second;
1622     out << "Storage " << s.first << " "
1623                       << st.listeners.size() << " list., "
1624                       << st.valueCount() << " values ("
1625                       << st.totalSize() << " bytes)" << std::endl;
1626     if (not st.local_listeners.empty())
1627         out << "   " << st.local_listeners.size() << " local listeners" << std::endl;
1628     for (const auto& node_listeners : st.listeners) {
1629         const auto& node = node_listeners.first;
1630         out << "   " << "Listener " << node->toString() << " : " << node_listeners.second.size() << " entries" << std::endl;
1631     }
1632     return out.str();
1633 }
1634 
1635 std::string
getRoutingTablesLog(sa_family_t af) const1636 Dht::getRoutingTablesLog(sa_family_t af) const
1637 {
1638     std::stringstream out;
1639     for (const auto& b : buckets(af))
1640         dumpBucket(b, out);
1641     return out.str();
1642 }
1643 
1644 std::string
getSearchesLog(sa_family_t af) const1645 Dht::getSearchesLog(sa_family_t af) const
1646 {
1647     std::stringstream out;
1648     auto num_searches = searches4.size() + searches6.size();
1649     if (num_searches > 8) {
1650         if (not af or af == AF_INET)
1651             for (const auto& sr : searches4)
1652                 out << "[search " << sr.first << " IPv4]" << std::endl;
1653         if (not af or af == AF_INET6)
1654             for (const auto& sr : searches6)
1655                 out << "[search " << sr.first << " IPv6]" << std::endl;
1656     } else {
1657         out << "s:synched, u:updated, a:announced, c:candidate, f:cur req, x:expired, *:known" << std::endl;
1658         if (not af or af == AF_INET)
1659             for (const auto& sr : searches4)
1660                 dumpSearch(*sr.second, out);
1661         if (not af or af == AF_INET6)
1662             for (const auto& sr : searches6)
1663                 dumpSearch(*sr.second, out);
1664     }
1665     out << "Total: " << num_searches << " searches (" << searches4.size() << " IPv4, " << searches6.size() << " IPv6)." << std::endl;
1666     return out.str();
1667 }
1668 
1669 std::string
getSearchLog(const InfoHash & id,sa_family_t af) const1670 Dht::getSearchLog(const InfoHash& id, sa_family_t af) const
1671 {
1672     std::stringstream out;
1673     if (af == AF_UNSPEC) {
1674         out << getSearchLog(id, AF_INET) << getSearchLog(id, AF_INET6);
1675     } else {
1676         auto& srs = searches(af);
1677         auto sr = srs.find(id);
1678         if (sr != srs.end())
1679             dumpSearch(*sr->second, out);
1680     }
1681     return out.str();
1682 }
1683 
~Dht()1684 Dht::~Dht()
1685 {
1686     for (auto& s : searches4)
1687         s.second->clear();
1688     for (auto& s : searches6)
1689         s.second->clear();
1690 }
1691 
Dht()1692 Dht::Dht() : store(), network_engine(DHT_LOG, scheduler, {}) {}
1693 
Dht(std::unique_ptr<net::DatagramSocket> && sock,const Config & config,const Logger & l)1694 Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Logger& l)
1695     : DhtInterface(l), myid(config.node_id ? config.node_id : InfoHash::getRandom()), store(), store_quota(),
1696     network_engine(myid, config.network, std::move(sock), DHT_LOG, scheduler,
1697             std::bind(&Dht::onError, this, _1, _2),
1698             std::bind(&Dht::onNewNode, this, _1, _2),
1699             std::bind(&Dht::onReportedAddr, this, _1, _2),
1700             std::bind(&Dht::onPing, this, _1),
1701             std::bind(&Dht::onFindNode, this, _1, _2, _3),
1702             std::bind(&Dht::onGetValues, this, _1, _2, _3, _4),
1703             std::bind(&Dht::onListen, this, _1, _2, _3, _4, _5),
1704             std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5),
1705             std::bind(&Dht::onRefresh, this, _1, _2, _3, _4)),
1706     persistPath(config.persist_path),
1707     is_bootstrap(config.is_bootstrap),
1708     maintain_storage(config.maintain_storage)
1709 {
1710     scheduler.syncTime();
1711     auto s = network_engine.getSocket();
1712     if (not s or (not s->hasIPv4() and not s->hasIPv6()))
1713         throw DhtException("Opened socket required");
1714     if (s->hasIPv4()) {
1715         buckets4 = {Bucket {AF_INET}};
1716         buckets4.is_client = config.is_bootstrap;
1717     }
1718     if (s->hasIPv6()) {
1719         buckets6 = {Bucket {AF_INET6}};
1720         buckets6.is_client = config.is_bootstrap;
1721     }
1722 
1723     search_id = std::uniform_int_distribution<decltype(search_id)>{}(rd);
1724 
1725     uniform_duration_distribution<> time_dis {std::chrono::seconds(3), std::chrono::seconds(5)};
1726     nextNodesConfirmation = scheduler.add(scheduler.time() + time_dis(rd), std::bind(&Dht::confirmNodes, this));
1727 
1728     // Fill old secret
1729     {
1730         crypto::random_device rdev;
1731         secret = std::uniform_int_distribution<uint64_t>{}(rdev);
1732     }
1733     rotateSecrets();
1734 
1735     expire();
1736 
1737     DHT_LOG.d("DHT node initialised with ID %s", myid.toString().c_str());
1738 
1739     if (not persistPath.empty())
1740         loadState(persistPath);
1741 }
1742 
1743 bool
neighbourhoodMaintenance(RoutingTable & list)1744 Dht::neighbourhoodMaintenance(RoutingTable& list)
1745 {
1746     //DHT_LOG_DBG("neighbourhoodMaintenance");
1747     auto b = list.findBucket(myid);
1748     if (b == list.end())
1749         return false;
1750 
1751     InfoHash id = myid;
1752 #ifdef _WIN32
1753     std::uniform_int_distribution<int> rand_byte{ 0, std::numeric_limits<uint8_t>::max() };
1754 #else
1755     std::uniform_int_distribution<uint8_t> rand_byte;
1756 #endif
1757     id[HASH_LEN-1] = rand_byte(rd);
1758 
1759     std::bernoulli_distribution rand_trial(1./8.);
1760     auto q = b;
1761     if (std::next(q) != list.end() && (q->nodes.empty() || rand_trial(rd)))
1762         q = std::next(q);
1763     if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
1764         auto r = std::prev(b);
1765         if (!r->nodes.empty())
1766             q = r;
1767     }
1768 
1769     auto n = q->randomNode();
1770     if (n) {
1771         DHT_LOG.d(id, n->id, "[node %s] sending [find %s] for neighborhood maintenance",
1772                 n->toString().c_str(), id.toString().c_str());
1773         /* Since our node-id is the same in both DHTs, it's probably
1774            profitable to query both families. */
1775         network_engine.sendFindNode(n, id, network_engine.want());
1776     }
1777 
1778     return true;
1779 }
1780 
1781 bool
bucketMaintenance(RoutingTable & list)1782 Dht::bucketMaintenance(RoutingTable& list)
1783 {
1784     std::bernoulli_distribution rand_trial(1./8.);
1785     std::bernoulli_distribution rand_trial_38(1./38.);
1786 
1787     bool sent {false};
1788     for (auto b = list.begin(); b != list.end(); ++b) {
1789         if (b->time < scheduler.time() - std::chrono::minutes(10) || b->nodes.empty()) {
1790             /* This bucket hasn't seen any positive confirmation for a long
1791                time. Pick a random id in this bucket's range, and send a request
1792                to a random node. */
1793             InfoHash id = list.randomId(b);
1794             auto q = b;
1795             /* If the bucket is empty, we try to fill it from a neighbour.
1796                We also sometimes do it gratuitiously to recover from
1797                buckets full of broken nodes. */
1798             if (std::next(b) != list.end() && (q->nodes.empty() || rand_trial(rd)))
1799                 q = std::next(b);
1800             if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
1801                 auto r = std::prev(b);
1802                 if (!r->nodes.empty())
1803                     q = r;
1804             }
1805 
1806             auto n = q->randomNode();
1807             if (n and not n->isPendingMessage()) {
1808                 want_t want = -1;
1809 
1810                 if (network_engine.want() != want) {
1811                     auto otherbucket = findBucket(id, q->af == AF_INET ? AF_INET6 : AF_INET);
1812                     if (otherbucket && otherbucket->nodes.size() < TARGET_NODES)
1813                         /* The corresponding bucket in the other family
1814                            is emptyish -- querying both is useful. */
1815                         want = WANT4 | WANT6;
1816                     else if (rand_trial_38(rd))
1817                         /* Most of the time, this just adds overhead.
1818                            However, it might help stitch back one of
1819                            the DHTs after a network collapse, so query
1820                            both, but only very occasionally. */
1821                         want = WANT4 | WANT6;
1822                 }
1823 
1824                 DHT_LOG.d(id, n->id, "[node %s] sending find %s for bucket maintenance", n->toString().c_str(), id.toString().c_str());
1825                 //auto start = scheduler.time();
1826                 network_engine.sendFindNode(n, id, want, nullptr, [this,n](const net::Request&, bool over) {
1827                     if (over) {
1828                         const auto& end = scheduler.time();
1829                         // using namespace std::chrono;
1830                         // DHT_LOG.d(n->id, "[node %s] bucket maintenance op expired after %llu ms", n->toString().c_str(), duration_cast<milliseconds>(end-start).count());
1831                         scheduler.edit(nextNodesConfirmation, end + Node::MAX_RESPONSE_TIME);
1832                     }
1833                 });
1834                 sent = true;
1835             }
1836         }
1837     }
1838     return sent;
1839 }
1840 
1841 void
dataPersistence(InfoHash id)1842 Dht::dataPersistence(InfoHash id)
1843 {
1844     const auto& now = scheduler.time();
1845     auto str = store.find(id);
1846     if (str != store.end() and now > str->second.maintenance_time) {
1847         DHT_LOG.d(id, "[storage %s] maintenance (%u values, %u bytes)",
1848                 id.toString().c_str(), str->second.valueCount(), str->second.totalSize());
1849         maintainStorage(*str);
1850         str->second.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
1851         scheduler.add(str->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
1852     }
1853 }
1854 
1855 size_t
maintainStorage(decltype(store)::value_type & storage,bool force,const DoneCallback & donecb)1856 Dht::maintainStorage(decltype(store)::value_type& storage, bool force, const DoneCallback& donecb)
1857 {
1858     const auto& now = scheduler.time();
1859     size_t announce_per_af = 0;
1860 
1861     bool want4 = true, want6 = true;
1862 
1863     auto nodes = buckets4.findClosestNodes(storage.first, now);
1864     if (!nodes.empty()) {
1865         if (force || storage.first.xorCmp(nodes.back()->id, myid) < 0) {
1866             for (auto &value : storage.second.getValues()) {
1867                 const auto& vt = getType(value.data->type);
1868                 if (force || value.created + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
1869                     // gotta put that value there
1870                     announce(storage.first, AF_INET, value.data, donecb, value.created);
1871                     ++announce_per_af;
1872                 }
1873             }
1874             want4 = false;
1875         }
1876     }
1877 
1878     auto nodes6 = buckets6.findClosestNodes(storage.first, now);
1879     if (!nodes6.empty()) {
1880         if (force || storage.first.xorCmp(nodes6.back()->id, myid) < 0) {
1881             for (auto &value : storage.second.getValues()) {
1882                 const auto& vt = getType(value.data->type);
1883                 if (force || value.created + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
1884                     // gotta put that value there
1885                     announce(storage.first, AF_INET6, value.data, donecb, value.created);
1886                     ++announce_per_af;
1887                 }
1888             }
1889             want6 = false;
1890         }
1891     }
1892 
1893     if (not want4 and not want6) {
1894         DHT_LOG.d(storage.first, "Discarding storage values %s", storage.first.toString().c_str());
1895         auto diff = storage.second.clear();
1896         total_store_size += diff.size_diff;
1897         total_values += diff.values_diff;
1898     }
1899 
1900     return announce_per_af;
1901 }
1902 
1903 time_point
periodic(const uint8_t * buf,size_t buflen,SockAddr from)1904 Dht::periodic(const uint8_t *buf, size_t buflen, SockAddr from)
1905 {
1906     scheduler.syncTime();
1907     if (buflen) {
1908         try {
1909             network_engine.processMessage(buf, buflen, std::move(from));
1910         } catch (const std::exception& e) {
1911             DHT_LOG.e("Can't process message: %s", e.what());
1912         }
1913     }
1914     return scheduler.run();
1915 }
1916 
1917 void
expire()1918 Dht::expire()
1919 {
1920     uniform_duration_distribution<> time_dis(std::chrono::minutes(2), std::chrono::minutes(6));
1921     auto expire_stuff_time = scheduler.time() + duration(time_dis(rd));
1922 
1923     expireBuckets(buckets4);
1924     expireBuckets(buckets6);
1925     expireStore();
1926     expireSearches();
1927     scheduler.add(expire_stuff_time, std::bind(&Dht::expire, this));
1928 }
1929 
1930 void
confirmNodes()1931 Dht::confirmNodes()
1932 {
1933     using namespace std::chrono;
1934     bool soon = false;
1935     const auto& now = scheduler.time();
1936 
1937     if (searches4.empty() and getStatus(AF_INET) == NodeStatus::Connected) {
1938         DHT_LOG.d(myid, "[confirm nodes] initial IPv4 'get' for my id (%s)", myid.toString().c_str());
1939         search(myid, AF_INET);
1940     }
1941     if (searches6.empty() and getStatus(AF_INET6) == NodeStatus::Connected) {
1942         DHT_LOG.d(myid, "[confirm nodes] initial IPv6 'get' for my id (%s)", myid.toString().c_str());
1943         search(myid, AF_INET6);
1944     }
1945 
1946     soon |= bucketMaintenance(buckets4);
1947     soon |= bucketMaintenance(buckets6);
1948 
1949     if (!soon) {
1950         if (buckets4.grow_time >= now - seconds(150))
1951             soon |= neighbourhoodMaintenance(buckets4);
1952         if (buckets6.grow_time >= now - seconds(150))
1953             soon |= neighbourhoodMaintenance(buckets6);
1954     }
1955 
1956     /* In order to maintain all buckets' age within 600 seconds, worst
1957        case is roughly 27 seconds, assuming the table is 22 bits deep.
1958        We want to keep a margin for neighborhood maintenance, so keep
1959        this within 25 seconds. */
1960     auto time_dis = soon
1961         ? uniform_duration_distribution<> {seconds(5) , seconds(25)}
1962         : uniform_duration_distribution<> {seconds(60), seconds(180)};
1963     auto confirm_nodes_time = now + time_dis(rd);
1964 
1965     scheduler.edit(nextNodesConfirmation, confirm_nodes_time);
1966 }
1967 
1968 std::vector<ValuesExport>
exportValues() const1969 Dht::exportValues() const
1970 {
1971     std::vector<ValuesExport> e {};
1972     e.reserve(store.size());
1973     for (const auto& h : store) {
1974         ValuesExport ve;
1975         ve.first = h.first;
1976 
1977         msgpack::sbuffer buffer;
1978         msgpack::packer<msgpack::sbuffer> pk(&buffer);
1979         const auto& vals = h.second.getValues();
1980         pk.pack_array(vals.size());
1981         for (const auto& v : vals) {
1982             pk.pack_array(2);
1983             pk.pack(v.created.time_since_epoch().count());
1984             v.data->msgpack_pack(pk);
1985         }
1986         ve.second = {buffer.data(), buffer.data()+buffer.size()};
1987         e.push_back(std::move(ve));
1988     }
1989     return e;
1990 }
1991 
1992 void
importValues(const std::vector<ValuesExport> & import)1993 Dht::importValues(const std::vector<ValuesExport>& import)
1994 {
1995     const auto& now = scheduler.time();
1996 
1997     for (const auto& node : import) {
1998         if (node.second.empty())
1999             continue;
2000 
2001         try {
2002             msgpack::unpacked msg;
2003             msgpack::unpack(msg, (const char*)node.second.data(), node.second.size());
2004             auto valarr = msg.get();
2005             if (valarr.type != msgpack::type::ARRAY)
2006                 throw msgpack::type_error();
2007             for (unsigned i = 0; i < valarr.via.array.size; i++) {
2008                 auto& valel = valarr.via.array.ptr[i];
2009                 if (valel.type != msgpack::type::ARRAY or valel.via.array.size < 2)
2010                     throw msgpack::type_error();
2011                 time_point val_time;
2012                 Value tmp_val;
2013                 try {
2014                     val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}};
2015                     tmp_val.msgpack_unpack(valel.via.array.ptr[1]);
2016                 } catch (const std::exception&) {
2017                     DHT_LOG.e(node.first, "Error reading value at %s", node.first.toString().c_str());
2018                     continue;
2019                 }
2020                 val_time = std::min(val_time, now);
2021                 storageStore(node.first, std::make_shared<Value>(std::move(tmp_val)), val_time);
2022             }
2023         } catch (const std::exception&) {
2024             DHT_LOG.e(node.first, "Error reading values at %s", node.first.toString().c_str());
2025             continue;
2026         }
2027     }
2028 }
2029 
2030 
2031 std::vector<NodeExport>
exportNodes() const2032 Dht::exportNodes() const
2033 {
2034     const auto& now = scheduler.time();
2035     std::vector<NodeExport> nodes;
2036     const auto b4 = buckets4.findBucket(myid);
2037     if (b4 != buckets4.end()) {
2038         for (auto& n : b4->nodes)
2039             if (n->isGood(now))
2040                 nodes.push_back(n->exportNode());
2041     }
2042     const auto b6 = buckets6.findBucket(myid);
2043     if (b6 != buckets6.end()) {
2044         for (auto& n : b6->nodes)
2045             if (n->isGood(now))
2046                 nodes.push_back(n->exportNode());
2047     }
2048     for (auto b = buckets4.begin(); b != buckets4.end(); ++b) {
2049         if (b == b4) continue;
2050         for (auto& n : b->nodes)
2051             if (n->isGood(now))
2052                 nodes.push_back(n->exportNode());
2053     }
2054     for (auto b = buckets6.begin(); b != buckets6.end(); ++b) {
2055         if (b == b6) continue;
2056         for (auto& n : b->nodes)
2057             if (n->isGood(now))
2058                 nodes.push_back(n->exportNode());
2059     }
2060     return nodes;
2061 }
2062 
2063 void
insertNode(const InfoHash & id,const SockAddr & addr)2064 Dht::insertNode(const InfoHash& id, const SockAddr& addr)
2065 {
2066     if (addr.getFamily() != AF_INET && addr.getFamily() != AF_INET6)
2067         return;
2068     scheduler.syncTime();
2069     network_engine.insertNode(id, addr);
2070 }
2071 
2072 void
pingNode(SockAddr sa,DoneCallbackSimple && cb)2073 Dht::pingNode(SockAddr sa, DoneCallbackSimple&& cb)
2074 {
2075     scheduler.syncTime();
2076     DHT_LOG.d("Sending ping to %s", sa.toString().c_str());
2077     auto& count = sa.getFamily() == AF_INET ? pending_pings4 : pending_pings6;
2078     count++;
2079     network_engine.sendPing(std::move(sa), [&count,cb](const net::Request&, net::RequestAnswer&&) {
2080         count--;
2081         if (cb)
2082             cb(true);
2083     }, [&count,cb](const net::Request&, bool last){
2084         if (last) {
2085             count--;
2086             if (cb)
2087                 cb(false);
2088         }
2089     });
2090 }
2091 
2092 void
onError(Sp<net::Request> req,net::DhtProtocolException e)2093 Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) {
2094     const auto& node = req->node;
2095     if (e.getCode() == net::DhtProtocolException::UNAUTHORIZED) {
2096         DHT_LOG.e(node->id, "[node %s] token flush", node->toString().c_str());
2097         node->authError();
2098         node->cancelRequest(req);
2099         for (auto& srp : searches(node->getFamily())) {
2100             auto& sr = srp.second;
2101             for (auto& n : sr->nodes) {
2102                 if (n.node != node) continue;
2103                 n.token.clear();
2104                 n.last_get_reply = time_point::min();
2105                 searchSendGetValues(sr);
2106                 scheduler.edit(sr->nextSearchStep, scheduler.time());
2107                 break;
2108             }
2109         }
2110     } else if (e.getCode() == net::DhtProtocolException::NOT_FOUND) {
2111         DHT_LOG.e(node->id, "[node %s] returned error 404: storage not found", node->toString().c_str());
2112         node->cancelRequest(req);
2113     }
2114 }
2115 
2116 void
onReportedAddr(const InfoHash &,const SockAddr & addr)2117 Dht::onReportedAddr(const InfoHash& /*id*/, const SockAddr& addr)
2118 {
2119     if (addr)
2120         reportedAddr(addr);
2121 }
2122 
2123 net::RequestAnswer
onPing(Sp<Node>)2124 Dht::onPing(Sp<Node>)
2125 {
2126     return {};
2127 }
2128 
2129 net::RequestAnswer
onFindNode(Sp<Node> node,const InfoHash & target,want_t want)2130 Dht::onFindNode(Sp<Node> node, const InfoHash& target, want_t want)
2131 {
2132     const auto& now = scheduler.time();
2133     net::RequestAnswer answer;
2134     answer.ntoken = makeToken(node->getAddr(), false);
2135     if (want & WANT4)
2136         answer.nodes4 = buckets4.findClosestNodes(target, now, TARGET_NODES);
2137     if (want & WANT6)
2138         answer.nodes6 = buckets6.findClosestNodes(target, now, TARGET_NODES);
2139     return answer;
2140 }
2141 
2142 net::RequestAnswer
onGetValues(Sp<Node> node,const InfoHash & hash,want_t,const Query & query)2143 Dht::onGetValues(Sp<Node> node, const InfoHash& hash, want_t, const Query& query)
2144 {
2145     if (not hash) {
2146         DHT_LOG.w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str());
2147         throw net::DhtProtocolException {
2148             net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
2149             net::DhtProtocolException::GET_NO_INFOHASH
2150         };
2151     }
2152     const auto& now = scheduler.time();
2153     net::RequestAnswer answer {};
2154     auto st = store.find(hash);
2155     answer.ntoken = makeToken(node->getAddr(), false);
2156     answer.nodes4 = buckets4.findClosestNodes(hash, now, TARGET_NODES);
2157     answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES);
2158     if (st != store.end() && not st->second.empty()) {
2159         answer.values = st->second.get(query.where.getFilter());
2160         DHT_LOG.d(hash, "[node %s] sending %u values", node->toString().c_str(), answer.values.size());
2161     }
2162     return answer;
2163 }
2164 
onGetValuesDone(const Sp<Node> & node,net::RequestAnswer & a,Sp<Search> & sr,const Sp<Query> & orig_query)2165 void Dht::onGetValuesDone(const Sp<Node>& node,
2166         net::RequestAnswer& a,
2167         Sp<Search>& sr,
2168         const Sp<Query>& orig_query)
2169 {
2170     if (not sr) {
2171         DHT_LOG.w("[search unknown] got reply to 'get'. Ignoring.");
2172         return;
2173     }
2174 
2175     /*DHT_LOG.d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes",
2176             sr->id.toString().c_str(), node->toString().c_str(), a.nodes4.size()+a.nodes6.size());*/
2177 
2178     if (not a.ntoken.empty()) {
2179         if (not a.values.empty() or not a.fields.empty()) {
2180             DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] found %u values",
2181                       sr->id.toString().c_str(), node->toString().c_str(), a.values.size());
2182             for (auto& getp : sr->callbacks) { /* call all callbacks for this search */
2183                 auto& get = getp.second;
2184                 if (not (get.get_cb or get.query_cb) or
2185                         (orig_query and get.query and not get.query->isSatisfiedBy(*orig_query)))
2186                     continue;
2187 
2188                 if (get.query_cb) { /* in case of a request with query */
2189                     if (not a.fields.empty()) {
2190                         get.query_cb(a.fields);
2191                     } else if (not a.values.empty()) {
2192                         std::vector<Sp<FieldValueIndex>> fields;
2193                         fields.reserve(a.values.size());
2194                         for (const auto& v : a.values)
2195                             fields.emplace_back(std::make_shared<FieldValueIndex>(*v, orig_query ? orig_query->select : Select {}));
2196                         get.query_cb(fields);
2197                     }
2198                 } else if (get.get_cb) { /* in case of a vanilla get request */
2199                     std::vector<Sp<Value>> tmp;
2200                     for (const auto& v : a.values)
2201                         if (not get.filter or get.filter(*v))
2202                             tmp.emplace_back(v);
2203                     if (not tmp.empty())
2204                         get.get_cb(tmp);
2205                 }
2206             }
2207 
2208             /* callbacks for local search listeners */
2209             /*std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> tmp_lists;
2210             for (auto& l : sr->listeners) {
2211                 if (!l.second.get_cb or (orig_query and l.second.query and not l.second.query->isSatisfiedBy(*orig_query)))
2212                     continue;
2213                 std::vector<Sp<Value>> tmp;
2214                 for (const auto& v : a.values)
2215                     if (not l.second.filter or l.second.filter(*v))
2216                         tmp.emplace_back(v);
2217                 if (not tmp.empty())
2218                     tmp_lists.emplace_back(l.second.get_cb, std::move(tmp));
2219             }
2220             for (auto& l : tmp_lists)
2221                 l.first(l.second, false);*/
2222         } else if (not a.expired_values.empty()) {
2223             DHT_LOG.w(sr->id, node->id, "[search %s] [node %s] %u expired values",
2224                       sr->id.toString().c_str(), node->toString().c_str(), a.expired_values.size());
2225         }
2226     } else {
2227         DHT_LOG.w(sr->id, "[node %s] no token provided. Ignoring response content.", node->toString().c_str());
2228         network_engine.blacklistNode(node);
2229     }
2230 
2231     if (not sr->done) {
2232         searchSendGetValues(sr);
2233 
2234         // Force to recompute the next step time
2235         scheduler.edit(sr->nextSearchStep, scheduler.time());
2236     }
2237 }
2238 
2239 net::RequestAnswer
onListen(Sp<Node> node,const InfoHash & hash,const Blob & token,size_t socket_id,const Query & query)2240 Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query)
2241 {
2242     if (not hash) {
2243         DHT_LOG.w(node->id, "[node %s] listen with no info_hash", node->toString().c_str());
2244         throw net::DhtProtocolException {
2245             net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
2246             net::DhtProtocolException::LISTEN_NO_INFOHASH
2247         };
2248     }
2249     if (not tokenMatch(token, node->getAddr())) {
2250         DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str());
2251         throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::LISTEN_WRONG_TOKEN};
2252     }
2253     Query q = query;
2254     storageAddListener(hash, node, socket_id, std::move(q));
2255     return {};
2256 }
2257 
2258 void
onListenDone(const Sp<Node> &,net::RequestAnswer &,Sp<Search> & sr)2259 Dht::onListenDone(const Sp<Node>& /* node */, net::RequestAnswer& /* answer */, Sp<Search>& sr)
2260 {
2261     // DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got listen confirmation",
2262     //            sr->id.toString().c_str(), node->toString().c_str(), answer.values.size());
2263 
2264     if (not sr->done) {
2265         const auto& now = scheduler.time();
2266         searchSendGetValues(sr);
2267         scheduler.edit(sr->nextSearchStep, now);
2268     }
2269 }
2270 
2271 net::RequestAnswer
onAnnounce(Sp<Node> n,const InfoHash & hash,const Blob & token,const std::vector<Sp<Value>> & values,const time_point & creation_date)2272 Dht::onAnnounce(Sp<Node> n,
2273         const InfoHash& hash,
2274         const Blob& token,
2275         const std::vector<Sp<Value>>& values,
2276         const time_point& creation_date)
2277 {
2278     auto& node = *n;
2279     if (not hash) {
2280         DHT_LOG.w(node.id, "put with no info_hash");
2281         throw net::DhtProtocolException {
2282             net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
2283             net::DhtProtocolException::PUT_NO_INFOHASH
2284         };
2285     }
2286     if (!tokenMatch(token, node.getAddr())) {
2287         DHT_LOG.w(hash, node.id, "[node %s] incorrect token %s for 'put'", node.toString().c_str(), hash.toString().c_str());
2288         throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::PUT_WRONG_TOKEN};
2289     }
2290     {
2291         // We store a value only if we think we're part of the
2292         // SEARCH_NODES nodes around the target id.
2293         auto closest_nodes = buckets(node.getFamily()).findClosestNodes(hash, scheduler.time(), SEARCH_NODES);
2294         if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) {
2295             DHT_LOG.w(hash, node.id, "[node %s] announce too far from the target. Dropping value.", node.toString().c_str());
2296             return {};
2297         }
2298     }
2299 
2300     auto created = std::min(creation_date, scheduler.time());
2301     for (const auto& v : values) {
2302         if (v->id == Value::INVALID_ID) {
2303             DHT_LOG.w(hash, node.id, "[value %s] incorrect value id", hash.toString().c_str());
2304             throw net::DhtProtocolException {
2305                 net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
2306                 net::DhtProtocolException::PUT_INVALID_ID
2307             };
2308         }
2309         auto lv = getLocalById(hash, v->id);
2310         Sp<Value> vc = v;
2311         if (lv) {
2312             if (*lv == *vc) {
2313                 storageRefresh(hash, v->id);
2314                 DHT_LOG.d(hash, node.id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node.toString().c_str(), std::to_string(v->id).c_str());
2315             } else {
2316                 const auto& type = getType(lv->type);
2317                 if (type.editPolicy(hash, lv, vc, node.id, node.getAddr())) {
2318                     DHT_LOG.d(hash, node.id, "[store %s] editing %s",
2319                             hash.toString().c_str(), vc->toString().c_str());
2320                     storageStore(hash, vc, created, node.getAddr());
2321                 } else {
2322                     DHT_LOG.d(hash, node.id, "[store %s] rejecting edition of %s because of storage policy",
2323                             hash.toString().c_str(), vc->toString().c_str());
2324                 }
2325             }
2326         } else {
2327             // Allow the value to be edited by the storage policy
2328             const auto& type = getType(vc->type);
2329             if (type.storePolicy(hash, vc, node.id, node.getAddr())) {
2330                 //DHT_LOG.d(hash, node.id, "[store %s] storing %s", hash.toString().c_str(), std::to_string(vc->id).c_str());
2331                 storageStore(hash, vc, created, node.getAddr());
2332             } else {
2333                 DHT_LOG.d(hash, node.id, "[store %s] rejecting storage of %s",
2334                         hash.toString().c_str(), vc->toString().c_str());
2335             }
2336         }
2337     }
2338     return {};
2339 }
2340 
2341 net::RequestAnswer
onRefresh(Sp<Node> node,const InfoHash & hash,const Blob & token,const Value::Id & vid)2342 Dht::onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid)
2343 {
2344     using namespace net;
2345 
2346     if (not tokenMatch(token, node->getAddr())) {
2347         DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str());
2348         throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN};
2349     }
2350     if (storageRefresh(hash, vid)) {
2351         DHT_LOG.d(hash, node->id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node->toString().c_str(), std::to_string(vid).c_str());
2352     } else {
2353         DHT_LOG.d(hash, node->id, "[store %s] [node %s] got refresh for unknown value",
2354                 hash.toString().c_str(), node->toString().c_str());
2355         throw DhtProtocolException {DhtProtocolException::NOT_FOUND, DhtProtocolException::STORAGE_NOT_FOUND};
2356     }
2357     return {};
2358 }
2359 
2360 bool
storageRefresh(const InfoHash & id,Value::Id vid)2361 Dht::storageRefresh(const InfoHash& id, Value::Id vid)
2362 {
2363     const auto& now = scheduler.time();
2364     auto s = store.find(id);
2365     if (s != store.end()) {
2366         // Values like for a permanent put can be refreshed. So, inform remote listeners that the value
2367         // need to be refreshed
2368         auto& st = s->second;
2369         if (not st.listeners.empty()) {
2370             DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
2371             std::vector<Value::Id> ids = {vid};
2372             for (const auto& node_listeners : st.listeners) {
2373                 for (const auto& l : node_listeners.second) {
2374                     DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending refresh",
2375                             id.toString().c_str(),
2376                             node_listeners.first->toString().c_str());
2377                     Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
2378                     network_engine.tellListenerRefreshed(node_listeners.first, l.first, id, ntoken, ids);
2379                 }
2380             }
2381         }
2382 
2383         auto expiration = s->second.refresh(now, vid, types);
2384         if (expiration != time_point::max())
2385             scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
2386         return true;
2387     }
2388     return false;
2389 }
2390 
2391 void
onAnnounceDone(const Sp<Node> & node,net::RequestAnswer & answer,Sp<Search> & sr)2392 Dht::onAnnounceDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>& sr)
2393 {
2394     DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got reply to put!",
2395             sr->id.toString().c_str(), node->toString().c_str());
2396     searchSendGetValues(sr);
2397     sr->checkAnnounced(answer.vid);
2398 }
2399 
2400 
2401 void
saveState(const std::string & path) const2402 Dht::saveState(const std::string& path) const
2403 {
2404     std::ofstream file(path);
2405     msgpack::pack(file, exportNodes());
2406     msgpack::pack(file, exportValues());
2407 }
2408 
2409 void
loadState(const std::string & path)2410 Dht::loadState(const std::string& path)
2411 {
2412     DHT_LOG.d("Importing state from %s", path.c_str());
2413     try {
2414         // Import nodes from binary file
2415         msgpack::unpacker pac;
2416         {
2417             // Read whole file
2418             std::ifstream file(path, std::ios::binary|std::ios::ate);
2419             if (!file.is_open()) {
2420                 return;
2421             }
2422             auto size = file.tellg();
2423             file.seekg (0, std::ios::beg);
2424             pac.reserve_buffer(size);
2425             file.read (pac.buffer(), size);
2426             pac.buffer_consumed(size);
2427         }
2428         // Import nodes
2429         msgpack::object_handle oh;
2430         if (pac.next(oh)) {
2431             {
2432                 auto imported_nodes = oh.get().as<std::vector<NodeExport>>();
2433                 DHT_LOG.d("Importing %zu nodes", imported_nodes.size());
2434                 for (const auto& node : imported_nodes)
2435                     insertNode(node);
2436             }
2437             if (pac.next(oh)) {
2438                 auto imported_values = oh.get().as<std::vector<ValuesExport>>();
2439                 DHT_LOG.d("Importing %zu values", imported_values.size());
2440                 importValues(imported_values);
2441             }
2442         }
2443     } catch (const std::exception& e) {
2444         DHT_LOG.w("Error importing state from %s: %s", path.c_str(), e.what());
2445     }
2446 }
2447 
2448 }
2449