1 /*
2 * Copyright (C) 2014-2019 Savoir-faire Linux Inc.
3 * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20
21 #include "dht.h"
22 #include "rng.h"
23 #include "search.h"
24 #include "storage.h"
25 #include "request.h"
26
27 #include <msgpack.hpp>
28
29 #include <algorithm>
30 #include <random>
31 #include <sstream>
32 #include <fstream>
33
34 namespace dht {
35
36 using namespace std::placeholders;
37
38 constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
39 constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME;
40 constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME;
41 constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN;
42
43 NodeStatus
getStatus(sa_family_t af) const44 Dht::getStatus(sa_family_t af) const
45 {
46 const auto& stats = getNodesStats(af);
47 if (stats.good_nodes)
48 return NodeStatus::Connected;
49 auto& ping = af == AF_INET ? pending_pings4 : pending_pings6;
50 if (ping or stats.getKnownNodes())
51 return NodeStatus::Connecting;
52 return NodeStatus::Disconnected;
53 }
54
55 void
shutdown(ShutdownCallback cb)56 Dht::shutdown(ShutdownCallback cb)
57 {
58 if (not persistPath.empty())
59 saveState(persistPath);
60
61 if (not maintain_storage) {
62 if (cb) cb();
63 return;
64 }
65
66 // Last store maintenance
67 scheduler.syncTime();
68 auto remaining = std::make_shared<int>(0);
69 auto str_donecb = [=](bool, const std::vector<Sp<Node>>&) {
70 --*remaining;
71 DHT_LOG.w("shuting down node: %u ops remaining", *remaining);
72 if (!*remaining && cb) { cb(); }
73 };
74
75 for (auto& str : store)
76 *remaining += maintainStorage(str, true, str_donecb);
77
78 if (!*remaining) {
79 DHT_LOG.w("shuting down node: %u ops remaining", *remaining);
80 if (cb)
81 cb();
82 }
83 }
84
85 bool
isRunning(sa_family_t af) const86 Dht::isRunning(sa_family_t af) const { return network_engine.isRunning(af); }
87
88 /* Every bucket contains an unordered list of nodes. */
89 const Sp<Node>
findNode(const InfoHash & id,sa_family_t af) const90 Dht::findNode(const InfoHash& id, sa_family_t af) const
91 {
92 if (const Bucket* b = findBucket(id, af))
93 for (const auto& n : b->nodes)
94 if (n->id == id) return n;
95 return {};
96 }
97
98 /* Every bucket caches the address of a likely node. Ping it. */
99 void
sendCachedPing(Bucket & b)100 Dht::sendCachedPing(Bucket& b)
101 {
102 if (b.cached)
103 DHT_LOG.d(b.cached->id, "[node %s] sending ping to cached node", b.cached->toString().c_str());
104 b.sendCachedPing(network_engine);
105 }
106
107 std::vector<SockAddr>
getPublicAddress(sa_family_t family)108 Dht::getPublicAddress(sa_family_t family)
109 {
110 std::sort(reported_addr.begin(), reported_addr.end(), [](const ReportedAddr& a, const ReportedAddr& b) {
111 return a.first > b.first;
112 });
113 std::vector<SockAddr> ret;
114 ret.reserve(!family ? reported_addr.size() : reported_addr.size()/2);
115 for (const auto& addr : reported_addr)
116 if (!family || family == addr.second.getFamily())
117 ret.emplace_back(addr.second);
118 return ret;
119 }
120
121 bool
trySearchInsert(const Sp<Node> & node)122 Dht::trySearchInsert(const Sp<Node>& node)
123 {
124 const auto& now = scheduler.time();
125 if (not node) return false;
126
127 auto& srs = searches(node->getFamily());
128 auto closest = srs.lower_bound(node->id);
129 bool inserted {false};
130
131 // insert forward
132 for (auto it = closest; it != srs.end(); it++) {
133 auto& s = *it->second;
134 if (s.insertNode(node, now)) {
135 inserted = true;
136 scheduler.edit(s.nextSearchStep, now);
137 } else if (not s.expired and not s.done)
138 break;
139 }
140 // insert backward
141 for (auto it = closest; it != srs.begin();) {
142 --it;
143 auto& s = *it->second;
144 if (s.insertNode(node, now)) {
145 inserted = true;
146 scheduler.edit(s.nextSearchStep, now);
147 } else if (not s.expired and not s.done)
148 break;
149 }
150 return inserted;
151 }
152
153 void
reportedAddr(const SockAddr & addr)154 Dht::reportedAddr(const SockAddr& addr)
155 {
156 auto it = std::find_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& a){
157 return a.second == addr;
158 });
159 if (it == reported_addr.end()) {
160 if (reported_addr.size() < 32)
161 reported_addr.emplace_back(1, addr);
162 } else
163 it->first++;
164 }
165
166 /* We just learnt about a node, not necessarily a new one. Confirm is 1 if
167 the node sent a message, 2 if it sent us a reply. */
168 void
onNewNode(const Sp<Node> & node,int confirm)169 Dht::onNewNode(const Sp<Node>& node, int confirm)
170 {
171 const auto& now = scheduler.time();
172 auto& b = buckets(node->getFamily());
173 auto wasEmpty = confirm < 2 && b.grow_time < now - std::chrono::minutes(5);
174 if (b.onNewNode(node, confirm, now, myid, network_engine) or confirm) {
175 trySearchInsert(node);
176 if (wasEmpty) {
177 scheduler.edit(nextNodesConfirmation, now + std::chrono::seconds(1));
178 }
179 }
180 }
181
182 /* Called periodically to purge known-bad nodes. Note that we're very
183 conservative here: broken nodes in the table don't do much harm, we'll
184 recover as soon as we find better ones. */
185 void
expireBuckets(RoutingTable & list)186 Dht::expireBuckets(RoutingTable& list)
187 {
188 for (auto& b : list) {
189 bool changed = false;
190 b.nodes.remove_if([&changed](const Sp<Node>& n) {
191 if (n->isExpired()) {
192 changed = true;
193 return true;
194 }
195 return false;
196 });
197 if (changed)
198 sendCachedPing(b);
199 }
200 }
201
202 void
expireSearches()203 Dht::expireSearches()
204 {
205 auto t = scheduler.time() - SEARCH_EXPIRE_TIME;
206 auto expired = [&](std::pair<const InfoHash, Sp<Search>>& srp) {
207 auto& sr = *srp.second;
208 auto b = sr.callbacks.empty() && sr.announce.empty() && sr.listeners.empty() && sr.step_time < t;
209 if (b) {
210 DHT_LOG.d(srp.first, "[search %s] removing search", srp.first.toString().c_str());
211 sr.clear();
212 return b;
213 } else { return false; }
214 };
215 erase_if(searches4, expired);
216 erase_if(searches6, expired);
217 }
218
219 void
searchNodeGetDone(const net::Request & req,net::RequestAnswer && answer,std::weak_ptr<Search> ws,Sp<Query> query)220 Dht::searchNodeGetDone(const net::Request& req,
221 net::RequestAnswer&& answer,
222 std::weak_ptr<Search> ws,
223 Sp<Query> query)
224 {
225 const auto& now = scheduler.time();
226 if (auto sr = ws.lock()) {
227 sr->insertNode(req.node, now, answer.ntoken);
228 if (auto srn = sr->getNode(req.node)) {
229 /* all other get requests which are satisfied by this answer
230 should not be sent anymore */
231 for (auto& g : sr->callbacks) {
232 auto& q = g.second.query;
233 if (q->isSatisfiedBy(*query) and q != query) {
234 auto dummy_req = std::make_shared<net::Request>();
235 dummy_req->cancel();
236 srn->getStatus[q] = std::move(dummy_req);
237 }
238 }
239 auto syncTime = srn->getSyncTime(scheduler.time());
240 if (srn->syncJob)
241 scheduler.edit(srn->syncJob, syncTime);
242 else
243 srn->syncJob = scheduler.add(syncTime, std::bind(&Dht::searchStep, this, sr));
244 }
245 onGetValuesDone(req.node, answer, sr, query);
246 }
247 }
248
249 void
searchNodeGetExpired(const net::Request & status,bool over,std::weak_ptr<Search> ws,Sp<Query> query)250 Dht::searchNodeGetExpired(const net::Request& status,
251 bool over,
252 std::weak_ptr<Search> ws,
253 Sp<Query> query)
254 {
255 if (auto sr = ws.lock()) {
256 if (auto srn = sr->getNode(status.node)) {
257 srn->candidate = not over;
258 if (over)
259 srn->getStatus.erase(query);
260 }
261 scheduler.edit(sr->nextSearchStep, scheduler.time());
262 }
263 }
264
paginate(std::weak_ptr<Search> ws,Sp<Query> query,SearchNode * n)265 void Dht::paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n) {
266 auto sr = ws.lock();
267 if (not sr) return;
268 auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {});
269 auto onSelectDone = [this,ws,query](const net::Request& status,
270 net::RequestAnswer&& answer) mutable {
271 // Retrieve search
272 auto sr = ws.lock();
273 if (not sr) return;
274 const auto& id = sr->id;
275 // Retrieve search node
276 auto sn = sr->getNode(status.node);
277 if (not sn) return;
278 // backward compatibility
279 if (answer.fields.empty()) {
280 searchNodeGetDone(status, std::move(answer), ws, query);
281 return;
282 }
283 for (const auto& fvi : answer.fields) {
284 try {
285 auto vid = fvi->index.at(Value::Field::Id).getInt();
286 if (vid == Value::INVALID_ID) continue;
287 auto query_for_vid = std::make_shared<Query>(Select {}, Where {}.id(vid));
288 sn->pagination_queries[query].push_back(query_for_vid);
289 DHT_LOG.d(id, sn->node->id, "[search %s] [node %s] sending %s",
290 id.toString().c_str(), sn->node->toString().c_str(), query_for_vid->toString().c_str());
291 sn->getStatus[query_for_vid] = network_engine.sendGetValues(status.node,
292 id,
293 *query_for_vid,
294 -1,
295 std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
296 std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query_for_vid)
297 );
298 } catch (const std::out_of_range&) {
299 DHT_LOG.e(id, sn->node->id, "[search %s] [node %s] received non-id field in response to "\
300 "'SELECT id' request...",
301 id.toString().c_str(), sn->node->toString().c_str());
302 }
303 }
304 };
305 /* add pagination query key for tracking ongoing requests. */
306 n->pagination_queries[query].push_back(select_q);
307
308 DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending %s",
309 sr->id.toString().c_str(), n->node->toString().c_str(), select_q->toString().c_str());
310 n->getStatus[select_q] = network_engine.sendGetValues(n->node,
311 sr->id,
312 *select_q,
313 -1,
314 onSelectDone,
315 std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, select_q)
316 );
317 }
318
319 Dht::SearchNode*
searchSendGetValues(Sp<Search> sr,SearchNode * pn,bool update)320 Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update)
321 {
322 if (sr->done or sr->currentlySolicitedNodeCount() >= MAX_REQUESTED_SEARCH_NODES)
323 return nullptr;
324
325 const auto& now = scheduler.time();
326
327 std::weak_ptr<Search> ws = sr;
328 auto cb = sr->callbacks.begin();
329 static const auto ANY_QUERY = std::make_shared<Query>(Select {}, Where {}, true);
330 do { /* for all requests to send */
331 SearchNode* n = nullptr;
332 auto& query = sr->callbacks.empty() ? ANY_QUERY : cb->second.query;
333 const time_point up = (not sr->callbacks.empty() and update)
334 ? sr->getLastGetTime(*query)
335 : time_point::min();
336
337 if (pn and pn->canGet(now, up, query)) {
338 n = pn;
339 } else {
340 for (auto& sn : sr->nodes) {
341 if (sn.canGet(now, up, query)) {
342 n = &sn;
343 break;
344 }
345 }
346 }
347
348 if (sr->callbacks.empty()) { /* 'find_node' request */
349 if (not n)
350 return nullptr;
351
352 /*DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'",
353 sr->id.toString().c_str(), n->node->toString().c_str());*/
354 n->getStatus[query] = network_engine.sendFindNode(n->node,
355 sr->id,
356 -1,
357 std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
358 std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
359
360 } else { /* 'get' request */
361 if (not n)
362 continue;
363
364 if (query and not query->select.getSelection().empty()) {
365 /* The request contains a select. No need to paginate... */
366 /*DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending 'get'",
367 sr->id.toString().c_str(), n->node->toString().c_str());*/
368 n->getStatus[query] = network_engine.sendGetValues(n->node,
369 sr->id,
370 *query,
371 -1,
372 std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query),
373 std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query));
374 } else
375 paginate(ws, query, n);
376 }
377
378 /* We only try to send one request. return. */
379 return n;
380
381 } while (++cb != sr->callbacks.end());
382
383 /* no request were sent */
384 return nullptr;
385 }
386
searchSendAnnounceValue(const Sp<Search> & sr)387 void Dht::searchSendAnnounceValue(const Sp<Search>& sr) {
388 if (sr->announce.empty())
389 return;
390 unsigned i = 0;
391 std::weak_ptr<Search> ws = sr;
392
393 auto onDone = [this,ws](const net::Request& req, net::RequestAnswer&& answer)
394 { /* when put done */
395 if (auto sr = ws.lock()) {
396 onAnnounceDone(req.node, answer, sr);
397 searchStep(sr);
398 }
399 };
400
401 auto onExpired = [this,ws](const net::Request&, bool over)
402 { /* when put expired */
403 if (over)
404 if (auto sr = ws.lock())
405 scheduler.edit(sr->nextSearchStep, scheduler.time());
406 };
407
408 auto onSelectDone =
409 [this,ws,onDone,onExpired](const net::Request& req, net::RequestAnswer&& answer) mutable
410 { /* on probing done */
411 auto sr = ws.lock();
412 if (not sr) return;
413 const auto& now = scheduler.time();
414 sr->insertNode(req.node, scheduler.time(), answer.ntoken);
415 auto sn = sr->getNode(req.node);
416 if (not sn) return;
417
418 if (not sn->isSynced(now)) {
419 /* Search is now unsynced. Let's call searchStep to sync again. */
420 scheduler.edit(sr->nextSearchStep, now);
421 return;
422 }
423 for (auto& a : sr->announce) {
424 if (sn->getAnnounceTime(a.value->id) > now)
425 continue;
426 bool hasValue {false};
427 uint16_t seq_no = 0;
428 try {
429 const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(),
430 [&a](const Sp<FieldValueIndex>& i){
431 return i->index.at(Value::Field::Id).getInt() == a.value->id;
432 });
433 if (f != answer.fields.cend() and *f) {
434 hasValue = true;
435 seq_no = static_cast<uint16_t>((*f)->index.at(Value::Field::SeqNum).getInt());
436 }
437 } catch (std::out_of_range&) { }
438
439 auto next_refresh_time = now + getType(a.value->type).expiration;
440 /* only put the value if the node doesn't already have it */
441 if (not hasValue or seq_no < a.value->seq) {
442 DHT_LOG.d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)",
443 sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
444 auto created = a.permanent ? time_point::max() : a.created;
445 sn->acked[a.value->id] = {
446 network_engine.sendAnnounceValue(sn->node, sr->id, a.value, created, sn->token, onDone, onExpired),
447 next_refresh_time
448 };
449 } else if (hasValue and a.permanent) {
450 DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)",
451 sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
452 sn->acked[a.value->id] = {
453 network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone, onExpired),
454 next_refresh_time
455 };
456 } else {
457 DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.",
458 sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id);
459 auto ack_req = std::make_shared<net::Request>(net::Request::State::COMPLETED);
460 ack_req->reply_time = now;
461 sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time);
462
463 /* step to clear announces */
464 scheduler.edit(sr->nextSearchStep, now);
465 }
466 if (a.permanent) {
467 scheduler.add(next_refresh_time - REANNOUNCE_MARGIN, [this,ws] {
468 if (auto sr = ws.lock()) {
469 searchStep(sr);
470 }
471 });
472 }
473 }
474 };
475
476 Sp<Query> probe_query {};
477 const auto& now = scheduler.time();
478 for (auto& n : sr->nodes) {
479 if (not n.isSynced(now))
480 continue;
481
482 const auto& gs = n.probe_query ? n.getStatus.find(n.probe_query) : n.getStatus.cend();
483 if (gs != n.getStatus.cend() and gs->second and gs->second->pending()) {
484 continue;
485 }
486
487 bool sendQuery = false;
488 for (auto& a : sr->announce) {
489 if (n.getAnnounceTime(a.value->id) <= now) {
490 if (a.permanent) {
491 sendQuery = true;
492 } else {
493 DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending 'put' (vid: %d)",
494 sr->id.toString().c_str(), n.node->toString().c_str(), a.value->id);
495 n.acked[a.value->id] = {
496 network_engine.sendAnnounceValue(n.node, sr->id, a.value, a.created, n.token, onDone, onExpired),
497 now + getType(a.value->type).expiration
498 };
499 }
500 }
501 }
502
503 if (sendQuery) {
504 if (not probe_query)
505 probe_query = std::make_shared<Query>(Select {}.field(Value::Field::Id).field(Value::Field::SeqNum));
506 DHT_LOG.d(sr->id, n.node->id, "[search %s] [node %s] sending %s",
507 sr->id.toString().c_str(), n.node->toString().c_str(), probe_query->toString().c_str());
508 n.probe_query = probe_query;
509 n.getStatus[probe_query] = network_engine.sendGetValues(n.node,
510 sr->id,
511 *probe_query,
512 -1,
513 onSelectDone,
514 std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, probe_query));
515 }
516 if (not n.candidate and ++i == TARGET_NODES)
517 break;
518 }
519 }
520
521 void
searchSynchedNodeListen(const Sp<Search> & sr,SearchNode & n)522 Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n)
523 {
524 std::weak_ptr<Search> ws = sr;
525 for (const auto& l : sr->listeners) {
526 const auto& query = l.second.query;
527 auto list_token = l.first;
528 if (n.getListenTime(query) > scheduler.time())
529 continue;
530 // DHT_LOG.d(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'",
531 // sr->id.toString().c_str(), n.node->toString().c_str());
532
533 auto r = n.listenStatus.find(query);
534 if (r == n.listenStatus.end()) {
535 r = n.listenStatus.emplace(query, SearchNode::CachedListenStatus{
536 [ws,list_token](const std::vector<Sp<Value>>& values, bool expired){
537 if (auto sr = ws.lock()) {
538 auto l = sr->listeners.find(list_token);
539 if (l != sr->listeners.end()) {
540 l->second.get_cb(values, expired);
541 }
542 }
543 }, [ws,list_token] (ListenSyncStatus status) {
544 if (auto sr = ws.lock()) {
545 auto l = sr->listeners.find(list_token);
546 if (l != sr->listeners.end()) {
547 l->second.sync_cb(status);
548 }
549 }
550 }
551 }).first;
552 auto node = n.node;
553 r->second.cacheExpirationJob = scheduler.add(time_point::max(), [this,ws,query,node]{
554 if (auto sr = ws.lock()) {
555 if (auto sn = sr->getNode(node)) {
556 sn->expireValues(query, scheduler);
557 }
558 }
559 });
560 }
561 auto prev_req = r != n.listenStatus.end() ? r->second.req : nullptr;
562 auto new_req = network_engine.sendListen(n.node, sr->id, *query, n.token, prev_req,
563 [this,ws,query](const net::Request& req, net::RequestAnswer&& answer) mutable
564 { /* on done */
565 if (auto sr = ws.lock()) {
566 scheduler.edit(sr->nextSearchStep, scheduler.time());
567 if (auto sn = sr->getNode(req.node)) {
568 scheduler.add(sn->getListenTime(query), std::bind(&Dht::searchStep, this, sr));
569 sn->onListenSynced(query);
570 }
571 onListenDone(req.node, answer, sr);
572 }
573 },
574 [this,ws,query](const net::Request& req, bool over) mutable
575 { /* on request expired */
576 if (auto sr = ws.lock()) {
577 scheduler.edit(sr->nextSearchStep, scheduler.time());
578 if (over)
579 if (auto sn = sr->getNode(req.node))
580 sn->listenStatus.erase(query);
581 }
582 },
583 [this,ws,query](const Sp<Node>& node, net::RequestAnswer&& answer) mutable
584 { /* on new values */
585 if (auto sr = ws.lock()) {
586 scheduler.edit(sr->nextSearchStep, scheduler.time());
587 sr->insertNode(node, scheduler.time(), answer.ntoken);
588 if (auto sn = sr->getNode(node)) {
589 sn->onValues(query, std::move(answer), types, scheduler);
590 }
591 }
592 }
593 );
594 // Here the request may have failed and the CachedListenStatus removed
595 r = n.listenStatus.find(query);
596 if (r != n.listenStatus.end()) {
597 r->second.req = new_req;
598 }
599 }
600 }
601
602 /* When a search is in progress, we periodically call search_step to send
603 further requests. */
604 void
searchStep(Sp<Search> sr)605 Dht::searchStep(Sp<Search> sr)
606 {
607 if (not sr or sr->expired or sr->done) return;
608
609 const auto& now = scheduler.time();
610 /*if (auto req_count = sr->currentlySolicitedNodeCount())
611 DHT_LOG.d(sr->id, "[search %s IPv%c] step (%d requests)",
612 sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', req_count);*/
613 sr->step_time = now;
614
615 if (sr->refill_time + Node::NODE_EXPIRE_TIME < now and sr->nodes.size()-sr->getNumberOfBadNodes() < SEARCH_NODES)
616 refill(*sr);
617
618 /* Check if the first TARGET_NODES (8) live nodes have replied. */
619 if (sr->isSynced(now)) {
620 if (not (sr->callbacks.empty() and sr->announce.empty())) {
621 // search is synced but some (newer) get operations are not complete
622 // Call callbacks when done
623 std::vector<Get> completed_gets;
624 for (auto b = sr->callbacks.begin(); b != sr->callbacks.end();) {
625 if (sr->isDone(b->second)) {
626 sr->setDone(b->second);
627 completed_gets.emplace_back(std::move(b->second));
628 b = sr->callbacks.erase(b);
629 }
630 else
631 ++b;
632 }
633 // clear corresponding queries
634 for (const auto& get : completed_gets)
635 for (auto& sn : sr->nodes) {
636 sn.getStatus.erase(get.query);
637 sn.pagination_queries.erase(get.query);
638 }
639
640 /* clearing callbacks for announced values */
641 sr->checkAnnounced();
642
643 if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
644 sr->setDone();
645 }
646
647 // true if this node is part of the target nodes cluter.
648 /*bool in = sr->id.xorCmp(myid, sr->nodes.back().node->id) < 0;
649
650 DHT_LOG_DBG("[search %s IPv%c] synced%s",
651 sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', in ? ", in" : "");*/
652
653 if (not sr->listeners.empty()) {
654 unsigned i = 0;
655 for (auto& n : sr->nodes) {
656 if (not n.isSynced(now))
657 continue;
658 searchSynchedNodeListen(sr, n);
659 if (not n.candidate and ++i == LISTEN_NODES)
660 break;
661 }
662 }
663
664 // Announce requests
665 searchSendAnnounceValue(sr);
666
667 if (sr->callbacks.empty() && sr->announce.empty() && sr->listeners.empty())
668 sr->setDone();
669 }
670
671 while (sr->currentlySolicitedNodeCount() < MAX_REQUESTED_SEARCH_NODES and searchSendGetValues(sr));
672
673 if (sr->getNumberOfConsecutiveBadNodes() >= std::min(sr->nodes.size(),
674 static_cast<size_t>(SEARCH_MAX_BAD_NODES)))
675 {
676 DHT_LOG.w(sr->id, "[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6');
677 sr->expire();
678 connectivityChanged(sr->af);
679 }
680
681 /* dumpSearch(*sr, std::cout); */
682 }
683
refill(Dht::Search & sr)684 unsigned Dht::refill(Dht::Search& sr) {
685 const auto& now = scheduler.time();
686 sr.refill_time = now;
687 /* we search for up to SEARCH_NODES good nodes. */
688 auto cached_nodes = network_engine.getCachedNodes(sr.id, sr.af, SEARCH_NODES);
689
690 if (cached_nodes.empty()) {
691 DHT_LOG.e(sr.id, "[search %s IPv%c] no nodes from cache while refilling search",
692 sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6');
693 return 0;
694 }
695
696 unsigned inserted = 0;
697 for (auto& i : cached_nodes) {
698 /* try to insert the nodes. Search::insertNode will know how many to insert. */
699 if (sr.insertNode(i, now))
700 ++inserted;
701 }
702 DHT_LOG.d(sr.id, "[search %s IPv%c] refilled search with %u nodes from node cache",
703 sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', inserted);
704 return inserted;
705 }
706
707
708 /* Start a search. */
709 Sp<Dht::Search>
search(const InfoHash & id,sa_family_t af,GetCallback gcb,QueryCallback qcb,DoneCallback dcb,Value::Filter f,const Sp<Query> & q)710 Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback qcb, DoneCallback dcb, Value::Filter f, const Sp<Query>& q)
711 {
712 if (!isRunning(af)) {
713 DHT_LOG.e(id, "[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
714 if (dcb)
715 dcb(false, {});
716 return {};
717 }
718
719 auto& srs = searches(af);
720 const auto& srp = srs.find(id);
721 Sp<Search> sr {};
722
723 if (srp != srs.end()) {
724 sr = srp->second;
725 sr->done = false;
726 sr->expired = false;
727 } else {
728 if (searches4.size() + searches6.size() < MAX_SEARCHES) {
729 sr = std::make_shared<Search>();
730 srs.emplace(id, sr);
731 } else {
732 for (auto it = srs.begin(); it!=srs.end();) {
733 auto& s = *it->second;
734 if ((s.done or s.expired) and s.announce.empty() and s.listeners.empty()) {
735 sr = it->second;
736 break;
737 }
738 }
739 if (not sr) {
740 DHT_LOG.e(id, "[search %s IPv%c] maximum number of searches reached !", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
741 return {};
742 }
743 }
744 sr->af = af;
745 sr->tid = search_id++;
746 sr->step_time = time_point::min();
747 sr->id = id;
748 sr->done = false;
749 sr->expired = false;
750 sr->nodes.clear();
751 sr->nodes.reserve(SEARCH_NODES+1);
752 sr->nextSearchStep = scheduler.add(time_point::max(), std::bind(&Dht::searchStep, this, sr));
753 DHT_LOG.w(id, "[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
754 if (search_id == 0)
755 search_id++;
756 }
757
758 sr->get(f, q, qcb, gcb, dcb, scheduler);
759 refill(*sr);
760
761 return sr;
762 }
763
764 void
announce(const InfoHash & id,sa_family_t af,Sp<Value> value,DoneCallback callback,time_point created,bool permanent)765 Dht::announce(const InfoHash& id,
766 sa_family_t af,
767 Sp<Value> value,
768 DoneCallback callback,
769 time_point created,
770 bool permanent)
771 {
772 auto& srs = searches(af);
773 auto srp = srs.find(id);
774 auto sr = srp == srs.end() ? search(id, af) : srp->second;
775 if (!sr) {
776 if (callback)
777 callback(false, {});
778 return;
779 }
780 sr->done = false;
781 sr->expired = false;
782 auto a_sr = std::find_if(sr->announce.begin(), sr->announce.end(), [&](const Announce& a){
783 return a.value->id == value->id;
784 });
785 if (a_sr == sr->announce.end()) {
786 sr->announce.emplace_back(Announce {permanent, value, created, callback});
787 for (auto& n : sr->nodes) {
788 n.probe_query.reset();
789 n.acked[value->id].first.reset();
790 }
791 } else {
792 a_sr->permanent = permanent;
793 a_sr->created = created;
794 if (a_sr->value != value) {
795 a_sr->value = value;
796 for (auto& n : sr->nodes) {
797 n.acked[value->id].first.reset();
798 n.probe_query.reset();
799 }
800 }
801 if (sr->isAnnounced(value->id)) {
802 if (a_sr->callback)
803 a_sr->callback(true, {});
804 a_sr->callback = {};
805 if (callback)
806 callback(true, {});
807 return;
808 } else {
809 if (a_sr->callback)
810 a_sr->callback(false, {});
811 a_sr->callback = callback;
812 }
813 }
814 scheduler.edit(sr->nextSearchStep, scheduler.time());
815 }
816
817 size_t
listenTo(const InfoHash & id,sa_family_t af,ValueCallback cb,Value::Filter f,const Sp<Query> & q)818 Dht::listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f, const Sp<Query>& q)
819 {
820 if (!isRunning(af))
821 return 0;
822 // DHT_LOG_ERR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(), (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now()));
823
824 //DHT_LOG_WARN("listenTo %s", id.toString().c_str());
825 auto& srs = searches(af);
826 auto srp = srs.find(id);
827 Sp<Search> sr = (srp == srs.end()) ? search(id, af) : srp->second;
828 if (!sr)
829 throw DhtException("Can't create search");
830 DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6');
831 return sr->listen(cb, f, q, scheduler);
832 }
833
834 size_t
listen(const InfoHash & id,ValueCallback cb,Value::Filter f,Where where)835 Dht::listen(const InfoHash& id, ValueCallback cb, Value::Filter f, Where where)
836 {
837 scheduler.syncTime();
838
839 auto token = ++listener_token;
840 auto gcb = OpValueCache::cacheCallback(std::move(cb), [this, id, token]{
841 cancelListen(id, token);
842 });
843
844 auto query = std::make_shared<Query>(Select{}, std::move(where));
845 auto filter = f.chain(query->where.getFilter());
846 auto st = store.find(id);
847 if (st == store.end() && store.size() < MAX_HASHES)
848 st = store.emplace(id, scheduler.time() + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME).first;
849
850 size_t tokenlocal = 0;
851 if (st != store.end()) {
852 tokenlocal = st->second.listen(gcb, filter, query);
853 if (tokenlocal == 0)
854 return 0;
855 }
856
857 auto token4 = Dht::listenTo(id, AF_INET, gcb, filter, query);
858 auto token6 = token4 == 0 ? 0 : Dht::listenTo(id, AF_INET6, gcb, filter, query);
859 if (token6 == 0 && st != store.end()) {
860 st->second.cancelListen(tokenlocal);
861 return 0;
862 }
863
864 listeners.emplace(token, std::make_tuple(tokenlocal, token4, token6));
865 return token;
866 }
867
868 bool
cancelListen(const InfoHash & id,size_t token)869 Dht::cancelListen(const InfoHash& id, size_t token)
870 {
871 scheduler.syncTime();
872
873 auto it = listeners.find(token);
874 if (it == listeners.end()) {
875 DHT_LOG.w(id, "Listen token not found: %d", token);
876 return false;
877 }
878 DHT_LOG.d(id, "cancelListen %s with token %d", id.toString().c_str(), token);
879 if (auto tokenlocal = std::get<0>(it->second)) {
880 auto st = store.find(id);
881 if (st != store.end())
882 st->second.cancelListen(tokenlocal);
883 }
884 auto searches_cancel_listen = [this,&id](std::map<InfoHash, Sp<Search>>& srs, size_t token) {
885 if (token) {
886 auto srp = srs.find(id);
887 if (srp != srs.end())
888 srp->second->cancelListen(token, scheduler);
889 }
890 };
891 searches_cancel_listen(searches4, std::get<1>(it->second));
892 searches_cancel_listen(searches6, std::get<2>(it->second));
893 listeners.erase(it);
894 return true;
895 }
896
897 struct OpStatus {
898 struct Status {
899 bool done {false};
900 bool ok {false};
Statusdht::OpStatus::Status901 Status(bool done=false, bool ok=false) : done(done), ok(ok) {}
902 };
903 Status status;
904 Status status4;
905 Status status6;
906 };
907
908 template <typename T>
909 struct GetStatus : public OpStatus {
910 T values;
911 std::vector<Sp<Node>> nodes;
912 };
913
914 void
put(const InfoHash & id,Sp<Value> val,DoneCallback callback,time_point created,bool permanent)915 Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point created, bool permanent)
916 {
917 if (not val) {
918 if (callback)
919 callback(false, {});
920 return;
921 }
922 if (val->id == Value::INVALID_ID) {
923 crypto::random_device rdev;
924 std::uniform_int_distribution<Value::Id> rand_id {};
925 val->id = rand_id(rdev);
926 }
927 scheduler.syncTime();
928 const auto& now = scheduler.time();
929 created = std::min(now, created);
930 storageStore(id, val, created, {}, permanent);
931
932 DHT_LOG.d(id, "put: adding %s -> %s", id.toString().c_str(), val->toString().c_str());
933
934 auto op = std::make_shared<OpStatus>();
935 auto donecb = [callback](const std::vector<Sp<Node>>& nodes, OpStatus& op) {
936 // Callback as soon as the value is announced on one of the available networks
937 if (callback and not op.status.done and (op.status4.done && op.status6.done)) {
938 callback(op.status4.ok or op.status6.ok, nodes);
939 op.status.done = true;
940 }
941 };
942 announce(id, AF_INET, val, [=](bool ok4, const std::vector<Sp<Node>>& nodes) {
943 DHT_LOG.d(id, "Announce done IPv4 %d", ok4);
944 auto& o = *op;
945 o.status4 = {true, ok4};
946 donecb(nodes, o);
947 }, created, permanent);
948 announce(id, AF_INET6, val, [=](bool ok6, const std::vector<Sp<Node>>& nodes) {
949 DHT_LOG.d(id, "Announce done IPv6 %d", ok6);
950 auto& o = *op;
951 o.status6 = {true, ok6};
952 donecb(nodes, o);
953 }, created, permanent);
954 }
955
956 template <typename T>
doneCallbackWrapper(DoneCallback dcb,const std::vector<Sp<Node>> & nodes,GetStatus<T> & op)957 void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, GetStatus<T>& op) {
958 if (op.status.done)
959 return;
960 op.nodes.insert(op.nodes.end(), nodes.begin(), nodes.end());
961 if (op.status.ok or (op.status4.done and op.status6.done)) {
962 bool ok = op.status.ok or op.status4.ok or op.status6.ok;
963 op.status.done = true;
964 if (dcb)
965 dcb(ok, op.nodes);
966 }
967 }
968
969 template <typename T, typename St, typename Cb, typename Av, typename Cv>
callbackWrapper(Cb get_cb,DoneCallback done_cb,const std::vector<Sp<T>> & values,Av add_values,Cv cache_values,GetStatus<St> & op)970 bool callbackWrapper(Cb get_cb, DoneCallback done_cb, const std::vector<Sp<T>>& values,
971 Av add_values, Cv cache_values, GetStatus<St>& op)
972 {
973 if (op.status.done)
974 return false;
975 auto newvals = add_values(values);
976 if (not newvals.empty()) {
977 op.status.ok = !get_cb(newvals);
978 cache_values(newvals);
979 }
980 doneCallbackWrapper(done_cb, {}, op);
981 return !op.status.ok;
982 }
983
984 void
get(const InfoHash & id,GetCallback getcb,DoneCallback donecb,Value::Filter && filter,Where && where)985 Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filter&& filter, Where&& where)
986 {
987 scheduler.syncTime();
988
989 auto op = std::make_shared<GetStatus<std::map<Value::Id, Sp<Value>>>>();
990 auto gcb = [getcb, donecb, op](const std::vector<Sp<Value>>& vals) {
991 auto& o = *op;
992 return callbackWrapper(getcb, donecb, vals, [&o](const std::vector<Sp<Value>>& values) {
993 std::vector<Sp<Value>> newvals {};
994 for (const auto& v : values) {
995 auto it = o.values.find(v->id);
996 if (it == o.values.cend() or (it->second != v && !(*it->second == *v))) {
997 newvals.push_back(v);
998 }
999 }
1000 return newvals;
1001 }, [&o](const std::vector<Sp<Value>>& newvals) {
1002 for (const auto& v : newvals)
1003 o.values[v->id] = v;
1004 }, o);
1005 };
1006
1007 auto q = std::make_shared<Query>(Select {}, std::move(where));
1008 auto f = filter.chain(q->where.getFilter());
1009
1010 /* Try to answer this search locally. */
1011 gcb(getLocal(id, f));
1012
1013 Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
1014 //DHT_LOG_WARN("DHT done IPv4");
1015 op->status4 = {true, ok};
1016 doneCallbackWrapper(donecb, nodes, *op);
1017 }, f, q);
1018 Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
1019 //DHT_LOG_WARN("DHT done IPv6");
1020 op->status6 = {true, ok};
1021 doneCallbackWrapper(donecb, nodes, *op);
1022 }, f, q);
1023 }
1024
query(const InfoHash & id,QueryCallback cb,DoneCallback done_cb,Query && q)1025 void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Query&& q)
1026 {
1027 scheduler.syncTime();
1028 auto op = std::make_shared<GetStatus<std::vector<Sp<FieldValueIndex>>>>();
1029 auto f = q.where.getFilter();
1030 auto qcb = [cb, done_cb, op](const std::vector<Sp<FieldValueIndex>>& fields){
1031 auto& o = *op;
1032 return callbackWrapper(cb, done_cb, fields, [&](const std::vector<Sp<FieldValueIndex>>& fields) {
1033 std::vector<Sp<FieldValueIndex>> newvals {};
1034 for (const auto& f : fields) {
1035 auto it = std::find_if(o.values.cbegin(), o.values.cend(),
1036 [&](const Sp<FieldValueIndex>& sf) {
1037 return sf == f or f->containedIn(*sf);
1038 });
1039 if (it == o.values.cend()) {
1040 auto lesser = std::find_if(o.values.begin(), o.values.end(),
1041 [&](const Sp<FieldValueIndex>& sf) {
1042 return sf->containedIn(*f);
1043 });
1044 if (lesser != o.values.end())
1045 o.values.erase(lesser);
1046 newvals.push_back(f);
1047 }
1048 }
1049 return newvals;
1050 }, [&](const std::vector<Sp<FieldValueIndex>>& fields){
1051 o.values.insert(o.values.end(), fields.begin(), fields.end());
1052 }, o);
1053 };
1054
1055 /* Try to answer this search locally. */
1056 auto values = getLocal(id, f);
1057 std::vector<Sp<FieldValueIndex>> local_fields(values.size());
1058 std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const Sp<Value>& v) {
1059 return std::make_shared<FieldValueIndex>(*v, q.select);
1060 });
1061 qcb(local_fields);
1062
1063 auto sq = std::make_shared<Query>(std::move(q));
1064 Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
1065 //DHT_LOG_WARN("DHT done IPv4");
1066 op->status4 = {true, ok};
1067 doneCallbackWrapper(done_cb, nodes, *op);
1068 }, f, sq);
1069 Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) {
1070 //DHT_LOG_WARN("DHT done IPv6");
1071 op->status6 = {true, ok};
1072 doneCallbackWrapper(done_cb, nodes, *op);
1073 }, f, sq);
1074 }
1075
1076 std::vector<Sp<Value>>
getLocal(const InfoHash & id,const Value::Filter & f) const1077 Dht::getLocal(const InfoHash& id, const Value::Filter& f) const
1078 {
1079 auto s = store.find(id);
1080 if (s == store.end()) return {};
1081 return s->second.get(f);
1082 }
1083
1084 Sp<Value>
getLocalById(const InfoHash & id,Value::Id vid) const1085 Dht::getLocalById(const InfoHash& id, Value::Id vid) const
1086 {
1087 auto s = store.find(id);
1088 if (s != store.end())
1089 return s->second.getById(vid);
1090 return {};
1091 }
1092
1093 std::vector<Sp<Value>>
getPut(const InfoHash & id) const1094 Dht::getPut(const InfoHash& id) const
1095 {
1096 std::vector<Sp<Value>> ret;
1097 auto find_values = [&](const std::map<InfoHash, Sp<Search>>& srs) {
1098 auto srp = srs.find(id);
1099 if (srp == srs.end()) return;
1100 auto vals = srp->second->getPut();
1101 ret.insert(ret.end(), vals.begin(), vals.end());
1102 };
1103 find_values(searches4);
1104 find_values(searches6);
1105 return ret;
1106 }
1107
1108 Sp<Value>
getPut(const InfoHash & id,const Value::Id & vid) const1109 Dht::getPut(const InfoHash& id, const Value::Id& vid) const
1110 {
1111 auto find_value = [&](const std::map<InfoHash, Sp<Search>>& srs) {
1112 auto srp = srs.find(id);
1113 return (srp != srs.end()) ? srp->second->getPut(vid) : Sp<Value> {};
1114 };
1115 if (auto v4 = find_value(searches4))
1116 return v4;
1117 if (auto v6 = find_value(searches6))
1118 return v6;
1119 return {};
1120 }
1121
1122 bool
cancelPut(const InfoHash & id,const Value::Id & vid)1123 Dht::cancelPut(const InfoHash& id, const Value::Id& vid)
1124 {
1125 bool canceled {false};
1126 auto sr_cancel_put = [&](std::map<InfoHash, Sp<Search>>& srs) {
1127 auto srp = srs.find(id);
1128 return (srp != srs.end()) ? srp->second->cancelPut(vid) : false;
1129 };
1130 canceled |= sr_cancel_put(searches4);
1131 canceled |= sr_cancel_put(searches6);
1132 if (canceled)
1133 storageErase(id, vid);
1134 return canceled;
1135 }
1136
1137 // Storage
1138
1139 void
storageChanged(const InfoHash & id,Storage & st,ValueStorage & v,bool newValue)1140 Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newValue)
1141 {
1142 if (newValue) {
1143 if (not st.local_listeners.empty()) {
1144 DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size());
1145 std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs;
1146 cbs.reserve(st.local_listeners.size());
1147 for (const auto& l : st.local_listeners) {
1148 std::vector<Sp<Value>> vals;
1149 if (not l.second.filter or l.second.filter(*v.data))
1150 vals.push_back(v.data);
1151 if (not vals.empty()) {
1152 DHT_LOG.d(id, "[store %s] sending update local listener with token %lu",
1153 id.toString().c_str(),
1154 l.first);
1155 cbs.emplace_back(l.second.get_cb, std::move(vals));
1156 }
1157 }
1158 // listeners are copied: they may be deleted by the callback
1159 for (auto& cb : cbs)
1160 cb.first(cb.second, false);
1161 }
1162 }
1163
1164 if (not st.listeners.empty()) {
1165 DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
1166 for (const auto& node_listeners : st.listeners) {
1167 for (const auto& l : node_listeners.second) {
1168 auto f = l.second.query.where.getFilter();
1169 if (f and not f(*v.data))
1170 continue;
1171 DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending update",
1172 id.toString().c_str(),
1173 node_listeners.first->toString().c_str());
1174 std::vector<Sp<Value>> vals {};
1175 vals.push_back(v.data);
1176 Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
1177 network_engine.tellListener(node_listeners.first, l.first, id, 0, ntoken, {}, {},
1178 std::move(vals), l.second.query);
1179 }
1180 }
1181 }
1182 }
1183
1184 bool
storageStore(const InfoHash & id,const Sp<Value> & value,time_point created,const SockAddr & sa,bool permanent)1185 Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr& sa, bool permanent)
1186 {
1187 const auto& now = scheduler.time();
1188 created = std::min(created, now);
1189 auto expiration = permanent ? time_point::max() : created + getType(value->type).expiration;
1190 if (expiration < now)
1191 return false;
1192
1193 auto st = store.find(id);
1194 if (st == store.end()) {
1195 if (store.size() >= MAX_HASHES)
1196 return false;
1197 auto st_i = store.emplace(id, now);
1198 st = st_i.first;
1199 if (maintain_storage and st_i.second)
1200 scheduler.add(st->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
1201 }
1202
1203 StorageBucket* store_bucket {nullptr};
1204 if (sa)
1205 store_bucket = &store_quota[sa];
1206
1207 auto store = st->second.store(id, value, created, expiration, store_bucket);
1208 if (auto vs = store.first) {
1209 total_store_size += store.second.size_diff;
1210 total_values += store.second.values_diff;
1211 if (not permanent) {
1212 scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
1213 }
1214 if (total_store_size > max_store_size) {
1215 expireStore();
1216 }
1217 storageChanged(id, st->second, *vs, store.second.values_diff > 0);
1218 }
1219
1220 return std::get<0>(store);
1221 }
1222
1223 bool
storageErase(const InfoHash & id,Value::Id vid)1224 Dht::storageErase(const InfoHash& id, Value::Id vid)
1225 {
1226 auto st = store.find(id);
1227 if (st == store.end())
1228 return false;
1229 auto ret = st->second.remove(id, vid);
1230 total_store_size += ret.size_diff;
1231 total_values += ret.values_diff;
1232 return ret.values_diff;
1233 }
1234
1235 void
storageAddListener(const InfoHash & id,const Sp<Node> & node,size_t socket_id,Query && query)1236 Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_id, Query&& query)
1237 {
1238 const auto& now = scheduler.time();
1239 auto st = store.find(id);
1240 if (st == store.end()) {
1241 if (store.size() >= MAX_HASHES)
1242 return;
1243 st = store.emplace(id, now).first;
1244 }
1245 auto& node_listeners = st->second.listeners[node];
1246 auto l = node_listeners.find(socket_id);
1247 if (l == node_listeners.end()) {
1248 auto vals = st->second.get(query.where.getFilter());
1249 if (not vals.empty()) {
1250 network_engine.tellListener(node, socket_id, id, WANT4 | WANT6, makeToken(node->getAddr(), false),
1251 buckets4.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES),
1252 std::move(vals), query);
1253 }
1254 node_listeners.emplace(socket_id, Listener {now, std::forward<Query>(query)});
1255 }
1256 else
1257 l->second.refresh(now, std::forward<Query>(query));
1258 }
1259
1260 void
expireStore(decltype(store)::iterator i)1261 Dht::expireStore(decltype(store)::iterator i)
1262 {
1263 const auto& id = i->first;
1264 auto& st = i->second;
1265 auto stats = st.expire(id, scheduler.time());
1266 total_store_size += stats.first;
1267 total_values -= stats.second.size();
1268 if (not stats.second.empty()) {
1269 DHT_LOG.d(id, "[store %s] discarded %ld expired values (%ld bytes)",
1270 id.toString().c_str(), stats.second.size(), -stats.first);
1271
1272 if (not st.listeners.empty()) {
1273 DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
1274
1275 std::vector<Value::Id> ids;
1276 ids.reserve(stats.second.size());
1277 for (const auto& v : stats.second)
1278 ids.emplace_back(v->id);
1279
1280 for (const auto& node_listeners : st.listeners) {
1281 for (const auto& l : node_listeners.second) {
1282 DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending expired",
1283 id.toString().c_str(),
1284 node_listeners.first->toString().c_str());
1285 Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
1286 network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids);
1287 }
1288 }
1289 }
1290 for (const auto& local_listeners : st.local_listeners) {
1291 local_listeners.second.get_cb(stats.second, true);
1292 }
1293 }
1294 }
1295
1296 void
expireStorage(InfoHash h)1297 Dht::expireStorage(InfoHash h)
1298 {
1299 auto i = store.find(h);
1300 if (i != store.end())
1301 expireStore(i);
1302 }
1303
1304 void
expireStore()1305 Dht::expireStore()
1306 {
1307 // removing expired values
1308 for (auto i = store.begin(); i != store.end();) {
1309 expireStore(i);
1310
1311 if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) {
1312 DHT_LOG.d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str());
1313 i = store.erase(i);
1314 }
1315 else
1316 ++i;
1317 }
1318
1319 // remove more values if storage limit is exceeded
1320 while (total_store_size > max_store_size) {
1321 // find IP using the most storage
1322 if (store_quota.empty()) {
1323 DHT_LOG.w("No space left: local data consumes all the quota!");
1324 break;
1325 }
1326 auto largest = store_quota.begin();
1327 for (auto it = ++largest; it != store_quota.end(); ++it) {
1328 if (it->second.size() > largest->second.size())
1329 largest = it;
1330 }
1331 DHT_LOG.w("No space left: discarding value of largest consumer %s", largest->first.toString().c_str());
1332 while (true) {
1333 auto exp_value = largest->second.getOldest();
1334 auto storage = store.find(exp_value.first);
1335 if (storage != store.end()) {
1336 auto ret = storage->second.remove(exp_value.first, exp_value.second);
1337 total_store_size += ret.size_diff;
1338 total_values += ret.values_diff;
1339 DHT_LOG.w("Discarded %ld bytes, still %ld used", largest->first.toString().c_str(), total_store_size);
1340 if (ret.values_diff)
1341 break;
1342 }
1343 }
1344 }
1345
1346 // remove unused quota entires
1347 for (auto i = store_quota.begin(); i != store_quota.end();) {
1348 if (i->second.size() == 0)
1349 i = store_quota.erase(i);
1350 else
1351 ++i;
1352 }
1353 }
1354
1355 void
connectivityChanged(sa_family_t af)1356 Dht::connectivityChanged(sa_family_t af)
1357 {
1358 const auto& now = scheduler.time();
1359 scheduler.edit(nextNodesConfirmation, now);
1360 buckets(af).connectivityChanged(now);
1361 network_engine.connectivityChanged(af);
1362 reported_addr.erase(std::remove_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& addr){
1363 return addr.second.getFamily() == af;
1364 }), reported_addr.end());
1365 }
1366
1367 void
rotateSecrets()1368 Dht::rotateSecrets()
1369 {
1370 oldsecret = secret;
1371 {
1372 crypto::random_device rdev;
1373 secret = std::uniform_int_distribution<uint64_t>{}(rdev);
1374 }
1375 uniform_duration_distribution<> time_dist(std::chrono::minutes(15), std::chrono::minutes(45));
1376 auto rotate_secrets_time = scheduler.time() + time_dist(rd);
1377 scheduler.add(rotate_secrets_time, std::bind(&Dht::rotateSecrets, this));
1378 }
1379
1380 Blob
makeToken(const SockAddr & addr,bool old) const1381 Dht::makeToken(const SockAddr& addr, bool old) const
1382 {
1383 const void *ip;
1384 size_t iplen;
1385 in_port_t port;
1386
1387 auto family = addr.getFamily();
1388 if (family == AF_INET) {
1389 const auto& sin = addr.getIPv4();
1390 ip = &sin.sin_addr;
1391 iplen = 4;
1392 port = sin.sin_port;
1393 } else if (family == AF_INET6) {
1394 const auto& sin6 = addr.getIPv6();
1395 ip = &sin6.sin6_addr;
1396 iplen = 16;
1397 port = sin6.sin6_port;
1398 } else {
1399 return {};
1400 }
1401
1402 const auto& c1 = old ? oldsecret : secret;
1403 Blob data;
1404 data.reserve(sizeof(secret)+sizeof(in_port_t)+iplen);
1405 data.insert(data.end(), (uint8_t*)&c1, ((uint8_t*)&c1) + sizeof(c1));
1406 data.insert(data.end(), (uint8_t*)ip, (uint8_t*)ip+iplen);
1407 data.insert(data.end(), (uint8_t*)&port, ((uint8_t*)&port)+sizeof(in_port_t));
1408 return crypto::hash(data, TOKEN_SIZE);
1409 }
1410
1411 bool
tokenMatch(const Blob & token,const SockAddr & addr) const1412 Dht::tokenMatch(const Blob& token, const SockAddr& addr) const
1413 {
1414 if (not addr or token.size() != TOKEN_SIZE)
1415 return false;
1416 if (token == makeToken(addr, false))
1417 return true;
1418 if (token == makeToken(addr, true))
1419 return true;
1420 return false;
1421 }
1422
1423 NodeStats
getNodesStats(sa_family_t af) const1424 Dht::getNodesStats(sa_family_t af) const
1425 {
1426 NodeStats stats {};
1427 const auto& now = scheduler.time();
1428 const auto& bcks = buckets(af);
1429 for (const auto& b : bcks) {
1430 for (auto& n : b.nodes) {
1431 if (n->isGood(now)) {
1432 stats.good_nodes++;
1433 if (n->isIncoming())
1434 stats.incoming_nodes++;
1435 } else if (not n->isExpired())
1436 stats.dubious_nodes++;
1437 }
1438 if (b.cached)
1439 stats.cached_nodes++;
1440 }
1441 stats.table_depth = bcks.depth(bcks.findBucket(myid));
1442 return stats;
1443 }
1444
1445 void
dumpBucket(const Bucket & b,std::ostream & out) const1446 Dht::dumpBucket(const Bucket& b, std::ostream& out) const
1447 {
1448 const auto& now = scheduler.time();
1449 using namespace std::chrono;
1450 out << b.first << " count " << b.nodes.size() << " age " << duration_cast<seconds>(now - b.time).count() << " sec";
1451 if (b.cached)
1452 out << " (cached)";
1453 out << std::endl;
1454 for (auto& n : b.nodes) {
1455 out << " Node " << n->toString();
1456 const auto& t = n->getTime();
1457 const auto& r = n->getReplyTime();
1458 if (t != r)
1459 out << " age " << duration_cast<seconds>(now - t).count() << ", reply: " << duration_cast<seconds>(now - r).count();
1460 else
1461 out << " age " << duration_cast<seconds>(now - t).count();
1462 if (n->isExpired())
1463 out << " [expired]";
1464 else if (n->isGood(now))
1465 out << " [good]";
1466 out << std::endl;
1467 }
1468 }
1469
1470 void
dumpSearch(const Search & sr,std::ostream & out) const1471 Dht::dumpSearch(const Search& sr, std::ostream& out) const
1472 {
1473 const auto& now = scheduler.time();
1474 using namespace std::chrono;
1475 out << std::endl << "Search IPv" << (sr.af == AF_INET6 ? '6' : '4') << ' ' << sr.id << " gets: " << sr.callbacks.size();
1476 out << ", age: " << duration_cast<seconds>(now - sr.step_time).count() << " s";
1477 if (sr.done)
1478 out << " [done]";
1479 if (sr.expired)
1480 out << " [expired]";
1481 bool synced = sr.isSynced(now);
1482 out << (synced ? " [synced]" : " [not synced]");
1483 if (synced && sr.isListening(now))
1484 out << " [listening]";
1485 out << std::endl;
1486
1487 /*printing the queries*/
1488 if (sr.callbacks.size() + sr.listeners.size() > 0)
1489 out << "Queries:" << std::endl;
1490 for (const auto& cb : sr.callbacks) {
1491 out << *cb.second.query << std::endl;
1492 }
1493 for (const auto& l : sr.listeners) {
1494 out << *l.second.query << std::endl;
1495 }
1496
1497 for (const auto& a : sr.announce) {
1498 bool announced = sr.isAnnounced(a.value->id);
1499 out << "Announcement: " << *a.value << (announced ? " [announced]" : "") << std::endl;
1500 }
1501
1502 out << " Common bits InfoHash Conn. Get Ops IP" << std::endl;
1503 unsigned i = 0;
1504 auto last_get = sr.getLastGetTime();
1505 for (const auto& n : sr.nodes) {
1506 i++;
1507 out << std::setfill (' ') << std::setw(3) << InfoHash::commonBits(sr.id, n.node->id) << ' ' << n.node->id;
1508 out << ' ' << (findNode(n.node->id, sr.af) ? '*' : ' ');
1509 out << " [";
1510 if (auto pendingCount = n.node->getPendingMessageCount())
1511 out << pendingCount;
1512 else
1513 out << ' ';
1514 out << (n.node->isExpired() ? 'x' : ' ') << "]";
1515
1516 // Get status
1517 {
1518 char g_i = n.pending(n.getStatus) ? (n.candidate ? 'c' : 'f') : ' ';
1519 char s_i = n.isSynced(now) ? (n.last_get_reply > last_get ? 'u' : 's') : '-';
1520 out << " [" << s_i << g_i << "] ";
1521 }
1522
1523 // Listen status
1524 if (not sr.listeners.empty()) {
1525 if (n.listenStatus.empty())
1526 out << " ";
1527 else
1528 out << "["
1529 << (n.isListening(now) ? 'l' : (n.pending(n.listenStatus) ? 'f' : ' ')) << "] ";
1530 }
1531
1532 // Announce status
1533 if (not sr.announce.empty()) {
1534 if (n.acked.empty()) {
1535 out << " ";
1536 for (size_t a=0; a < sr.announce.size(); a++)
1537 out << ' ';
1538 } else {
1539 out << "[";
1540 for (const auto& a : sr.announce) {
1541 auto ack = n.acked.find(a.value->id);
1542 if (ack == n.acked.end() or not ack->second.first) {
1543 out << ' ';
1544 } else {
1545 out << ack->second.first->getStateChar();
1546 }
1547 }
1548 out << "] ";
1549 }
1550 }
1551 out << n.node->getAddrStr() << std::endl;
1552 }
1553 }
1554
1555 void
dumpTables() const1556 Dht::dumpTables() const
1557 {
1558 std::stringstream out;
1559 out << "My id " << myid << std::endl;
1560
1561 out << "Buckets IPv4 :" << std::endl;
1562 for (const auto& b : buckets4)
1563 dumpBucket(b, out);
1564 out << "Buckets IPv6 :" << std::endl;
1565 for (const auto& b : buckets6)
1566 dumpBucket(b, out);
1567
1568 auto dump_searches = [&](std::map<InfoHash, Sp<Search>> srs) {
1569 for (auto& srp : srs)
1570 dumpSearch(*srp.second, out);
1571 };
1572 dump_searches(searches4);
1573 dump_searches(searches6);
1574 out << std::endl;
1575
1576 out << getStorageLog() << std::endl;
1577
1578 DHT_LOG.d("%s", out.str().c_str());
1579 }
1580
1581 std::string
getStorageLog() const1582 Dht::getStorageLog() const
1583 {
1584 std::stringstream out;
1585 for (const auto& s : store)
1586 out << printStorageLog(s);
1587 out << std::endl << std::endl;
1588 std::multimap<size_t, const SockAddr*> q_map;
1589 for (const auto& ip : store_quota)
1590 if (ip.second.size())
1591 q_map.emplace(ip.second.size(), &ip.first);
1592 for (auto ip = q_map.rbegin(); ip != q_map.rend(); ++ip)
1593 out << "IP " << ip->second->toString() << " uses " << ip->first << " bytes" << std::endl;
1594 out << std::endl;
1595 out << "Total " << store.size() << " storages, " << total_values << " values (";
1596 if (total_store_size < 1024)
1597 out << total_store_size << " bytes)";
1598 else
1599 out << (total_store_size/1024) << " / " << (max_store_size/1024) << " KB)";
1600 out << std::endl;
1601 return out.str();
1602 }
1603
1604 std::string
getStorageLog(const InfoHash & h) const1605 Dht::getStorageLog(const InfoHash& h) const
1606 {
1607 auto s = store.find(h);
1608 if (s == store.end()) {
1609 std::stringstream out;
1610 out << "Storage " << h << " empty" << std::endl;
1611 return out.str();
1612 }
1613 return printStorageLog(*s);
1614 }
1615
1616 std::string
printStorageLog(const decltype(store)::value_type & s) const1617 Dht::printStorageLog(const decltype(store)::value_type& s) const
1618 {
1619 std::stringstream out;
1620 using namespace std::chrono;
1621 const auto& st = s.second;
1622 out << "Storage " << s.first << " "
1623 << st.listeners.size() << " list., "
1624 << st.valueCount() << " values ("
1625 << st.totalSize() << " bytes)" << std::endl;
1626 if (not st.local_listeners.empty())
1627 out << " " << st.local_listeners.size() << " local listeners" << std::endl;
1628 for (const auto& node_listeners : st.listeners) {
1629 const auto& node = node_listeners.first;
1630 out << " " << "Listener " << node->toString() << " : " << node_listeners.second.size() << " entries" << std::endl;
1631 }
1632 return out.str();
1633 }
1634
1635 std::string
getRoutingTablesLog(sa_family_t af) const1636 Dht::getRoutingTablesLog(sa_family_t af) const
1637 {
1638 std::stringstream out;
1639 for (const auto& b : buckets(af))
1640 dumpBucket(b, out);
1641 return out.str();
1642 }
1643
1644 std::string
getSearchesLog(sa_family_t af) const1645 Dht::getSearchesLog(sa_family_t af) const
1646 {
1647 std::stringstream out;
1648 auto num_searches = searches4.size() + searches6.size();
1649 if (num_searches > 8) {
1650 if (not af or af == AF_INET)
1651 for (const auto& sr : searches4)
1652 out << "[search " << sr.first << " IPv4]" << std::endl;
1653 if (not af or af == AF_INET6)
1654 for (const auto& sr : searches6)
1655 out << "[search " << sr.first << " IPv6]" << std::endl;
1656 } else {
1657 out << "s:synched, u:updated, a:announced, c:candidate, f:cur req, x:expired, *:known" << std::endl;
1658 if (not af or af == AF_INET)
1659 for (const auto& sr : searches4)
1660 dumpSearch(*sr.second, out);
1661 if (not af or af == AF_INET6)
1662 for (const auto& sr : searches6)
1663 dumpSearch(*sr.second, out);
1664 }
1665 out << "Total: " << num_searches << " searches (" << searches4.size() << " IPv4, " << searches6.size() << " IPv6)." << std::endl;
1666 return out.str();
1667 }
1668
1669 std::string
getSearchLog(const InfoHash & id,sa_family_t af) const1670 Dht::getSearchLog(const InfoHash& id, sa_family_t af) const
1671 {
1672 std::stringstream out;
1673 if (af == AF_UNSPEC) {
1674 out << getSearchLog(id, AF_INET) << getSearchLog(id, AF_INET6);
1675 } else {
1676 auto& srs = searches(af);
1677 auto sr = srs.find(id);
1678 if (sr != srs.end())
1679 dumpSearch(*sr->second, out);
1680 }
1681 return out.str();
1682 }
1683
~Dht()1684 Dht::~Dht()
1685 {
1686 for (auto& s : searches4)
1687 s.second->clear();
1688 for (auto& s : searches6)
1689 s.second->clear();
1690 }
1691
Dht()1692 Dht::Dht() : store(), network_engine(DHT_LOG, scheduler, {}) {}
1693
Dht(std::unique_ptr<net::DatagramSocket> && sock,const Config & config,const Logger & l)1694 Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Logger& l)
1695 : DhtInterface(l), myid(config.node_id ? config.node_id : InfoHash::getRandom()), store(), store_quota(),
1696 network_engine(myid, config.network, std::move(sock), DHT_LOG, scheduler,
1697 std::bind(&Dht::onError, this, _1, _2),
1698 std::bind(&Dht::onNewNode, this, _1, _2),
1699 std::bind(&Dht::onReportedAddr, this, _1, _2),
1700 std::bind(&Dht::onPing, this, _1),
1701 std::bind(&Dht::onFindNode, this, _1, _2, _3),
1702 std::bind(&Dht::onGetValues, this, _1, _2, _3, _4),
1703 std::bind(&Dht::onListen, this, _1, _2, _3, _4, _5),
1704 std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5),
1705 std::bind(&Dht::onRefresh, this, _1, _2, _3, _4)),
1706 persistPath(config.persist_path),
1707 is_bootstrap(config.is_bootstrap),
1708 maintain_storage(config.maintain_storage)
1709 {
1710 scheduler.syncTime();
1711 auto s = network_engine.getSocket();
1712 if (not s or (not s->hasIPv4() and not s->hasIPv6()))
1713 throw DhtException("Opened socket required");
1714 if (s->hasIPv4()) {
1715 buckets4 = {Bucket {AF_INET}};
1716 buckets4.is_client = config.is_bootstrap;
1717 }
1718 if (s->hasIPv6()) {
1719 buckets6 = {Bucket {AF_INET6}};
1720 buckets6.is_client = config.is_bootstrap;
1721 }
1722
1723 search_id = std::uniform_int_distribution<decltype(search_id)>{}(rd);
1724
1725 uniform_duration_distribution<> time_dis {std::chrono::seconds(3), std::chrono::seconds(5)};
1726 nextNodesConfirmation = scheduler.add(scheduler.time() + time_dis(rd), std::bind(&Dht::confirmNodes, this));
1727
1728 // Fill old secret
1729 {
1730 crypto::random_device rdev;
1731 secret = std::uniform_int_distribution<uint64_t>{}(rdev);
1732 }
1733 rotateSecrets();
1734
1735 expire();
1736
1737 DHT_LOG.d("DHT node initialised with ID %s", myid.toString().c_str());
1738
1739 if (not persistPath.empty())
1740 loadState(persistPath);
1741 }
1742
1743 bool
neighbourhoodMaintenance(RoutingTable & list)1744 Dht::neighbourhoodMaintenance(RoutingTable& list)
1745 {
1746 //DHT_LOG_DBG("neighbourhoodMaintenance");
1747 auto b = list.findBucket(myid);
1748 if (b == list.end())
1749 return false;
1750
1751 InfoHash id = myid;
1752 #ifdef _WIN32
1753 std::uniform_int_distribution<int> rand_byte{ 0, std::numeric_limits<uint8_t>::max() };
1754 #else
1755 std::uniform_int_distribution<uint8_t> rand_byte;
1756 #endif
1757 id[HASH_LEN-1] = rand_byte(rd);
1758
1759 std::bernoulli_distribution rand_trial(1./8.);
1760 auto q = b;
1761 if (std::next(q) != list.end() && (q->nodes.empty() || rand_trial(rd)))
1762 q = std::next(q);
1763 if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
1764 auto r = std::prev(b);
1765 if (!r->nodes.empty())
1766 q = r;
1767 }
1768
1769 auto n = q->randomNode();
1770 if (n) {
1771 DHT_LOG.d(id, n->id, "[node %s] sending [find %s] for neighborhood maintenance",
1772 n->toString().c_str(), id.toString().c_str());
1773 /* Since our node-id is the same in both DHTs, it's probably
1774 profitable to query both families. */
1775 network_engine.sendFindNode(n, id, network_engine.want());
1776 }
1777
1778 return true;
1779 }
1780
1781 bool
bucketMaintenance(RoutingTable & list)1782 Dht::bucketMaintenance(RoutingTable& list)
1783 {
1784 std::bernoulli_distribution rand_trial(1./8.);
1785 std::bernoulli_distribution rand_trial_38(1./38.);
1786
1787 bool sent {false};
1788 for (auto b = list.begin(); b != list.end(); ++b) {
1789 if (b->time < scheduler.time() - std::chrono::minutes(10) || b->nodes.empty()) {
1790 /* This bucket hasn't seen any positive confirmation for a long
1791 time. Pick a random id in this bucket's range, and send a request
1792 to a random node. */
1793 InfoHash id = list.randomId(b);
1794 auto q = b;
1795 /* If the bucket is empty, we try to fill it from a neighbour.
1796 We also sometimes do it gratuitiously to recover from
1797 buckets full of broken nodes. */
1798 if (std::next(b) != list.end() && (q->nodes.empty() || rand_trial(rd)))
1799 q = std::next(b);
1800 if (b != list.begin() && (q->nodes.empty() || rand_trial(rd))) {
1801 auto r = std::prev(b);
1802 if (!r->nodes.empty())
1803 q = r;
1804 }
1805
1806 auto n = q->randomNode();
1807 if (n and not n->isPendingMessage()) {
1808 want_t want = -1;
1809
1810 if (network_engine.want() != want) {
1811 auto otherbucket = findBucket(id, q->af == AF_INET ? AF_INET6 : AF_INET);
1812 if (otherbucket && otherbucket->nodes.size() < TARGET_NODES)
1813 /* The corresponding bucket in the other family
1814 is emptyish -- querying both is useful. */
1815 want = WANT4 | WANT6;
1816 else if (rand_trial_38(rd))
1817 /* Most of the time, this just adds overhead.
1818 However, it might help stitch back one of
1819 the DHTs after a network collapse, so query
1820 both, but only very occasionally. */
1821 want = WANT4 | WANT6;
1822 }
1823
1824 DHT_LOG.d(id, n->id, "[node %s] sending find %s for bucket maintenance", n->toString().c_str(), id.toString().c_str());
1825 //auto start = scheduler.time();
1826 network_engine.sendFindNode(n, id, want, nullptr, [this,n](const net::Request&, bool over) {
1827 if (over) {
1828 const auto& end = scheduler.time();
1829 // using namespace std::chrono;
1830 // DHT_LOG.d(n->id, "[node %s] bucket maintenance op expired after %llu ms", n->toString().c_str(), duration_cast<milliseconds>(end-start).count());
1831 scheduler.edit(nextNodesConfirmation, end + Node::MAX_RESPONSE_TIME);
1832 }
1833 });
1834 sent = true;
1835 }
1836 }
1837 }
1838 return sent;
1839 }
1840
1841 void
dataPersistence(InfoHash id)1842 Dht::dataPersistence(InfoHash id)
1843 {
1844 const auto& now = scheduler.time();
1845 auto str = store.find(id);
1846 if (str != store.end() and now > str->second.maintenance_time) {
1847 DHT_LOG.d(id, "[storage %s] maintenance (%u values, %u bytes)",
1848 id.toString().c_str(), str->second.valueCount(), str->second.totalSize());
1849 maintainStorage(*str);
1850 str->second.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
1851 scheduler.add(str->second.maintenance_time, std::bind(&Dht::dataPersistence, this, id));
1852 }
1853 }
1854
1855 size_t
maintainStorage(decltype(store)::value_type & storage,bool force,const DoneCallback & donecb)1856 Dht::maintainStorage(decltype(store)::value_type& storage, bool force, const DoneCallback& donecb)
1857 {
1858 const auto& now = scheduler.time();
1859 size_t announce_per_af = 0;
1860
1861 bool want4 = true, want6 = true;
1862
1863 auto nodes = buckets4.findClosestNodes(storage.first, now);
1864 if (!nodes.empty()) {
1865 if (force || storage.first.xorCmp(nodes.back()->id, myid) < 0) {
1866 for (auto &value : storage.second.getValues()) {
1867 const auto& vt = getType(value.data->type);
1868 if (force || value.created + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
1869 // gotta put that value there
1870 announce(storage.first, AF_INET, value.data, donecb, value.created);
1871 ++announce_per_af;
1872 }
1873 }
1874 want4 = false;
1875 }
1876 }
1877
1878 auto nodes6 = buckets6.findClosestNodes(storage.first, now);
1879 if (!nodes6.empty()) {
1880 if (force || storage.first.xorCmp(nodes6.back()->id, myid) < 0) {
1881 for (auto &value : storage.second.getValues()) {
1882 const auto& vt = getType(value.data->type);
1883 if (force || value.created + vt.expiration > now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME) {
1884 // gotta put that value there
1885 announce(storage.first, AF_INET6, value.data, donecb, value.created);
1886 ++announce_per_af;
1887 }
1888 }
1889 want6 = false;
1890 }
1891 }
1892
1893 if (not want4 and not want6) {
1894 DHT_LOG.d(storage.first, "Discarding storage values %s", storage.first.toString().c_str());
1895 auto diff = storage.second.clear();
1896 total_store_size += diff.size_diff;
1897 total_values += diff.values_diff;
1898 }
1899
1900 return announce_per_af;
1901 }
1902
1903 time_point
periodic(const uint8_t * buf,size_t buflen,SockAddr from)1904 Dht::periodic(const uint8_t *buf, size_t buflen, SockAddr from)
1905 {
1906 scheduler.syncTime();
1907 if (buflen) {
1908 try {
1909 network_engine.processMessage(buf, buflen, std::move(from));
1910 } catch (const std::exception& e) {
1911 DHT_LOG.e("Can't process message: %s", e.what());
1912 }
1913 }
1914 return scheduler.run();
1915 }
1916
1917 void
expire()1918 Dht::expire()
1919 {
1920 uniform_duration_distribution<> time_dis(std::chrono::minutes(2), std::chrono::minutes(6));
1921 auto expire_stuff_time = scheduler.time() + duration(time_dis(rd));
1922
1923 expireBuckets(buckets4);
1924 expireBuckets(buckets6);
1925 expireStore();
1926 expireSearches();
1927 scheduler.add(expire_stuff_time, std::bind(&Dht::expire, this));
1928 }
1929
1930 void
confirmNodes()1931 Dht::confirmNodes()
1932 {
1933 using namespace std::chrono;
1934 bool soon = false;
1935 const auto& now = scheduler.time();
1936
1937 if (searches4.empty() and getStatus(AF_INET) == NodeStatus::Connected) {
1938 DHT_LOG.d(myid, "[confirm nodes] initial IPv4 'get' for my id (%s)", myid.toString().c_str());
1939 search(myid, AF_INET);
1940 }
1941 if (searches6.empty() and getStatus(AF_INET6) == NodeStatus::Connected) {
1942 DHT_LOG.d(myid, "[confirm nodes] initial IPv6 'get' for my id (%s)", myid.toString().c_str());
1943 search(myid, AF_INET6);
1944 }
1945
1946 soon |= bucketMaintenance(buckets4);
1947 soon |= bucketMaintenance(buckets6);
1948
1949 if (!soon) {
1950 if (buckets4.grow_time >= now - seconds(150))
1951 soon |= neighbourhoodMaintenance(buckets4);
1952 if (buckets6.grow_time >= now - seconds(150))
1953 soon |= neighbourhoodMaintenance(buckets6);
1954 }
1955
1956 /* In order to maintain all buckets' age within 600 seconds, worst
1957 case is roughly 27 seconds, assuming the table is 22 bits deep.
1958 We want to keep a margin for neighborhood maintenance, so keep
1959 this within 25 seconds. */
1960 auto time_dis = soon
1961 ? uniform_duration_distribution<> {seconds(5) , seconds(25)}
1962 : uniform_duration_distribution<> {seconds(60), seconds(180)};
1963 auto confirm_nodes_time = now + time_dis(rd);
1964
1965 scheduler.edit(nextNodesConfirmation, confirm_nodes_time);
1966 }
1967
1968 std::vector<ValuesExport>
exportValues() const1969 Dht::exportValues() const
1970 {
1971 std::vector<ValuesExport> e {};
1972 e.reserve(store.size());
1973 for (const auto& h : store) {
1974 ValuesExport ve;
1975 ve.first = h.first;
1976
1977 msgpack::sbuffer buffer;
1978 msgpack::packer<msgpack::sbuffer> pk(&buffer);
1979 const auto& vals = h.second.getValues();
1980 pk.pack_array(vals.size());
1981 for (const auto& v : vals) {
1982 pk.pack_array(2);
1983 pk.pack(v.created.time_since_epoch().count());
1984 v.data->msgpack_pack(pk);
1985 }
1986 ve.second = {buffer.data(), buffer.data()+buffer.size()};
1987 e.push_back(std::move(ve));
1988 }
1989 return e;
1990 }
1991
1992 void
importValues(const std::vector<ValuesExport> & import)1993 Dht::importValues(const std::vector<ValuesExport>& import)
1994 {
1995 const auto& now = scheduler.time();
1996
1997 for (const auto& node : import) {
1998 if (node.second.empty())
1999 continue;
2000
2001 try {
2002 msgpack::unpacked msg;
2003 msgpack::unpack(msg, (const char*)node.second.data(), node.second.size());
2004 auto valarr = msg.get();
2005 if (valarr.type != msgpack::type::ARRAY)
2006 throw msgpack::type_error();
2007 for (unsigned i = 0; i < valarr.via.array.size; i++) {
2008 auto& valel = valarr.via.array.ptr[i];
2009 if (valel.type != msgpack::type::ARRAY or valel.via.array.size < 2)
2010 throw msgpack::type_error();
2011 time_point val_time;
2012 Value tmp_val;
2013 try {
2014 val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}};
2015 tmp_val.msgpack_unpack(valel.via.array.ptr[1]);
2016 } catch (const std::exception&) {
2017 DHT_LOG.e(node.first, "Error reading value at %s", node.first.toString().c_str());
2018 continue;
2019 }
2020 val_time = std::min(val_time, now);
2021 storageStore(node.first, std::make_shared<Value>(std::move(tmp_val)), val_time);
2022 }
2023 } catch (const std::exception&) {
2024 DHT_LOG.e(node.first, "Error reading values at %s", node.first.toString().c_str());
2025 continue;
2026 }
2027 }
2028 }
2029
2030
2031 std::vector<NodeExport>
exportNodes() const2032 Dht::exportNodes() const
2033 {
2034 const auto& now = scheduler.time();
2035 std::vector<NodeExport> nodes;
2036 const auto b4 = buckets4.findBucket(myid);
2037 if (b4 != buckets4.end()) {
2038 for (auto& n : b4->nodes)
2039 if (n->isGood(now))
2040 nodes.push_back(n->exportNode());
2041 }
2042 const auto b6 = buckets6.findBucket(myid);
2043 if (b6 != buckets6.end()) {
2044 for (auto& n : b6->nodes)
2045 if (n->isGood(now))
2046 nodes.push_back(n->exportNode());
2047 }
2048 for (auto b = buckets4.begin(); b != buckets4.end(); ++b) {
2049 if (b == b4) continue;
2050 for (auto& n : b->nodes)
2051 if (n->isGood(now))
2052 nodes.push_back(n->exportNode());
2053 }
2054 for (auto b = buckets6.begin(); b != buckets6.end(); ++b) {
2055 if (b == b6) continue;
2056 for (auto& n : b->nodes)
2057 if (n->isGood(now))
2058 nodes.push_back(n->exportNode());
2059 }
2060 return nodes;
2061 }
2062
2063 void
insertNode(const InfoHash & id,const SockAddr & addr)2064 Dht::insertNode(const InfoHash& id, const SockAddr& addr)
2065 {
2066 if (addr.getFamily() != AF_INET && addr.getFamily() != AF_INET6)
2067 return;
2068 scheduler.syncTime();
2069 network_engine.insertNode(id, addr);
2070 }
2071
2072 void
pingNode(SockAddr sa,DoneCallbackSimple && cb)2073 Dht::pingNode(SockAddr sa, DoneCallbackSimple&& cb)
2074 {
2075 scheduler.syncTime();
2076 DHT_LOG.d("Sending ping to %s", sa.toString().c_str());
2077 auto& count = sa.getFamily() == AF_INET ? pending_pings4 : pending_pings6;
2078 count++;
2079 network_engine.sendPing(std::move(sa), [&count,cb](const net::Request&, net::RequestAnswer&&) {
2080 count--;
2081 if (cb)
2082 cb(true);
2083 }, [&count,cb](const net::Request&, bool last){
2084 if (last) {
2085 count--;
2086 if (cb)
2087 cb(false);
2088 }
2089 });
2090 }
2091
2092 void
onError(Sp<net::Request> req,net::DhtProtocolException e)2093 Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) {
2094 const auto& node = req->node;
2095 if (e.getCode() == net::DhtProtocolException::UNAUTHORIZED) {
2096 DHT_LOG.e(node->id, "[node %s] token flush", node->toString().c_str());
2097 node->authError();
2098 node->cancelRequest(req);
2099 for (auto& srp : searches(node->getFamily())) {
2100 auto& sr = srp.second;
2101 for (auto& n : sr->nodes) {
2102 if (n.node != node) continue;
2103 n.token.clear();
2104 n.last_get_reply = time_point::min();
2105 searchSendGetValues(sr);
2106 scheduler.edit(sr->nextSearchStep, scheduler.time());
2107 break;
2108 }
2109 }
2110 } else if (e.getCode() == net::DhtProtocolException::NOT_FOUND) {
2111 DHT_LOG.e(node->id, "[node %s] returned error 404: storage not found", node->toString().c_str());
2112 node->cancelRequest(req);
2113 }
2114 }
2115
2116 void
onReportedAddr(const InfoHash &,const SockAddr & addr)2117 Dht::onReportedAddr(const InfoHash& /*id*/, const SockAddr& addr)
2118 {
2119 if (addr)
2120 reportedAddr(addr);
2121 }
2122
2123 net::RequestAnswer
onPing(Sp<Node>)2124 Dht::onPing(Sp<Node>)
2125 {
2126 return {};
2127 }
2128
2129 net::RequestAnswer
onFindNode(Sp<Node> node,const InfoHash & target,want_t want)2130 Dht::onFindNode(Sp<Node> node, const InfoHash& target, want_t want)
2131 {
2132 const auto& now = scheduler.time();
2133 net::RequestAnswer answer;
2134 answer.ntoken = makeToken(node->getAddr(), false);
2135 if (want & WANT4)
2136 answer.nodes4 = buckets4.findClosestNodes(target, now, TARGET_NODES);
2137 if (want & WANT6)
2138 answer.nodes6 = buckets6.findClosestNodes(target, now, TARGET_NODES);
2139 return answer;
2140 }
2141
2142 net::RequestAnswer
onGetValues(Sp<Node> node,const InfoHash & hash,want_t,const Query & query)2143 Dht::onGetValues(Sp<Node> node, const InfoHash& hash, want_t, const Query& query)
2144 {
2145 if (not hash) {
2146 DHT_LOG.w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str());
2147 throw net::DhtProtocolException {
2148 net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
2149 net::DhtProtocolException::GET_NO_INFOHASH
2150 };
2151 }
2152 const auto& now = scheduler.time();
2153 net::RequestAnswer answer {};
2154 auto st = store.find(hash);
2155 answer.ntoken = makeToken(node->getAddr(), false);
2156 answer.nodes4 = buckets4.findClosestNodes(hash, now, TARGET_NODES);
2157 answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES);
2158 if (st != store.end() && not st->second.empty()) {
2159 answer.values = st->second.get(query.where.getFilter());
2160 DHT_LOG.d(hash, "[node %s] sending %u values", node->toString().c_str(), answer.values.size());
2161 }
2162 return answer;
2163 }
2164
onGetValuesDone(const Sp<Node> & node,net::RequestAnswer & a,Sp<Search> & sr,const Sp<Query> & orig_query)2165 void Dht::onGetValuesDone(const Sp<Node>& node,
2166 net::RequestAnswer& a,
2167 Sp<Search>& sr,
2168 const Sp<Query>& orig_query)
2169 {
2170 if (not sr) {
2171 DHT_LOG.w("[search unknown] got reply to 'get'. Ignoring.");
2172 return;
2173 }
2174
2175 /*DHT_LOG.d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes",
2176 sr->id.toString().c_str(), node->toString().c_str(), a.nodes4.size()+a.nodes6.size());*/
2177
2178 if (not a.ntoken.empty()) {
2179 if (not a.values.empty() or not a.fields.empty()) {
2180 DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] found %u values",
2181 sr->id.toString().c_str(), node->toString().c_str(), a.values.size());
2182 for (auto& getp : sr->callbacks) { /* call all callbacks for this search */
2183 auto& get = getp.second;
2184 if (not (get.get_cb or get.query_cb) or
2185 (orig_query and get.query and not get.query->isSatisfiedBy(*orig_query)))
2186 continue;
2187
2188 if (get.query_cb) { /* in case of a request with query */
2189 if (not a.fields.empty()) {
2190 get.query_cb(a.fields);
2191 } else if (not a.values.empty()) {
2192 std::vector<Sp<FieldValueIndex>> fields;
2193 fields.reserve(a.values.size());
2194 for (const auto& v : a.values)
2195 fields.emplace_back(std::make_shared<FieldValueIndex>(*v, orig_query ? orig_query->select : Select {}));
2196 get.query_cb(fields);
2197 }
2198 } else if (get.get_cb) { /* in case of a vanilla get request */
2199 std::vector<Sp<Value>> tmp;
2200 for (const auto& v : a.values)
2201 if (not get.filter or get.filter(*v))
2202 tmp.emplace_back(v);
2203 if (not tmp.empty())
2204 get.get_cb(tmp);
2205 }
2206 }
2207
2208 /* callbacks for local search listeners */
2209 /*std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> tmp_lists;
2210 for (auto& l : sr->listeners) {
2211 if (!l.second.get_cb or (orig_query and l.second.query and not l.second.query->isSatisfiedBy(*orig_query)))
2212 continue;
2213 std::vector<Sp<Value>> tmp;
2214 for (const auto& v : a.values)
2215 if (not l.second.filter or l.second.filter(*v))
2216 tmp.emplace_back(v);
2217 if (not tmp.empty())
2218 tmp_lists.emplace_back(l.second.get_cb, std::move(tmp));
2219 }
2220 for (auto& l : tmp_lists)
2221 l.first(l.second, false);*/
2222 } else if (not a.expired_values.empty()) {
2223 DHT_LOG.w(sr->id, node->id, "[search %s] [node %s] %u expired values",
2224 sr->id.toString().c_str(), node->toString().c_str(), a.expired_values.size());
2225 }
2226 } else {
2227 DHT_LOG.w(sr->id, "[node %s] no token provided. Ignoring response content.", node->toString().c_str());
2228 network_engine.blacklistNode(node);
2229 }
2230
2231 if (not sr->done) {
2232 searchSendGetValues(sr);
2233
2234 // Force to recompute the next step time
2235 scheduler.edit(sr->nextSearchStep, scheduler.time());
2236 }
2237 }
2238
2239 net::RequestAnswer
onListen(Sp<Node> node,const InfoHash & hash,const Blob & token,size_t socket_id,const Query & query)2240 Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query)
2241 {
2242 if (not hash) {
2243 DHT_LOG.w(node->id, "[node %s] listen with no info_hash", node->toString().c_str());
2244 throw net::DhtProtocolException {
2245 net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
2246 net::DhtProtocolException::LISTEN_NO_INFOHASH
2247 };
2248 }
2249 if (not tokenMatch(token, node->getAddr())) {
2250 DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str());
2251 throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::LISTEN_WRONG_TOKEN};
2252 }
2253 Query q = query;
2254 storageAddListener(hash, node, socket_id, std::move(q));
2255 return {};
2256 }
2257
2258 void
onListenDone(const Sp<Node> &,net::RequestAnswer &,Sp<Search> & sr)2259 Dht::onListenDone(const Sp<Node>& /* node */, net::RequestAnswer& /* answer */, Sp<Search>& sr)
2260 {
2261 // DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got listen confirmation",
2262 // sr->id.toString().c_str(), node->toString().c_str(), answer.values.size());
2263
2264 if (not sr->done) {
2265 const auto& now = scheduler.time();
2266 searchSendGetValues(sr);
2267 scheduler.edit(sr->nextSearchStep, now);
2268 }
2269 }
2270
2271 net::RequestAnswer
onAnnounce(Sp<Node> n,const InfoHash & hash,const Blob & token,const std::vector<Sp<Value>> & values,const time_point & creation_date)2272 Dht::onAnnounce(Sp<Node> n,
2273 const InfoHash& hash,
2274 const Blob& token,
2275 const std::vector<Sp<Value>>& values,
2276 const time_point& creation_date)
2277 {
2278 auto& node = *n;
2279 if (not hash) {
2280 DHT_LOG.w(node.id, "put with no info_hash");
2281 throw net::DhtProtocolException {
2282 net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
2283 net::DhtProtocolException::PUT_NO_INFOHASH
2284 };
2285 }
2286 if (!tokenMatch(token, node.getAddr())) {
2287 DHT_LOG.w(hash, node.id, "[node %s] incorrect token %s for 'put'", node.toString().c_str(), hash.toString().c_str());
2288 throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::PUT_WRONG_TOKEN};
2289 }
2290 {
2291 // We store a value only if we think we're part of the
2292 // SEARCH_NODES nodes around the target id.
2293 auto closest_nodes = buckets(node.getFamily()).findClosestNodes(hash, scheduler.time(), SEARCH_NODES);
2294 if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) {
2295 DHT_LOG.w(hash, node.id, "[node %s] announce too far from the target. Dropping value.", node.toString().c_str());
2296 return {};
2297 }
2298 }
2299
2300 auto created = std::min(creation_date, scheduler.time());
2301 for (const auto& v : values) {
2302 if (v->id == Value::INVALID_ID) {
2303 DHT_LOG.w(hash, node.id, "[value %s] incorrect value id", hash.toString().c_str());
2304 throw net::DhtProtocolException {
2305 net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION,
2306 net::DhtProtocolException::PUT_INVALID_ID
2307 };
2308 }
2309 auto lv = getLocalById(hash, v->id);
2310 Sp<Value> vc = v;
2311 if (lv) {
2312 if (*lv == *vc) {
2313 storageRefresh(hash, v->id);
2314 DHT_LOG.d(hash, node.id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node.toString().c_str(), std::to_string(v->id).c_str());
2315 } else {
2316 const auto& type = getType(lv->type);
2317 if (type.editPolicy(hash, lv, vc, node.id, node.getAddr())) {
2318 DHT_LOG.d(hash, node.id, "[store %s] editing %s",
2319 hash.toString().c_str(), vc->toString().c_str());
2320 storageStore(hash, vc, created, node.getAddr());
2321 } else {
2322 DHT_LOG.d(hash, node.id, "[store %s] rejecting edition of %s because of storage policy",
2323 hash.toString().c_str(), vc->toString().c_str());
2324 }
2325 }
2326 } else {
2327 // Allow the value to be edited by the storage policy
2328 const auto& type = getType(vc->type);
2329 if (type.storePolicy(hash, vc, node.id, node.getAddr())) {
2330 //DHT_LOG.d(hash, node.id, "[store %s] storing %s", hash.toString().c_str(), std::to_string(vc->id).c_str());
2331 storageStore(hash, vc, created, node.getAddr());
2332 } else {
2333 DHT_LOG.d(hash, node.id, "[store %s] rejecting storage of %s",
2334 hash.toString().c_str(), vc->toString().c_str());
2335 }
2336 }
2337 }
2338 return {};
2339 }
2340
2341 net::RequestAnswer
onRefresh(Sp<Node> node,const InfoHash & hash,const Blob & token,const Value::Id & vid)2342 Dht::onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid)
2343 {
2344 using namespace net;
2345
2346 if (not tokenMatch(token, node->getAddr())) {
2347 DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str());
2348 throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN};
2349 }
2350 if (storageRefresh(hash, vid)) {
2351 DHT_LOG.d(hash, node->id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node->toString().c_str(), std::to_string(vid).c_str());
2352 } else {
2353 DHT_LOG.d(hash, node->id, "[store %s] [node %s] got refresh for unknown value",
2354 hash.toString().c_str(), node->toString().c_str());
2355 throw DhtProtocolException {DhtProtocolException::NOT_FOUND, DhtProtocolException::STORAGE_NOT_FOUND};
2356 }
2357 return {};
2358 }
2359
2360 bool
storageRefresh(const InfoHash & id,Value::Id vid)2361 Dht::storageRefresh(const InfoHash& id, Value::Id vid)
2362 {
2363 const auto& now = scheduler.time();
2364 auto s = store.find(id);
2365 if (s != store.end()) {
2366 // Values like for a permanent put can be refreshed. So, inform remote listeners that the value
2367 // need to be refreshed
2368 auto& st = s->second;
2369 if (not st.listeners.empty()) {
2370 DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());
2371 std::vector<Value::Id> ids = {vid};
2372 for (const auto& node_listeners : st.listeners) {
2373 for (const auto& l : node_listeners.second) {
2374 DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending refresh",
2375 id.toString().c_str(),
2376 node_listeners.first->toString().c_str());
2377 Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
2378 network_engine.tellListenerRefreshed(node_listeners.first, l.first, id, ntoken, ids);
2379 }
2380 }
2381 }
2382
2383 auto expiration = s->second.refresh(now, vid, types);
2384 if (expiration != time_point::max())
2385 scheduler.add(expiration, std::bind(&Dht::expireStorage, this, id));
2386 return true;
2387 }
2388 return false;
2389 }
2390
2391 void
onAnnounceDone(const Sp<Node> & node,net::RequestAnswer & answer,Sp<Search> & sr)2392 Dht::onAnnounceDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>& sr)
2393 {
2394 DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got reply to put!",
2395 sr->id.toString().c_str(), node->toString().c_str());
2396 searchSendGetValues(sr);
2397 sr->checkAnnounced(answer.vid);
2398 }
2399
2400
2401 void
saveState(const std::string & path) const2402 Dht::saveState(const std::string& path) const
2403 {
2404 std::ofstream file(path);
2405 msgpack::pack(file, exportNodes());
2406 msgpack::pack(file, exportValues());
2407 }
2408
2409 void
loadState(const std::string & path)2410 Dht::loadState(const std::string& path)
2411 {
2412 DHT_LOG.d("Importing state from %s", path.c_str());
2413 try {
2414 // Import nodes from binary file
2415 msgpack::unpacker pac;
2416 {
2417 // Read whole file
2418 std::ifstream file(path, std::ios::binary|std::ios::ate);
2419 if (!file.is_open()) {
2420 return;
2421 }
2422 auto size = file.tellg();
2423 file.seekg (0, std::ios::beg);
2424 pac.reserve_buffer(size);
2425 file.read (pac.buffer(), size);
2426 pac.buffer_consumed(size);
2427 }
2428 // Import nodes
2429 msgpack::object_handle oh;
2430 if (pac.next(oh)) {
2431 {
2432 auto imported_nodes = oh.get().as<std::vector<NodeExport>>();
2433 DHT_LOG.d("Importing %zu nodes", imported_nodes.size());
2434 for (const auto& node : imported_nodes)
2435 insertNode(node);
2436 }
2437 if (pac.next(oh)) {
2438 auto imported_values = oh.get().as<std::vector<ValuesExport>>();
2439 DHT_LOG.d("Importing %zu values", imported_values.size());
2440 importValues(imported_values);
2441 }
2442 }
2443 } catch (const std::exception& e) {
2444 DHT_LOG.w("Error importing state from %s: %s", path.c_str(), e.what());
2445 }
2446 }
2447
2448 }
2449