1 /*
2
3 Copyright (c) 2012-2018, Arvid Norberg, Alden Torres
4 All rights reserved.
5
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30
31 */
32
33 #include "libtorrent/kademlia/dht_storage.hpp"
34 #include "libtorrent/kademlia/dht_settings.hpp"
35
36 #include <tuple>
37 #include <algorithm>
38 #include <utility>
39 #include <map>
40 #include <set>
41 #include <string>
42
43 #include <libtorrent/socket_io.hpp>
44 #include <libtorrent/aux_/time.hpp>
45 #include <libtorrent/config.hpp>
46 #include <libtorrent/bloom_filter.hpp>
47 #include <libtorrent/random.hpp>
48 #include <libtorrent/aux_/vector.hpp>
49 #include <libtorrent/aux_/numeric_cast.hpp>
50 #include <libtorrent/broadcast_socket.hpp> // for ip_v4
51 #include <libtorrent/bdecode.hpp>
52
53 namespace libtorrent { namespace dht {
54 namespace {
55
56 // this is the entry for every peer
57 // the timestamp is there to make it possible
58 // to remove stale peers
59 struct peer_entry
60 {
61 time_point added;
62 tcp::endpoint addr;
63 bool seed = 0;
64 };
65
66 // internal
operator <(peer_entry const & lhs,peer_entry const & rhs)67 bool operator<(peer_entry const& lhs, peer_entry const& rhs)
68 {
69 return lhs.addr.address() == rhs.addr.address()
70 ? lhs.addr.port() < rhs.addr.port()
71 : lhs.addr.address() < rhs.addr.address();
72 }
73
// this is a group. It contains a set of group members
struct torrent_entry
{
	// the torrent name, if one was announced (possibly empty)
	std::string name;
	// peers that announced this torrent, kept sorted by address then
	// port (see operator< for peer_entry), split by address family
	std::vector<peer_entry> peers4;
	std::vector<peer_entry> peers6;
};
81
// TODO: 2 make this configurable in dht_settings
// the interval peers are expected to re-announce within; peers older
// than 1.5x this are purged (see purge_peers)
constexpr time_duration announce_interval = minutes(30);
84
// an immutable item stored in the DHT: an opaque bencoded value plus
// bookkeeping used for expiry and eviction decisions
struct dht_immutable_item
{
	// the actual value
	std::unique_ptr<char[]> value;
	// this counts the number of IPs we have seen
	// announcing this item, this is used to determine
	// popularity if we reach the limit of items to store
	bloom_filter<128> ips;
	// the last time we heard about this item
	// the correct interpretation of this field
	// requires a time reference
	time_point last_seen;
	// number of IPs in the bloom filter
	int num_announcers = 0;
	// size of malloced space pointed to by value
	int size = 0;
};
102
// a mutable item: an immutable item extended with the signature,
// sequence number, public key and salt needed to validate updates
struct dht_mutable_item : dht_immutable_item
{
	// signature over the stored value
	signature sig{};
	// version number of the value; higher supersedes lower
	sequence_number seq{};
	// the public key the item is stored under
	public_key key{};
	// the (possibly empty) salt associated with this item
	std::string salt;
};
110
set_value(dht_immutable_item & item,span<char const> buf)111 void set_value(dht_immutable_item& item, span<char const> buf)
112 {
113 int const size = int(buf.size());
114 if (item.size != size)
115 {
116 item.value.reset(new char[std::size_t(size)]);
117 item.size = size;
118 }
119 std::copy(buf.begin(), buf.end(), item.value.get());
120 }
121
touch_item(dht_immutable_item & f,address const & addr)122 void touch_item(dht_immutable_item& f, address const& addr)
123 {
124 f.last_seen = aux::time_now();
125
126 // maybe increase num_announcers if we haven't seen this IP before
127 sha1_hash const iphash = hash_address(addr);
128 if (!f.ips.find(iphash))
129 {
130 f.ips.set(iphash);
131 ++f.num_announcers;
132 }
133 }
134
135 // return true of the first argument is a better candidate for removal, i.e.
136 // less important to keep
137 struct immutable_item_comparator
138 {
immutable_item_comparatorlibtorrent::dht::__anonfc0c32810111::immutable_item_comparator139 explicit immutable_item_comparator(std::vector<node_id> const& node_ids) : m_node_ids(node_ids) {}
140 immutable_item_comparator(immutable_item_comparator const&) = default;
141
142 template <typename Item>
operator ()libtorrent::dht::__anonfc0c32810111::immutable_item_comparator143 bool operator()(std::pair<node_id const, Item> const& lhs
144 , std::pair<node_id const, Item> const& rhs) const
145 {
146 int const l_distance = min_distance_exp(lhs.first, m_node_ids);
147 int const r_distance = min_distance_exp(rhs.first, m_node_ids);
148
149 // this is a score taking the popularity (number of announcers) and the
150 // fit, in terms of distance from ideal storing node, into account.
151 // each additional 5 announcers is worth one extra bit in the distance.
152 // that is, an item with 10 announcers is allowed to be twice as far
153 // from another item with 5 announcers, from our node ID. Twice as far
154 // because it gets one more bit.
155 return lhs.second.num_announcers / 5 - l_distance < rhs.second.num_announcers / 5 - r_distance;
156 }
157
158 private:
159
160 // explicitly disallow assignment, to silence msvc warning
161 immutable_item_comparator& operator=(immutable_item_comparator const&) = delete;
162
163 std::vector<node_id> const& m_node_ids;
164 };
165
166 // picks the least important one (i.e. the one
167 // the fewest peers are announcing, and farthest
168 // from our node IDs)
169 template<class Item>
pick_least_important_item(std::vector<node_id> const & node_ids,std::map<node_id,Item> const & table)170 typename std::map<node_id, Item>::const_iterator pick_least_important_item(
171 std::vector<node_id> const& node_ids, std::map<node_id, Item> const& table)
172 {
173 return std::min_element(table.begin(), table.end()
174 , immutable_item_comparator(node_ids));
175 }
176
// hard upper bounds applied to the corresponding dht_settings values
constexpr int sample_infohashes_interval_max = 21600;
constexpr int infohashes_sample_count_max = 20;
179
// a cached sample of stored info-hashes, rebuilt periodically
struct infohashes_sample
{
	// the sampled info-hashes
	aux::vector<sha1_hash> samples;
	// when this sample was created; min_time() means "never built"
	time_point created = min_time();

	// number of info-hashes in the current sample
	int count() const { return int(samples.size()); }
};
187
// the default, in-memory implementation of dht_storage_interface. it
// keeps torrents/peers and immutable/mutable items in std::maps and
// enforces the limits configured in dht_settings
class dht_default_storage final : public dht_storage_interface
{
public:

	// settings are held by reference; the caller must keep them alive
	// for the lifetime of this object
	explicit dht_default_storage(dht_settings const& settings)
		: m_settings(settings)
	{
		m_counters.reset();
	}

	~dht_default_storage() override = default;

	// not copyable
	dht_default_storage(dht_default_storage const&) = delete;
	dht_default_storage& operator=(dht_default_storage const&) = delete;

#if TORRENT_ABI_VERSION == 1
	// deprecated stats accessors
	size_t num_torrents() const override { return m_map.size(); }
	size_t num_peers() const override
	{
		size_t ret = 0;
		for (auto const& t : m_map)
			ret += t.second.peers4.size() + t.second.peers6.size();
		return ret;
	}
#endif
	// replace the set of local node IDs used for eviction scoring
	void update_node_ids(std::vector<node_id> const& ids) override
	{
		m_node_ids = ids;
	}
217
	// fill in the response to a get_peers request for info_hash.
	// returns true when this torrent is at (or over) peer capacity, in
	// which case the caller should only hand out a write token if the
	// requester is already in the peer set
	bool get_peers(sha1_hash const& info_hash
		, bool const noseed, bool const scrape, address const& requester
		, entry& peers) const override
	{
		auto const i = m_map.find(info_hash);
		// unknown torrent: report "full" when we're at the torrent limit
		if (i == m_map.end()) return int(m_map.size()) >= m_settings.max_torrents;

		torrent_entry const& v = i->second;
		// answer with peers of the same address family as the requester
		auto const& peersv = requester.is_v4() ? v.peers4 : v.peers6;

		if (!v.name.empty()) peers["n"] = v.name;

		if (scrape)
		{
			// scrape: return bloom filters of downloaders and seeds
			// instead of individual peers
			bloom_filter<256> downloaders;
			bloom_filter<256> seeds;

			for (auto const& p : peersv)
			{
				sha1_hash const iphash = hash_address(p.addr.address());
				if (p.seed) seeds.set(iphash);
				else downloaders.set(iphash);
			}

			peers["BFpe"] = downloaders.to_string();
			peers["BFsd"] = seeds.to_string();
		}
		else
		{
			tcp const protocol = requester.is_v4() ? tcp::v4() : tcp::v6();
			int to_pick = m_settings.max_peers_reply;
			TORRENT_ASSERT(to_pick >= 0);
			// if these are IPv6 peers their addresses are 4x the size of IPv4
			// so reduce the max peers 4 fold to compensate
			// max_peers_reply should probably be specified in bytes
			if (!peersv.empty() && protocol == tcp::v6())
				to_pick /= 4;
			entry::list_type& pe = peers["values"].list();

			// number of peers eligible to be returned (seeds may be
			// excluded when the requester is itself a seed)
			int candidates = int(std::count_if(peersv.begin(), peersv.end()
				, [=](peer_entry const& e) { return !(noseed && e.seed); }));

			to_pick = std::min(to_pick, candidates);

			// single-pass uniform sampling of to_pick peers out of the
			// eligible candidates
			for (auto iter = peersv.begin(); to_pick > 0; ++iter)
			{
				// if the node asking for peers is a seed, skip seeds from the
				// peer list
				if (noseed && iter->seed) continue;

				TORRENT_ASSERT(candidates >= to_pick);

				// pick this peer with probability
				// <peers left to pick> / <peers left in the set>
				if (random(std::uint32_t(candidates--)) > std::uint32_t(to_pick))
					continue;

				pe.emplace_back();
				std::string& str = pe.back().string();

				// 18 bytes is enough for an IPv6 endpoint; shrink to the
				// actual encoded size afterwards
				str.resize(18);
				std::string::iterator out = str.begin();
				detail::write_endpoint(iter->addr, out);
				str.resize(std::size_t(out - str.begin()));

				--to_pick;
			}
		}

		if (int(peersv.size()) < m_settings.max_peers)
			return false;

		// we're at the max peers stored for this torrent
		// only send a write token if the requester is already in the set
		// only check for a match on IP because the peer may be announcing
		// a different port than the one it is using to send DHT messages
		peer_entry requester_entry;
		requester_entry.addr.address(requester);
		auto requester_iter = std::lower_bound(peersv.begin(), peersv.end(), requester_entry);
		return requester_iter == peersv.end()
			|| requester_iter->addr.address() != requester;
	}
300
announce_peer(sha1_hash const & info_hash,tcp::endpoint const & endp,string_view name,bool const seed)301 void announce_peer(sha1_hash const& info_hash
302 , tcp::endpoint const& endp
303 , string_view name, bool const seed) override
304 {
305 auto const ti = m_map.find(info_hash);
306 torrent_entry* v;
307 if (ti == m_map.end())
308 {
309 if (int(m_map.size()) >= m_settings.max_torrents)
310 {
311 // we're at capacity, drop the announce
312 return;
313 }
314
315 m_counters.torrents += 1;
316 v = &m_map[info_hash];
317 }
318 else
319 {
320 v = &ti->second;
321 }
322
323 // the peer announces a torrent name, and we don't have a name
324 // for this torrent. Store it.
325 if (!name.empty() && v->name.empty())
326 {
327 v->name = name.substr(0, 100).to_string();
328 }
329
330 auto& peersv = is_v4(endp) ? v->peers4 : v->peers6;
331
332 peer_entry peer;
333 peer.addr = endp;
334 peer.added = aux::time_now();
335 peer.seed = seed;
336 auto i = std::lower_bound(peersv.begin(), peersv.end(), peer);
337 if (i != peersv.end() && i->addr == endp)
338 {
339 *i = peer;
340 }
341 else if (int(peersv.size()) >= m_settings.max_peers)
342 {
343 // we're at capacity, drop the announce
344 return;
345 }
346 else
347 {
348 peersv.insert(i, peer);
349 m_counters.peers += 1;
350 }
351 }
352
get_immutable_item(sha1_hash const & target,entry & item) const353 bool get_immutable_item(sha1_hash const& target
354 , entry& item) const override
355 {
356 auto const i = m_immutable_table.find(target);
357 if (i == m_immutable_table.end()) return false;
358
359 error_code ec;
360 item["v"] = bdecode({i->second.value.get(), i->second.size}, ec);
361 return true;
362 }
363
	// store an immutable item under target, evicting the least
	// important stored item first if the table is full. addr is
	// recorded as an announcer of the item
	void put_immutable_item(sha1_hash const& target
		, span<char const> buf
		, address const& addr) override
	{
		TORRENT_ASSERT(!m_node_ids.empty());
		auto i = m_immutable_table.find(target);
		if (i == m_immutable_table.end())
		{
			// make sure we don't add too many items
			if (int(m_immutable_table.size()) >= m_settings.max_dht_items)
			{
				// evict the least important item to make room
				auto const j = pick_least_important_item(m_node_ids
					, m_immutable_table);

				TORRENT_ASSERT(j != m_immutable_table.end());
				m_immutable_table.erase(j);
				m_counters.immutable_data -= 1;
			}
			dht_immutable_item to_add;
			set_value(to_add, buf);

			std::tie(i, std::ignore) = m_immutable_table.insert(
				std::make_pair(target, std::move(to_add)));
			m_counters.immutable_data += 1;
		}

		// std::fprintf(stderr, "added immutable item (%d)\n", int(m_immutable_table.size()));

		// refresh last-seen time and the announcer count
		touch_item(i->second, addr);
	}
394
get_mutable_item_seq(sha1_hash const & target,sequence_number & seq) const395 bool get_mutable_item_seq(sha1_hash const& target
396 , sequence_number& seq) const override
397 {
398 auto const i = m_mutable_table.find(target);
399 if (i == m_mutable_table.end()) return false;
400
401 seq = i->second.seq;
402 return true;
403 }
404
get_mutable_item(sha1_hash const & target,sequence_number const seq,bool const force_fill,entry & item) const405 bool get_mutable_item(sha1_hash const& target
406 , sequence_number const seq, bool const force_fill
407 , entry& item) const override
408 {
409 auto const i = m_mutable_table.find(target);
410 if (i == m_mutable_table.end()) return false;
411
412 dht_mutable_item const& f = i->second;
413 item["seq"] = f.seq.value;
414 if (force_fill || (sequence_number(0) <= seq && seq < f.seq))
415 {
416 error_code ec;
417 item["v"] = bdecode({f.value.get(), f.size}, ec);
418 item["sig"] = f.sig.bytes;
419 item["k"] = f.key.bytes;
420 }
421 return true;
422 }
423
	// store (or update) a mutable item under target. a new item may
	// evict the least important stored item when at capacity; an
	// existing item is only overwritten by a strictly higher sequence
	// number. addr is recorded as an announcer either way
	void put_mutable_item(sha1_hash const& target
		, span<char const> buf
		, signature const& sig
		, sequence_number const seq
		, public_key const& pk
		, span<char const> salt
		, address const& addr) override
	{
		TORRENT_ASSERT(!m_node_ids.empty());
		auto i = m_mutable_table.find(target);
		if (i == m_mutable_table.end())
		{
			// this is the case where we don't have an item in this slot
			// make sure we don't add too many items
			if (int(m_mutable_table.size()) >= m_settings.max_dht_items)
			{
				auto const j = pick_least_important_item(m_node_ids
					, m_mutable_table);

				TORRENT_ASSERT(j != m_mutable_table.end());
				m_mutable_table.erase(j);
				m_counters.mutable_data -= 1;
			}
			dht_mutable_item to_add;
			set_value(to_add, buf);
			to_add.seq = seq;
			to_add.salt = {salt.begin(), salt.end()};
			to_add.sig = sig;
			to_add.key = pk;

			std::tie(i, std::ignore) = m_mutable_table.insert(
				std::make_pair(target, std::move(to_add)));
			m_counters.mutable_data += 1;
		}
		else
		{
			// this is the case where we already have an item in this slot
			dht_mutable_item& item = i->second;

			// only accept the new value if its sequence number is
			// strictly newer than the stored one
			if (item.seq < seq)
			{
				set_value(item, buf);
				item.seq = seq;
				item.sig = sig;
			}
		}

		// refresh last-seen time and the announcer count
		touch_item(i->second, addr);
	}
473
	// fill in the response to a sample_infohashes request. returns the
	// number of samples included
	int get_infohashes_sample(entry& item) override
	{
		item["interval"] = aux::clamp(m_settings.sample_infohashes_interval
			, 0, sample_infohashes_interval_max);
		item["num"] = int(m_map.size());

		// rebuild the cached sample if it has expired
		refresh_infohashes_sample();

		aux::vector<sha1_hash> const& samples = m_infohashes_sample.samples;
		// serialize the sample as one string of concatenated 20-byte hashes
		item["samples"] = span<char const>(
			reinterpret_cast<char const*>(samples.data()), static_cast<std::ptrdiff_t>(samples.size()) * 20);

		return m_infohashes_sample.count();
	}
488
	// periodic maintenance: drop timed-out peers (and torrent entries
	// left with no peers), then expire immutable and mutable items
	// older than the configured item lifetime
	void tick() override
	{
		// look through all peers and see if any have timed out
		for (auto i = m_map.begin(), end(m_map.end()); i != end;)
		{
			torrent_entry& t = i->second;
			purge_peers(t.peers4);
			purge_peers(t.peers6);

			if (!t.peers4.empty() || !t.peers6.empty())
			{
				++i;
				continue;
			}

			// if there are no more peers, remove the entry altogether
			i = m_map.erase(i);
			m_counters.torrents -= 1;// peers is decreased by purge_peers
		}

		// an item_lifetime of 0 means items never expire
		if (0 == m_settings.item_lifetime) return;

		time_point const now = aux::time_now();
		time_duration lifetime = seconds(m_settings.item_lifetime);
		// item lifetime must >= 120 minutes.
		if (lifetime < minutes(120)) lifetime = minutes(120);

		for (auto i = m_immutable_table.begin(); i != m_immutable_table.end();)
		{
			if (i->second.last_seen + lifetime > now)
			{
				++i;
				continue;
			}
			i = m_immutable_table.erase(i);
			m_counters.immutable_data -= 1;
		}

		for (auto i = m_mutable_table.begin(); i != m_mutable_table.end();)
		{
			if (i->second.last_seen + lifetime > now)
			{
				++i;
				continue;
			}
			i = m_mutable_table.erase(i);
			m_counters.mutable_data -= 1;
		}
	}
538
	// return a snapshot of the storage statistics
	dht_storage_counters counters() const override
	{
		return m_counters;
	}
543
private:
	// reference to externally-owned settings; must outlive this object
	dht_settings const& m_settings;
	dht_storage_counters m_counters;

	// local node IDs, used for item eviction scoring
	std::vector<node_id> m_node_ids;
	// torrents (and their peers) keyed by info-hash
	std::map<node_id, torrent_entry> m_map;
	// immutable and mutable items keyed by target hash
	std::map<node_id, dht_immutable_item> m_immutable_table;
	std::map<node_id, dht_mutable_item> m_mutable_table;

	// cached sample for sample_infohashes responses
	infohashes_sample m_infohashes_sample;
554
purge_peers(std::vector<peer_entry> & peers)555 void purge_peers(std::vector<peer_entry>& peers)
556 {
557 auto now = aux::time_now();
558 auto new_end = std::remove_if(peers.begin(), peers.end()
559 , [=](peer_entry const& e)
560 {
561 return e.added + announce_interval * 3 / 2 < now;
562 });
563
564 m_counters.peers -= std::int32_t(std::distance(new_end, peers.end()));
565 peers.erase(new_end, peers.end());
566 // if we're using less than 1/4 of the capacity free up the excess
567 if (!peers.empty() && peers.capacity() / peers.size() >= 4U)
568 peers.shrink_to_fit();
569 }
570
	// rebuild the cached infohashes sample, unless the current one is
	// still fresh and as large as the settings allow
	void refresh_infohashes_sample()
	{
		time_point const now = aux::time_now();
		int const interval = aux::clamp(m_settings.sample_infohashes_interval
			, 0, sample_infohashes_interval_max);

		int const max_count = aux::clamp(m_settings.max_infohashes_sample_count
			, 0, infohashes_sample_count_max);
		int const count = std::min(max_count, int(m_map.size()));

		// keep the current sample while it's fresh and full-sized
		if (interval > 0
			&& m_infohashes_sample.created + seconds(interval) > now
			&& m_infohashes_sample.count() >= max_count)
			return;

		aux::vector<sha1_hash>& samples = m_infohashes_sample.samples;
		samples.clear();
		samples.reserve(count);

		// single-pass uniform sampling of count keys out of the map
		int to_pick = count;
		int candidates = int(m_map.size());

		for (auto const& t : m_map)
		{
			if (to_pick == 0)
				break;

			TORRENT_ASSERT(candidates >= to_pick);

			// pick this key with probability
			// <keys left to pick> / <keys left in the set>
			if (random(std::uint32_t(candidates--)) > std::uint32_t(to_pick))
				continue;

			samples.push_back(t.first);
			--to_pick;
		}

		TORRENT_ASSERT(int(samples.size()) == count);
		m_infohashes_sample.created = now;
	}
612 };
613 }
614
reset()615 void dht_storage_counters::reset()
616 {
617 torrents = 0;
618 peers = 0;
619 immutable_data = 0;
620 mutable_data = 0;
621 }
622
dht_default_storage_constructor(dht_settings const & settings)623 std::unique_ptr<dht_storage_interface> dht_default_storage_constructor(
624 dht_settings const& settings)
625 {
626 return std::unique_ptr<dht_default_storage>(new dht_default_storage(settings));
627 }
628
629 } } // namespace libtorrent::dht
630