1 /*
2
3 Copyright (c) 2006-2018, Arvid Norberg
4 All rights reserved.
5
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30
31 */
32
33 #include <vector>
34 #include <iterator> // std::distance(), std::next
35 #include <algorithm> // std::copy, std::remove_copy_if
36 #include <functional>
37 #include <numeric>
38 #include <cstdio> // for snprintf
39 #include <cinttypes> // for PRId64 et.al.
40 #include <cstdint>
41
42 #include "libtorrent/config.hpp"
43
44 #include <libtorrent/hex.hpp> // to_hex
45 #include "libtorrent/kademlia/routing_table.hpp"
46 #include "libtorrent/session_status.hpp"
47 #include "libtorrent/kademlia/node_id.hpp"
48 #include "libtorrent/kademlia/dht_observer.hpp"
49 #include "libtorrent/kademlia/dht_settings.hpp"
50 #include "libtorrent/aux_/time.hpp"
51 #include "libtorrent/alert_types.hpp" // for dht_routing_bucket
52 #include "libtorrent/socket_io.hpp" // for print_endpoint
53 #include "libtorrent/invariant_check.hpp"
54 #include "libtorrent/address.hpp"
55 #include "libtorrent/aux_/array.hpp"
56
57 using namespace std::placeholders;
58
59 namespace libtorrent { namespace dht {
60
61 namespace {
62
// remove the entry matching `key` from an associative `container`.
// The key is required to be present; debug builds assert on a miss.
template <typename T, typename K>
void erase_one(T& container, K const& key)
{
	auto const pos = container.find(key);
	TORRENT_ASSERT(pos != container.end());
	container.erase(pos);
}
70
verify_node_address(dht::settings const & settings,node_id const & id,address const & addr)71 bool verify_node_address(dht::settings const& settings
72 , node_id const& id, address const& addr)
73 {
74 // only when the node_id pass the verification, add it to routing table.
75 return !settings.enforce_node_id || verify_id(id, addr);
76 }
77 }
78
insert(address const & addr)79 void ip_set::insert(address const& addr)
80 {
81 if (addr.is_v6())
82 m_ip6s.insert(addr.to_v6().to_bytes());
83 else
84 m_ip4s.insert(addr.to_v4().to_bytes());
85 }
86
exists(address const & addr) const87 bool ip_set::exists(address const& addr) const
88 {
89 if (addr.is_v6())
90 return m_ip6s.find(addr.to_v6().to_bytes()) != m_ip6s.end();
91 else
92 return m_ip4s.find(addr.to_v4().to_bytes()) != m_ip4s.end();
93 }
94
erase(address const & addr)95 void ip_set::erase(address const& addr)
96 {
97 if (addr.is_v6())
98 erase_one(m_ip6s, addr.to_v6().to_bytes());
99 else
100 erase_one(m_ip4s, addr.to_v4().to_bytes());
101 }
102
mostly_verified_nodes(bucket_t const & b)103 bool mostly_verified_nodes(bucket_t const& b)
104 {
105 int const num_verified = static_cast<int>(std::count_if(b.begin(), b.end()
106 , [](node_entry const& e) { return e.verified; }));
107 if (num_verified == 0 && b.size() > 0) return false;
108 return num_verified >= static_cast<int>(b.size()) * 2 / 3;
109 }
110
classify_prefix(int const bucket_idx,bool const last_bucket,int const bucket_size,node_id nid)111 std::uint8_t classify_prefix(int const bucket_idx, bool const last_bucket
112 , int const bucket_size, node_id nid)
113 {
114 TORRENT_ASSERT_VAL(bucket_size > 0, bucket_size);
115 TORRENT_ASSERT_VAL(bucket_size <= 256, bucket_size);
116
117 std::uint32_t mask = static_cast<std::uint32_t>(bucket_size) - 1;
118 // bucket sizes must be even powers of two.
119 TORRENT_ASSERT_VAL((mask & static_cast<std::uint32_t>(bucket_size)) == 0, bucket_size);
120
121 // this is a bit weird. count_leading_zeros treats the span we pass to it as
122 // an array of chars, but we pass a span of uint32_t as an optimization, to
123 // allow it to operate on 32 bits at a time. In this case, the value fits in
124 // a single byte, so we could get away with just passing in the least
125 // significant byte of `mask`, but we can't with the current API of
126 // count_leading_zeros().
127 int const mask_shift = aux::count_leading_zeros(aux::little_endian_to_host(mask));
128 TORRENT_ASSERT_VAL(mask_shift >= 0, mask_shift);
129 TORRENT_ASSERT_VAL(mask_shift < 8, mask_shift);
130 mask <<= mask_shift;
131 TORRENT_ASSERT_VAL(mask > 0, mask);
132 TORRENT_ASSERT_VAL(bool((mask & 0x80) != 0), mask);
133
134 // the reason to shift one bit extra (except for the last bucket) is that the
135 // first bit *defines* the bucket. That bit will be the same for all entries.
136 // We're not interested in that one. However, the last bucket hasn't split
137 // yet, so it will contain entries from both "sides", so we need to include
138 // the top bit.
139 nid <<= bucket_idx + int(!last_bucket);
140 std::uint8_t const ret = (nid[0] & mask) >> mask_shift;
141 TORRENT_ASSERT_VAL(ret < bucket_size, ret);
142 return ret;
143 }
144
replace_node_impl(node_entry const & e,bucket_t & b,ip_set & ips,int const bucket_index,int const bucket_size_limit,bool const last_bucket,dht_logger * log)145 routing_table::add_node_status_t replace_node_impl(node_entry const& e
146 , bucket_t& b, ip_set& ips, int const bucket_index
147 , int const bucket_size_limit, bool const last_bucket
148 #ifndef TORRENT_DISABLE_LOGGING
149 , dht_logger* log
150 #endif
151 )
152 {
153 // if the bucket isn't full, we're not replacing anything, and this function
154 // should not have been called
155 TORRENT_ASSERT(int(b.size()) >= bucket_size_limit);
156
157 bucket_t::iterator j = std::max_element(b.begin(), b.end()
158 , [](node_entry const& lhs, node_entry const& rhs)
159 { return lhs.fail_count() < rhs.fail_count(); });
160 TORRENT_ASSERT(j != b.end());
161
162 if (j->fail_count() > 0)
163 {
164 // i points to a node that has been marked
165 // as stale. Replace it with this new one
166 ips.erase(j->addr());
167 *j = e;
168 ips.insert(e.addr());
169 return routing_table::node_added;
170 }
171
172 // then we look for nodes with the same 3 bit prefix (or however
173 // many bits prefix the bucket size warrants). If there is no other
174 // node with this prefix, remove the duplicate with the highest RTT.
175 // as the last replacement strategy, if the node we found matching our
176 // bit prefix has higher RTT than the new node, replace it.
177
178 // in order to provide as few lookups as possible before finding
179 // the data someone is looking for, make sure there is an affinity
180 // towards having a good spread of node IDs in each bucket
181 std::uint8_t const to_add_prefix = classify_prefix(bucket_index
182 , last_bucket, bucket_size_limit, e.id);
183
184 // nodes organized by their prefix
185 aux::array<std::vector<bucket_t::iterator>, 128> nodes_storage;
186 auto const nodes = span<std::vector<bucket_t::iterator>>{nodes_storage}.first(bucket_size_limit);
187
188 for (j = b.begin(); j != b.end(); ++j)
189 {
190 std::uint8_t const prefix = classify_prefix(
191 bucket_index, last_bucket, bucket_size_limit, j->id);
192 TORRENT_ASSERT(prefix < nodes.size());
193 nodes[prefix].push_back(j);
194 }
195
196 if (!nodes[to_add_prefix].empty())
197 {
198 j = *std::max_element(nodes[to_add_prefix].begin(), nodes[to_add_prefix].end()
199 , [](bucket_t::iterator lhs, bucket_t::iterator rhs)
200 { return *lhs < *rhs; });
201
202 // only if e is better than the worst node in this prefix slot do we
203 // replace it. resetting j means we're not replacing it
204 if (!(e < *j)) j = b.end();
205 }
206 else
207 {
208 // there is no node in this prefix slot. We definitely want to add it.
209 // Now we just need to figure out which one to replace
210 std::vector<bucket_t::iterator> replace_candidates;
211 for (auto const& n : nodes)
212 {
213 if (n.size() > 1) replace_candidates.insert(replace_candidates.end(), n.begin(), n.end());
214 }
215
216 // since the bucket is full, and there's no node in the prefix-slot
217 // we're about to add to, there must be at least one prefix slot that
218 // has more than one node.
219 TORRENT_ASSERT(!replace_candidates.empty());
220
221 // from these nodes, pick the "worst" one and replace it
222 j = *std::max_element(replace_candidates.begin(), replace_candidates.end()
223 , [](bucket_t::iterator lhs, bucket_t::iterator rhs)
224 { return *lhs < *rhs; });
225 }
226
227 if (j != b.end())
228 {
229 #ifndef TORRENT_DISABLE_LOGGING
230 if (log != nullptr && log->should_log(dht_logger::routing_table))
231 {
232 log->log(dht_logger::routing_table, "replacing node with better one: %s %s [%s %dms %d] vs. [%s %dms %d]"
233 , aux::to_hex(e.id).c_str(), print_address(e.addr()).c_str()
234 , e.verified ? "verified" : "not-verified", e.rtt
235 , classify_prefix(bucket_index, last_bucket, bucket_size_limit, e.id)
236 , j->verified ? "verified" : "not-verified", j->rtt
237 , classify_prefix(bucket_index, last_bucket, bucket_size_limit, j->id)
238 );
239 }
240 #endif
241 ips.erase(j->addr());
242 *j = e;
243 ips.insert(e.addr());
244 return routing_table::node_added;
245 }
246 return routing_table::need_bucket_split;
247 }
248
routing_table(node_id const & id,udp const proto,int const bucket_size,dht::settings const & settings,dht_logger * log)249 routing_table::routing_table(node_id const& id, udp const proto, int const bucket_size
250 , dht::settings const& settings
251 , dht_logger* log)
252 :
253 #ifndef TORRENT_DISABLE_LOGGING
254 m_log(log),
255 #endif
256 m_settings(settings)
257 , m_id(id)
258 , m_protocol(proto)
259 , m_depth(0)
260 , m_last_self_refresh(min_time())
261 , m_bucket_size(bucket_size)
262 {
263 // bucket sizes must be a power of 2
264 TORRENT_ASSERT_VAL(((bucket_size - 1) & bucket_size) == 0, bucket_size);
265 TORRENT_UNUSED(log);
266 m_buckets.reserve(30);
267 }
268
bucket_limit(int bucket) const269 int routing_table::bucket_limit(int bucket) const
270 {
271 if (!m_settings.extended_routing_table) return m_bucket_size;
272
273 static const aux::array<int, 4> size_exceptions{{{16, 8, 4, 2}}};
274 if (bucket < size_exceptions.end_index())
275 return m_bucket_size * size_exceptions[bucket];
276 return m_bucket_size;
277 }
278
status(std::vector<dht_routing_bucket> & s) const279 void routing_table::status(std::vector<dht_routing_bucket>& s) const
280 {
281 // TODO: This is temporary. For now, only report the largest routing table
282 // (of potentially multiple ones, for multi-homed systems)
283 // in next major version, break the ABI and support reporting all of them in
284 // the dht_stats_alert
285 if (s.size() > m_buckets.size()) return;
286 s.clear();
287 for (auto const& i : m_buckets)
288 {
289 dht_routing_bucket b;
290 b.num_nodes = int(i.live_nodes.size());
291 b.num_replacements = int(i.replacements.size());
292 s.push_back(b);
293 }
294 }
295
#if TORRENT_ABI_VERSION == 1
// deprecated stats reporting path: accumulate this table's counters into
// the (ABI v1) session_status structure.
// TODO: 2 use the non deprecated function instead of this one
void routing_table::status(session_status& s) const
{
	int dht_nodes;
	int dht_node_cache;
	int ignore;
	// size() returns (live nodes, replacement nodes, confirmed nodes);
	// the confirmed count is not reported here
	std::tie(dht_nodes, dht_node_cache, ignore) = size();
	s.dht_nodes += dht_nodes;
	s.dht_node_cache += dht_node_cache;
	// TODO: arvidn note
	// when it's across IPv4 and IPv6, adding (dht_global_nodes) would
	// make sense. in the future though, where we may have one DHT node
	// per external interface (which may be multiple of the same address
	// family), then it becomes a bit trickier
	s.dht_global_nodes += num_global_nodes();

	for (auto const& i : m_buckets)
	{
		dht_routing_bucket b;
		b.num_nodes = int(i.live_nodes.size());
		b.num_replacements = int(i.replacements.size());
#if TORRENT_ABI_VERSION == 1
		b.last_active = 0;
#endif
		s.dht_routing_table.push_back(b);
	}
}
#endif
325
size() const326 std::tuple<int, int, int> routing_table::size() const
327 {
328 int nodes = 0;
329 int replacements = 0;
330 int confirmed = 0;
331 for (auto const& i : m_buckets)
332 {
333 nodes += int(i.live_nodes.size());
334 confirmed += static_cast<int>(std::count_if(i.live_nodes.begin(), i.live_nodes.end()
335 , [](node_entry const& k) { return k.confirmed(); } ));
336
337 replacements += int(i.replacements.size());
338 }
339 return std::make_tuple(nodes, replacements, confirmed);
340 }
341
num_global_nodes() const342 std::int64_t routing_table::num_global_nodes() const
343 {
344 int deepest_bucket = 0;
345 int deepest_size = 0;
346 for (auto const& i : m_buckets)
347 {
348 deepest_size = i.live_nodes.end_index(); // + i.replacements.size();
349 if (deepest_size < m_bucket_size) break;
350 // this bucket is full
351 ++deepest_bucket;
352 }
353
354 if (deepest_bucket == 0) return 1 + deepest_size;
355
356 if (deepest_size < m_bucket_size / 2) return (std::int64_t(1) << deepest_bucket) * m_bucket_size;
357 else return (std::int64_t(2) << deepest_bucket) * deepest_size;
358 }
359
depth() const360 int routing_table::depth() const
361 {
362 if (m_depth >= int(m_buckets.size()))
363 m_depth = int(m_buckets.size()) - 1;
364
365 if (m_depth < 0) return m_depth;
366
367 // maybe the table is deeper now?
368 while (m_depth < int(m_buckets.size()) - 1
369 && int(m_buckets[m_depth + 1].live_nodes.size()) >= m_bucket_size / 2)
370 {
371 ++m_depth;
372 }
373
374 // maybe the table is more shallow now?
375 while (m_depth > 0
376 && int(m_buckets[m_depth - 1].live_nodes.size()) < m_bucket_size / 2)
377 {
378 --m_depth;
379 }
380
381 return m_depth;
382 }
383
next_refresh()384 node_entry const* routing_table::next_refresh()
385 {
386 // find the node with the least recent 'last_queried' field. if it's too
387 // recent, return false. Otherwise return a random target ID that's close to
388 // a missing prefix for that bucket
389
390 node_entry* candidate = nullptr;
391
392 // this will have a bias towards pinging nodes close to us first.
393 for (auto i = m_buckets.rbegin(), end(m_buckets.rend()); i != end; ++i)
394 {
395 for (auto& n : i->live_nodes)
396 {
397 // this shouldn't happen
398 TORRENT_ASSERT(m_id != n.id);
399 if (n.id == m_id) continue;
400
401 if (n.last_queried == min_time())
402 {
403 candidate = &n;
404 goto out;
405 }
406
407 if (candidate == nullptr || n.last_queried < candidate->last_queried)
408 {
409 candidate = &n;
410 }
411 }
412
413 if (i == m_buckets.rbegin()
414 || int(i->live_nodes.size()) < bucket_limit(int(std::distance(i, end)) - 1))
415 {
416 // this bucket isn't full or it can be split
417 // check for an unpinged replacement
418 // node which may be eligible for the live bucket if confirmed
419 auto r = std::find_if(i->replacements.begin(), i->replacements.end()
420 , [](node_entry const& e) { return !e.pinged() && e.last_queried == min_time(); });
421 if (r != i->replacements.end())
422 {
423 candidate = &*r;
424 goto out;
425 }
426 }
427 }
428 out:
429
430 // make sure we don't pick the same node again next time we want to refresh
431 // the routing table
432 if (candidate)
433 candidate->last_queried = aux::time_now();
434
435 return candidate;
436 }
437
find_bucket(node_id const & id)438 routing_table::table_t::iterator routing_table::find_bucket(node_id const& id)
439 {
440 // TORRENT_ASSERT(id != m_id);
441
442 int num_buckets = int(m_buckets.size());
443 if (num_buckets == 0)
444 {
445 m_buckets.push_back(routing_table_node());
446 ++num_buckets;
447 }
448
449 int bucket_index = std::min(159 - distance_exp(m_id, id), num_buckets - 1);
450 TORRENT_ASSERT(bucket_index < int(m_buckets.size()));
451 TORRENT_ASSERT(bucket_index >= 0);
452
453 auto i = m_buckets.begin();
454 std::advance(i, bucket_index);
455 return i;
456 }
457
458 // returns true if the two IPs are "too close" to each other to be allowed in
459 // the same DHT lookup. If they are, the last one to be found will be ignored
compare_ip_cidr(address const & lhs,address const & rhs)460 bool compare_ip_cidr(address const& lhs, address const& rhs)
461 {
462 TORRENT_ASSERT(lhs.is_v4() == rhs.is_v4());
463
464 if (lhs.is_v6())
465 {
466 // if IPv6 addresses is in the same /64, they're too close and we won't
467 // trust the second one
468 std::uint64_t lhs_ip;
469 std::memcpy(&lhs_ip, lhs.to_v6().to_bytes().data(), 8);
470 std::uint64_t rhs_ip;
471 std::memcpy(&rhs_ip, rhs.to_v6().to_bytes().data(), 8);
472
473 // since the condition we're looking for is all the first bits being
474 // zero, there's no need to byte-swap into host byte order here.
475 std::uint64_t const mask = lhs_ip ^ rhs_ip;
476 return mask == 0;
477 }
478 else
479 {
480 // if IPv4 addresses is in the same /24, they're too close and we won't
481 // trust the second one
482 std::uint32_t const mask
483 = std::uint32_t(lhs.to_v4().to_ulong() ^ rhs.to_v4().to_ulong());
484 return mask <= 0x000000ff;
485 }
486 }
487
488 std::tuple<node_entry*, routing_table::table_t::iterator, bucket_t*>
find_node(udp::endpoint const & ep)489 routing_table::find_node(udp::endpoint const& ep)
490 {
491 for (auto i = m_buckets.begin() , end(m_buckets.end()); i != end; ++i)
492 {
493 for (auto j = i->replacements.begin(); j != i->replacements.end(); ++j)
494 {
495 if (j->addr() != ep.address()) continue;
496 if (j->port() != ep.port()) continue;
497 return std::make_tuple(&*j, i, &i->replacements);
498 }
499 for (auto j = i->live_nodes.begin(); j != i->live_nodes.end(); ++j)
500 {
501 if (j->addr() != ep.address()) continue;
502 if (j->port() != ep.port()) continue;
503 return std::make_tuple(&*j, i, &i->live_nodes);
504 }
505 }
506 return std::tuple<node_entry*, routing_table::table_t::iterator, bucket_t*>(
507 nullptr, m_buckets.end(), nullptr);
508 }
509
510 // TODO: this need to take bucket "prefix" into account. It should be unified
511 // with add_node_impl()
fill_from_replacements(table_t::iterator bucket)512 void routing_table::fill_from_replacements(table_t::iterator bucket)
513 {
514 bucket_t& b = bucket->live_nodes;
515 bucket_t& rb = bucket->replacements;
516 int const bucket_size = bucket_limit(int(std::distance(m_buckets.begin(), bucket)));
517
518 if (int(b.size()) >= bucket_size) return;
519
520 // sort by RTT first, to find the node with the lowest
521 // RTT that is pinged
522 std::sort(rb.begin(), rb.end());
523
524 while (int(b.size()) < bucket_size && !rb.empty())
525 {
526 auto j = std::find_if(rb.begin(), rb.end(), std::bind(&node_entry::pinged, _1));
527 if (j == rb.end()) break;
528 b.push_back(*j);
529 rb.erase(j);
530 }
531 }
532
prune_empty_bucket()533 void routing_table::prune_empty_bucket()
534 {
535 if (m_buckets.back().live_nodes.empty()
536 && m_buckets.back().replacements.empty())
537 {
538 m_buckets.erase(m_buckets.end() - 1);
539 }
540 }
541
remove_node(node_entry * n,bucket_t * b)542 void routing_table::remove_node(node_entry* n, bucket_t* b)
543 {
544 std::ptrdiff_t const idx = n - b->data();
545 TORRENT_ASSERT(idx >= 0);
546 TORRENT_ASSERT(idx < intptr_t(b->size()));
547 TORRENT_ASSERT(m_ips.exists(n->addr()));
548 m_ips.erase(n->addr());
549 b->erase(b->begin() + idx);
550 }
551
add_node(node_entry const & e)552 bool routing_table::add_node(node_entry const& e)
553 {
554 add_node_status_t s = add_node_impl(e);
555 if (s == failed_to_add) return false;
556 if (s == node_added) return true;
557
558 while (s == need_bucket_split)
559 {
560 split_bucket();
561
562 // if this assert triggers a lot in the wild, we should probably
563 // harden our resistance towards this attack. Perhaps by never
564 // splitting a bucket (and discard nodes) if the two buckets above it
565 // are empty or close to empty
566 // TORRENT_ASSERT(m_buckets.size() <= 50);
567 if (m_buckets.size() > 50)
568 {
569 // this is a sanity check. In the wild, we shouldn't see routing
570 // tables deeper than 26 or 27. If we get this deep, there might
571 // be a bug in the bucket splitting logic, or there may be someone
572 // playing a prank on us, spoofing node IDs.
573 s = add_node_impl(e);
574 return s == node_added;
575 }
576
577 // if the new bucket still has too many nodes in it, we need to keep
578 // splitting
579 if (int(m_buckets.back().live_nodes.size()) > bucket_limit(int(m_buckets.size()) - 1))
580 continue;
581
582 s = add_node_impl(e);
583
584 // we just split the last bucket and tried to insert a new node. If none
585 // of the nodes in the split bucket, nor the new node ended up in the new
586 // bucket, erase it
587 if (m_buckets.back().live_nodes.empty())
588 {
589 m_buckets.erase(m_buckets.end() - 1);
590 // we just split, trying to add the node again should not request
591 // another split
592 TORRENT_ASSERT(s != need_bucket_split);
593 }
594 if (s == failed_to_add) return false;
595 if (s == node_added) return true;
596 }
597 return false;
598 }
599
all_in_same_bucket(span<node_entry const> b,node_id const & id,int const bucket_index)600 bool all_in_same_bucket(span<node_entry const> b, node_id const& id, int const bucket_index)
601 {
602 int const byte_offset = bucket_index / 8;
603 int const bit_offset = bucket_index % 8;
604 std::uint8_t const mask = 0x80 >> bit_offset;
605 int counter[2] = {0, 0};
606 int const i = (id[byte_offset] & mask) ? 1 : 0;
607 ++counter[i];
608 for (auto const& e : b)
609 {
610 int const idx = (e.id[byte_offset] & mask) ? 1 : 0;
611 ++counter[idx];
612 }
613 return counter[0] == 0 || counter[1] == 0;
614 }
615
add_node_impl(node_entry e)616 routing_table::add_node_status_t routing_table::add_node_impl(node_entry e)
617 {
618 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
619 // INVARIANT_CHECK;
620 #endif
621
622 // don't add if the address isn't the right type
623 if (!native_endpoint(e.ep()))
624 return failed_to_add;
625
626 // if we already have this (IP,port), don't do anything
627 if (m_router_nodes.find(e.ep()) != m_router_nodes.end())
628 return failed_to_add;
629
630 // do we already have this IP in the table?
631 if (m_ips.exists(e.addr()))
632 {
633 // This exact IP already exists in the table. A node with the same IP and
634 // port but a different ID may be a sign of a malicious node. To be
635 // conservative in this case the node is removed.
636 // pinged means that we have sent a message to the IP, port and received
637 // a response with a correct transaction ID, i.e. it is verified to not
638 // be the result of a poisoned routing table
639
640 node_entry * existing;
641 routing_table::table_t::iterator existing_bucket;
642 bucket_t* bucket;
643 std::tie(existing, existing_bucket, bucket) = find_node(e.ep());
644 if (existing == nullptr)
645 {
646 // the node we're trying to add is not a match with an existing node. we
647 // should ignore it, unless we allow duplicate IPs in our routing
648 // table. There could be a node with the same IP, but with a different
649 // port. m_ips just contain IP addresses, whereas the lookup we just
650 // performed was for full endpoints (address, port).
651 if (m_settings.restrict_routing_ips)
652 {
653 #ifndef TORRENT_DISABLE_LOGGING
654 if (m_log != nullptr && m_log->should_log(dht_logger::routing_table))
655 {
656 m_log->log(dht_logger::routing_table, "ignoring node (duplicate IP): %s %s"
657 , aux::to_hex(e.id).c_str(), print_address(e.addr()).c_str());
658 }
659 #endif
660 return failed_to_add;
661 }
662 }
663 else if (existing->id == e.id)
664 {
665 // if the node ID is the same, just update the failcount
666 // and be done with it.
667 existing->timeout_count = 0;
668 if (e.pinged())
669 {
670 existing->update_rtt(e.rtt);
671 existing->last_queried = e.last_queried;
672 }
673 // if this was a replacement node it may be elligible for
674 // promotion to the live bucket
675 fill_from_replacements(existing_bucket);
676 prune_empty_bucket();
677 return node_added;
678 }
679 else if (existing->id.is_all_zeros())
680 {
681 // this node's ID was unknown. remove the old entry and
682 // replace it with the node's real ID
683 remove_node(existing, bucket);
684 }
685 else if (!e.pinged())
686 {
687 // this may be a routing table poison attack. If we haven't confirmed
688 // that this peer actually exist with this new node ID yet, ignore it.
689 // we definitely don't want to replace the existing entry with this one
690 if (m_settings.restrict_routing_ips)
691 return failed_to_add;
692 }
693 else
694 {
695 TORRENT_ASSERT(existing->id != e.id);
696 // This is the same IP and port, but with a new node ID.
697 // This may indicate a malicious node so remove the entry.
698 #ifndef TORRENT_DISABLE_LOGGING
699 if (m_log != nullptr && m_log->should_log(dht_logger::routing_table))
700 {
701 m_log->log(dht_logger::routing_table, "evicting node (changed ID): old: %s new: %s %s"
702 , aux::to_hex(existing->id).c_str(), aux::to_hex(e.id).c_str(), print_address(e.addr()).c_str());
703 }
704 #endif
705
706 remove_node(existing, bucket);
707 fill_from_replacements(existing_bucket);
708
709 // when we detect possible malicious activity in a bucket,
710 // schedule the other nodes in the bucket to be pinged soon
711 // to clean out any other malicious nodes
712 auto const now = aux::time_now();
713 for (auto& node : existing_bucket->live_nodes)
714 {
715 if (node.last_queried + minutes(5) < now)
716 node.last_queried = min_time();
717 }
718
719 prune_empty_bucket();
720 return failed_to_add;
721 }
722 }
723
724 // don't add ourself
725 if (e.id == m_id) return failed_to_add;
726
727 auto const i = find_bucket(e.id);
728 bucket_t& b = i->live_nodes;
729 bucket_t& rb = i->replacements;
730 int const bucket_index = int(std::distance(m_buckets.begin(), i));
731 // compare against the max size of the next bucket. Otherwise we may wait too
732 // long to split, and lose nodes (in the case where lower-numbered buckets
733 // are larger)
734 int const bucket_size_limit = bucket_limit(bucket_index);
735
736 bucket_t::iterator j;
737
738 // if the node already exists, we don't need it
739 j = std::find_if(b.begin(), b.end()
740 , [&e](node_entry const& ne) { return ne.id == e.id; });
741
742 if (j != b.end())
743 {
744 // a new IP address just claimed this node-ID
745 // ignore it
746 if (j->addr() != e.addr() || j->port() != e.port())
747 return failed_to_add;
748
749 // we already have the node in our bucket
750 TORRENT_ASSERT(j->id == e.id && j->ep() == e.ep());
751 j->timeout_count = 0;
752 j->update_rtt(e.rtt);
753 return node_added;
754 }
755
756 // if this node exists in the replacement bucket. update it and
757 // pull it out from there. We may add it back to the replacement
758 // bucket, but we may also replace a node in the main bucket, now
759 // that we have an updated RTT
760 j = std::find_if(rb.begin(), rb.end()
761 , [&e](node_entry const& ne) { return ne.id == e.id; });
762 if (j != rb.end())
763 {
764 // a new IP address just claimed this node-ID
765 // ignore it
766 if (j->addr() != e.addr() || j->port() != e.port())
767 return failed_to_add;
768
769 TORRENT_ASSERT(j->id == e.id && j->ep() == e.ep());
770 j->timeout_count = 0;
771 j->update_rtt(e.rtt);
772 e = *j;
773 m_ips.erase(j->addr());
774 rb.erase(j);
775 }
776
777 if (m_settings.restrict_routing_ips)
778 {
779 // don't allow multiple entries from IPs very close to each other
780 address const& cmp = e.addr();
781 j = std::find_if(b.begin(), b.end(), [&](node_entry const& a) { return compare_ip_cidr(a.addr(), cmp); });
782 if (j == b.end())
783 {
784 j = std::find_if(rb.begin(), rb.end(), [&](node_entry const& a) { return compare_ip_cidr(a.addr(), cmp); });
785 if (j == rb.end()) goto ip_ok;
786 }
787
788 // we already have a node in this bucket with an IP very
789 // close to this one. We know that it's not the same, because
790 // it claims a different node-ID. Ignore this to avoid attacks
791 #ifndef TORRENT_DISABLE_LOGGING
792 if (m_log != nullptr && m_log->should_log(dht_logger::routing_table))
793 {
794 m_log->log(dht_logger::routing_table, "ignoring node: %s %s existing node: %s %s"
795 , aux::to_hex(e.id).c_str(), print_address(e.addr()).c_str()
796 , aux::to_hex(j->id).c_str(), print_address(j->addr()).c_str());
797 }
798 #endif
799 return failed_to_add;
800 }
801 ip_ok:
802
803 // if there's room in the main bucket, just insert it
804 // if we can split the bucket (i.e. it's the last bucket) use the next
805 // bucket's size limit. This makes use split the low-numbered buckets split
806 // earlier when we have larger low buckets, to make it less likely that we
807 // lose nodes
808 if (e.pinged() && int(b.size()) < bucket_size_limit)
809 {
810 if (b.empty()) b.reserve(bucket_size_limit);
811 b.push_back(e);
812 m_ips.insert(e.addr());
813 return node_added;
814 }
815
816 // if there is no room, we look for nodes marked as stale
817 // in the k-bucket. If we find one, we can replace it.
818
819 // A node is considered stale if it has failed at least one
820 // time. Here we choose the node that has failed most times.
821 // If we don't find one, place this node in the replacement-
822 // cache and replace any nodes that will fail in the future
823 // with nodes from that cache.
824
825 bool const last_bucket = bucket_index + 1 == int(m_buckets.size());
826
827 // only nodes that have been confirmed can split the bucket, and we can only
828 // split the last bucket
829 // if all nodes in the bucket, including the new node id (e.id) fall in the
830 // same bucket, splitting isn't going to do anything.
831 bool const can_split = (std::next(i) == m_buckets.end()
832 && m_buckets.size() < 159)
833 && (m_settings.prefer_verified_node_ids == false
834 || (e.verified && mostly_verified_nodes(b)))
835 && e.confirmed()
836 && (i == m_buckets.begin() || std::prev(i)->live_nodes.size() > 1)
837 && !all_in_same_bucket(b, e.id, bucket_index);
838
839 if (can_split) return need_bucket_split;
840
841 if (e.confirmed())
842 {
843 auto const ret = replace_node_impl(e, b, m_ips, bucket_index, bucket_size_limit, last_bucket
844 #ifndef TORRENT_DISABLE_LOGGING
845 , m_log
846 #endif
847 );
848 if (ret != need_bucket_split) return ret;
849 }
850
851 // if we can't split, nor replace anything in the live buckets try to insert
852 // into the replacement bucket
853
854 // if we don't have any identified stale nodes in
855 // the bucket, and the bucket is full, we have to
856 // cache this node and wait until some node fails
857 // and then replace it.
858 j = std::find_if(rb.begin(), rb.end()
859 , [&e](node_entry const& ne) { return ne.id == e.id; });
860
861 // if the node is already in the replacement bucket
862 // just return.
863 if (j != rb.end())
864 {
865 // if the IP address matches, it's the same node
866 // make sure it's marked as pinged
867 if (j->ep() == e.ep()) j->set_pinged();
868 return node_added;
869 }
870
871 if (int(rb.size()) >= m_bucket_size)
872 {
873 // if the replacement bucket is full, remove the oldest entry
874 // but prefer nodes that haven't been pinged, since they are
875 // less reliable than this one, that has been pinged
876 j = std::find_if(rb.begin(), rb.end()
877 , [] (node_entry const& ne) { return !ne.pinged(); });
878 if (j == rb.end())
879 {
880 auto const ret = replace_node_impl(e, rb, m_ips, bucket_index, m_bucket_size, last_bucket
881 #ifndef TORRENT_DISABLE_LOGGING
882 , nullptr
883 #endif
884 );
885 return ret == node_added ? node_added : failed_to_add;
886 }
887 m_ips.erase(j->addr());
888 rb.erase(j);
889 }
890
891 if (rb.empty()) rb.reserve(m_bucket_size);
892 rb.push_back(e);
893 m_ips.insert(e.addr());
894 return node_added;
895 }
896
// the last bucket overflowed: grow the table by one bucket and redistribute
// the nodes of the old last bucket (and its replacements) between the two,
// based on one more bit of prefix shared with our own node ID
void routing_table::split_bucket()
{
	INVARIANT_CHECK;

	int const bucket_index = int(m_buckets.size()) - 1;
	int const bucket_size_limit = bucket_limit(bucket_index);
	// splitting only makes sense when the last bucket is at capacity
	TORRENT_ASSERT(int(m_buckets.back().live_nodes.size()) >= bucket_limit(bucket_index + 1));

	// this is the last bucket, and it's full already. Split
	// it by adding another bucket
	m_buckets.push_back(routing_table_node());
	bucket_t& new_bucket = m_buckets.back().live_nodes;
	bucket_t& new_replacement_bucket = m_buckets.back().replacements;

	bucket_t& b = m_buckets[bucket_index].live_nodes;
	bucket_t& rb = m_buckets[bucket_index].replacements;

	// move any node whose (160 - distance_exp(m_id, id)) >= (i - m_buckets.begin())
	// to the new bucket
	int const new_bucket_size = bucket_limit(bucket_index + 1);
	for (auto j = b.begin(); j != b.end();)
	{
		int const d = distance_exp(m_id, j->id);
		if (d >= 159 - bucket_index)
		{
			// this entry stays in the old (now second-to-last) bucket
			++j;
			continue;
		}
		// this entry belongs in the new bucket
		new_bucket.push_back(*j);
		j = b.erase(j);
	}

	if (int(b.size()) > bucket_size_limit)
	{
		// the old bucket is still over-full after the split; spill the
		// overflow into its replacement list before trimming it to size
		// TODO: 2 move the lowest priority nodes to the replacement bucket
		for (auto i = b.begin() + bucket_size_limit
			, end(b.end()); i != end; ++i)
		{
			rb.push_back(*i);
		}

		b.resize(bucket_size_limit);
	}

	// split the replacement bucket as well. If the live bucket
	// is not full anymore, also move the replacement entries
	// into the main bucket
	for (auto j = rb.begin(); j != rb.end();)
	{
		if (distance_exp(m_id, j->id) >= 159 - bucket_index)
		{
			// belongs with the old bucket. Promote it into the live bucket
			// only if it has been pinged and there's room; otherwise keep it
			// as a replacement where it is
			if (!j->pinged() || int(b.size()) >= bucket_size_limit)
			{
				++j;
				continue;
			}
			b.push_back(*j);
		}
		else
		{
			// this entry belongs in the new bucket; pinged nodes go live if
			// there's room, everything else becomes a replacement there
			if (j->pinged() && int(new_bucket.size()) < new_bucket_size)
				new_bucket.push_back(*j);
			else
				new_replacement_bucket.push_back(*j);
		}
		j = rb.erase(j);
	}
}
967
update_node_id(node_id const & id)968 void routing_table::update_node_id(node_id const& id)
969 {
970 m_id = id;
971
972 m_ips.clear();
973
974 // pull all nodes out of the routing table, effectively emptying it
975 table_t old_buckets;
976 old_buckets.swap(m_buckets);
977
978 // then add them all back. First add the main nodes, then the replacement
979 // nodes
980 for (auto const& b : old_buckets)
981 for (auto const& n : b.live_nodes)
982 add_node(n);
983
984 // now add back the replacement nodes
985 for (auto const& b : old_buckets)
986 for (auto const& n : b.replacements)
987 add_node(n);
988 }
989
for_each_node(std::function<void (node_entry const &)> live_cb,std::function<void (node_entry const &)> replacements_cb) const990 void routing_table::for_each_node(std::function<void(node_entry const&)> live_cb
991 , std::function<void(node_entry const&)> replacements_cb) const
992 {
993 for (auto const& i : m_buckets)
994 {
995 if (live_cb)
996 {
997 for (auto const& j : i.live_nodes)
998 live_cb(j);
999 }
1000 if (replacements_cb)
1001 {
1002 for (auto const& j : i.replacements)
1003 replacements_cb(j);
1004 }
1005 }
1006 }
1007
node_failed(node_id const & nid,udp::endpoint const & ep)1008 void routing_table::node_failed(node_id const& nid, udp::endpoint const& ep)
1009 {
1010 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
1011 INVARIANT_CHECK;
1012 #endif
1013
1014 // if messages to ourself fails, ignore it
1015 if (nid == m_id) return;
1016
1017 auto const i = find_bucket(nid);
1018 bucket_t& b = i->live_nodes;
1019 bucket_t& rb = i->replacements;
1020
1021 auto j = std::find_if(b.begin(), b.end()
1022 , [&nid](node_entry const& ne) { return ne.id == nid; });
1023
1024 if (j == b.end())
1025 {
1026 j = std::find_if(rb.begin(), rb.end()
1027 , [&nid](node_entry const& ne) { return ne.id == nid; });
1028
1029 if (j == rb.end()
1030 || j->ep() != ep) return;
1031
1032 j->timed_out();
1033
1034 #ifndef TORRENT_DISABLE_LOGGING
1035 log_node_failed(nid, *j);
1036 #endif
1037 return;
1038 }
1039
1040 // if the endpoint doesn't match, it's a different node
1041 // claiming the same ID. The node we have in our routing
1042 // table is not necessarily stale
1043 if (j->ep() != ep) return;
1044
1045 if (rb.empty())
1046 {
1047 j->timed_out();
1048
1049 #ifndef TORRENT_DISABLE_LOGGING
1050 log_node_failed(nid, *j);
1051 #endif
1052
1053 // if this node has failed too many times, or if this node
1054 // has never responded at all, remove it
1055 if (j->fail_count() >= m_settings.max_fail_count || !j->pinged())
1056 {
1057 m_ips.erase(j->addr());
1058 b.erase(j);
1059 }
1060 return;
1061 }
1062
1063 m_ips.erase(j->addr());
1064 b.erase(j);
1065
1066 fill_from_replacements(i);
1067 prune_empty_bucket();
1068 }
1069
// record a router (bootstrap) node. These are kept in a separate set,
// distinct from the routing table buckets
void routing_table::add_router_node(udp::endpoint const& router)
{
	m_router_nodes.insert(router);
}
1074
1075 // we heard from this node, but we don't know if it was spoofed or not (i.e.
1076 // pinged == false)
heard_about(node_id const & id,udp::endpoint const & ep)1077 void routing_table::heard_about(node_id const& id, udp::endpoint const& ep)
1078 {
1079 if (!verify_node_address(m_settings, id, ep.address())) return;
1080 add_node(node_entry(id, ep));
1081 }
1082
1083 // this function is called every time the node sees a sign of a node being
1084 // alive. This node will either be inserted in the k-buckets or be moved to the
1085 // top of its bucket. the return value indicates if the table needs a refresh.
1086 // if true, the node should refresh the table (i.e. do a find_node on its own
1087 // id)
node_seen(node_id const & id,udp::endpoint const & ep,int const rtt)1088 bool routing_table::node_seen(node_id const& id, udp::endpoint const& ep, int const rtt)
1089 {
1090 return verify_node_address(m_settings, id, ep.address()) && add_node(node_entry(id, ep, rtt, true));
1091 }
1092
1093 // fills the vector with the k nodes from our buckets that
1094 // are nearest to the given id.
find_node(node_id const & target,std::vector<node_entry> & l,int const options,int count)1095 void routing_table::find_node(node_id const& target
1096 , std::vector<node_entry>& l, int const options, int count)
1097 {
1098 l.clear();
1099 if (count == 0) count = m_bucket_size;
1100
1101 auto const i = find_bucket(target);
1102 int const bucket_index = int(std::distance(m_buckets.begin(), i));
1103 int const bucket_size_limit = bucket_limit(bucket_index);
1104
1105 l.reserve(aux::numeric_cast<std::size_t>(bucket_size_limit));
1106
1107 table_t::iterator j = i;
1108
1109 int unsorted_start_idx = 0;
1110 for (; j != m_buckets.end() && int(l.size()) < count; ++j)
1111 {
1112 bucket_t const& b = j->live_nodes;
1113 if (options & include_failed)
1114 {
1115 std::copy(b.begin(), b.end(), std::back_inserter(l));
1116 }
1117 else
1118 {
1119 std::remove_copy_if(b.begin(), b.end(), std::back_inserter(l)
1120 , [](node_entry const& ne) { return !ne.confirmed(); });
1121 }
1122
1123 if (int(l.size()) == count) return;
1124
1125 if (int(l.size()) > count)
1126 {
1127 // sort the nodes by how close they are to the target
1128 std::sort(l.begin() + unsorted_start_idx, l.end()
1129 , [&target](node_entry const& lhs, node_entry const& rhs)
1130 { return compare_ref(lhs.id, rhs.id, target); });
1131
1132 l.resize(aux::numeric_cast<std::size_t>(count));
1133 return;
1134 }
1135 unsorted_start_idx = int(l.size());
1136 }
1137
1138 // if we still don't have enough nodes, copy nodes
1139 // further away from us
1140
1141 if (i == m_buckets.begin())
1142 return;
1143
1144 j = i;
1145
1146 unsorted_start_idx = int(l.size());
1147 do
1148 {
1149 --j;
1150 bucket_t const& b = j->live_nodes;
1151
1152 if (options & include_failed)
1153 {
1154 std::copy(b.begin(), b.end(), std::back_inserter(l));
1155 }
1156 else
1157 {
1158 std::remove_copy_if(b.begin(), b.end(), std::back_inserter(l)
1159 , [](node_entry const& ne) { return !ne.confirmed(); });
1160 }
1161
1162 if (int(l.size()) == count) return;
1163
1164 if (int(l.size()) > count)
1165 {
1166 // sort the nodes by how close they are to the target
1167 std::sort(l.begin() + unsorted_start_idx, l.end()
1168 , [&target](node_entry const& lhs, node_entry const& rhs)
1169 { return compare_ref(lhs.id, rhs.id, target); });
1170
1171 l.resize(aux::numeric_cast<std::size_t>(count));
1172 return;
1173 }
1174 unsorted_start_idx = int(l.size());
1175 }
1176 while (j != m_buckets.begin() && int(l.size()) < count);
1177
1178 TORRENT_ASSERT(int(l.size()) <= count);
1179 }
1180
#if TORRENT_USE_INVARIANT_CHECKS
// validate internal consistency of the routing table
void routing_table::check_invariant() const
{
	ip_set all_ips;

	for (auto const& bucket : m_buckets)
	{
		for (auto const& n : bucket.replacements)
		{
			all_ips.insert(n.addr());
		}
		for (auto const& n : bucket.live_nodes)
		{
			// all live nodes within a bucket share the same address family
			TORRENT_ASSERT(n.addr().is_v4() == bucket.live_nodes.begin()->addr().is_v4());
			// only pinged nodes are allowed in the live bucket
			TORRENT_ASSERT(n.pinged());
			all_ips.insert(n.addr());
		}
	}

	// the IP set must mirror exactly the addresses stored in the table
	TORRENT_ASSERT(all_ips == m_ips);
}
#endif
1203
is_full(int const bucket) const1204 bool routing_table::is_full(int const bucket) const
1205 {
1206 int const num_buckets = int(m_buckets.size());
1207 if (num_buckets == 0) return false;
1208 if (bucket >= num_buckets) return false;
1209
1210 auto i = m_buckets.cbegin();
1211 std::advance(i, bucket);
1212 return (int(i->live_nodes.size()) >= bucket_limit(bucket)
1213 && int(i->replacements.size()) >= m_bucket_size);
1214 }
1215
1216 #ifndef TORRENT_DISABLE_LOGGING
log_node_failed(node_id const & nid,node_entry const & ne) const1217 void routing_table::log_node_failed(node_id const& nid, node_entry const& ne) const
1218 {
1219 if (m_log != nullptr && m_log->should_log(dht_logger::routing_table))
1220 {
1221 m_log->log(dht_logger::routing_table, "NODE FAILED id: %s ip: %s fails: %d pinged: %d up-time: %d"
1222 , aux::to_hex(nid).c_str(), print_endpoint(ne.ep()).c_str()
1223 , ne.fail_count()
1224 , int(ne.pinged())
1225 , int(total_seconds(aux::time_now() - ne.first_seen)));
1226 }
1227 }
1228 #endif
1229
1230 } } // namespace libtorrent::dht
1231