1 /*
2 
3 Copyright (c) 2012-2018, Arvid Norberg, Alden Torres
4 All rights reserved.
5 
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9 
10     * Redistributions of source code must retain the above copyright
11       notice, this list of conditions and the following disclaimer.
12     * Redistributions in binary form must reproduce the above copyright
13       notice, this list of conditions and the following disclaimer in
14       the documentation and/or other materials provided with the distribution.
15     * Neither the name of the author nor the names of its
16       contributors may be used to endorse or promote products derived
17       from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30 
31 */
32 
33 #include "libtorrent/kademlia/dht_storage.hpp"
34 #include "libtorrent/kademlia/dht_settings.hpp"
35 
36 #include <tuple>
37 #include <algorithm>
38 #include <utility>
39 #include <map>
40 #include <set>
41 #include <string>
42 
43 #include <libtorrent/socket_io.hpp>
44 #include <libtorrent/aux_/time.hpp>
45 #include <libtorrent/config.hpp>
46 #include <libtorrent/bloom_filter.hpp>
47 #include <libtorrent/random.hpp>
48 #include <libtorrent/aux_/vector.hpp>
49 #include <libtorrent/aux_/numeric_cast.hpp>
50 #include <libtorrent/broadcast_socket.hpp> // for ip_v4
51 #include <libtorrent/bdecode.hpp>
52 
53 namespace libtorrent { namespace dht {
54 namespace {
55 
56 	// this is the entry for every peer
57 	// the timestamp is there to make it possible
58 	// to remove stale peers
59 	struct peer_entry
60 	{
61 		time_point added;
62 		tcp::endpoint addr;
63 		bool seed = 0;
64 	};
65 
66 	// internal
operator <(peer_entry const & lhs,peer_entry const & rhs)67 	bool operator<(peer_entry const& lhs, peer_entry const& rhs)
68 	{
69 		return lhs.addr.address() == rhs.addr.address()
70 			? lhs.addr.port() < rhs.addr.port()
71 			: lhs.addr.address() < rhs.addr.address();
72 	}
73 
74 	// this is a group. It contains a set of group members
	// this is a group. It contains a set of group members
	struct torrent_entry
	{
		// the torrent's name as announced by a peer; empty until one is seen
		std::string name;
		// announced IPv4 peers, kept sorted by (address, port)
		std::vector<peer_entry> peers4;
		// announced IPv6 peers, kept sorted by (address, port)
		std::vector<peer_entry> peers6;
	};
81 
	// TODO: 2 make this configurable in dht_settings
	// peers that have not re-announced within 1.5x this interval are purged
	constexpr time_duration announce_interval = minutes(30);
84 
	// state kept for a single immutable item stored in this node
	struct dht_immutable_item
	{
		// the actual value (heap allocated; bdecoded when served back)
		std::unique_ptr<char[]> value;
		// this counts the number of IPs we have seen
		// announcing this item, this is used to determine
		// popularity if we reach the limit of items to store
		bloom_filter<128> ips;
		// the last time we heard about this item
		// the correct interpretation of this field
		// requires a time reference
		time_point last_seen;
		// number of IPs in the bloom filter
		int num_announcers = 0;
		// size of malloced space pointed to by value
		int size = 0;
	};
102 
	// a mutable item extends an immutable one with the signature, sequence
	// number, public key and salt associated with it
	struct dht_mutable_item : dht_immutable_item
	{
		// signature covering the current value
		signature sig{};
		// sequence number of the stored value; only a strictly higher
		// sequence number replaces the value on a subsequent put
		sequence_number seq{};
		// the public key the item is stored under
		public_key key{};
		// optional salt supplied with the item
		std::string salt;
	};
110 
set_value(dht_immutable_item & item,span<char const> buf)111 	void set_value(dht_immutable_item& item, span<char const> buf)
112 	{
113 		int const size = int(buf.size());
114 		if (item.size != size)
115 		{
116 			item.value.reset(new char[std::size_t(size)]);
117 			item.size = size;
118 		}
119 		std::copy(buf.begin(), buf.end(), item.value.get());
120 	}
121 
touch_item(dht_immutable_item & f,address const & addr)122 	void touch_item(dht_immutable_item& f, address const& addr)
123 	{
124 		f.last_seen = aux::time_now();
125 
126 		// maybe increase num_announcers if we haven't seen this IP before
127 		sha1_hash const iphash = hash_address(addr);
128 		if (!f.ips.find(iphash))
129 		{
130 			f.ips.set(iphash);
131 			++f.num_announcers;
132 		}
133 	}
134 
	// return true if the first argument is a better candidate for removal,
	// i.e. less important to keep
137 	struct immutable_item_comparator
138 	{
immutable_item_comparatorlibtorrent::dht::__anonfc0c32810111::immutable_item_comparator139 		explicit immutable_item_comparator(std::vector<node_id> const& node_ids) : m_node_ids(node_ids) {}
140 		immutable_item_comparator(immutable_item_comparator const&) = default;
141 
142 		template <typename Item>
operator ()libtorrent::dht::__anonfc0c32810111::immutable_item_comparator143 		bool operator()(std::pair<node_id const, Item> const& lhs
144 			, std::pair<node_id const, Item> const& rhs) const
145 		{
146 			int const l_distance = min_distance_exp(lhs.first, m_node_ids);
147 			int const r_distance = min_distance_exp(rhs.first, m_node_ids);
148 
149 			// this is a score taking the popularity (number of announcers) and the
150 			// fit, in terms of distance from ideal storing node, into account.
151 			// each additional 5 announcers is worth one extra bit in the distance.
152 			// that is, an item with 10 announcers is allowed to be twice as far
153 			// from another item with 5 announcers, from our node ID. Twice as far
154 			// because it gets one more bit.
155 			return lhs.second.num_announcers / 5 - l_distance < rhs.second.num_announcers / 5 - r_distance;
156 		}
157 
158 	private:
159 
160 		// explicitly disallow assignment, to silence msvc warning
161 		immutable_item_comparator& operator=(immutable_item_comparator const&) = delete;
162 
163 		std::vector<node_id> const& m_node_ids;
164 	};
165 
166 	// picks the least important one (i.e. the one
167 	// the fewest peers are announcing, and farthest
168 	// from our node IDs)
169 	template<class Item>
pick_least_important_item(std::vector<node_id> const & node_ids,std::map<node_id,Item> const & table)170 	typename std::map<node_id, Item>::const_iterator pick_least_important_item(
171 		std::vector<node_id> const& node_ids, std::map<node_id, Item> const& table)
172 	{
173 		return std::min_element(table.begin(), table.end()
174 			, immutable_item_comparator(node_ids));
175 	}
176 
	// upper bound, in seconds (6 hours), for the advertised sample refresh interval
	constexpr int sample_infohashes_interval_max = 21600;
	// upper bound on the number of info-hashes included in a sample
	constexpr int infohashes_sample_count_max = 20;
179 
180 	struct infohashes_sample
181 	{
182 		aux::vector<sha1_hash> samples;
183 		time_point created = min_time();
184 
countlibtorrent::dht::__anonfc0c32810111::infohashes_sample185 		int count() const { return int(samples.size()); }
186 	};
187 
188 	class dht_default_storage final : public dht_storage_interface
189 	{
190 	public:
191 
dht_default_storage(dht_settings const & settings)192 		explicit dht_default_storage(dht_settings const& settings)
193 			: m_settings(settings)
194 		{
195 			m_counters.reset();
196 		}
197 
198 		~dht_default_storage() override = default;
199 
200 		dht_default_storage(dht_default_storage const&) = delete;
201 		dht_default_storage& operator=(dht_default_storage const&) = delete;
202 
203 #if TORRENT_ABI_VERSION == 1
num_torrents() const204 		size_t num_torrents() const override { return m_map.size(); }
num_peers() const205 		size_t num_peers() const override
206 		{
207 			size_t ret = 0;
208 			for (auto const& t : m_map)
209 				ret += t.second.peers4.size() + t.second.peers6.size();
210 			return ret;
211 		}
212 #endif
update_node_ids(std::vector<node_id> const & ids)213 		void update_node_ids(std::vector<node_id> const& ids) override
214 		{
215 			m_node_ids = ids;
216 		}
217 
get_peers(sha1_hash const & info_hash,bool const noseed,bool const scrape,address const & requester,entry & peers) const218 		bool get_peers(sha1_hash const& info_hash
219 			, bool const noseed, bool const scrape, address const& requester
220 			, entry& peers) const override
221 		{
222 			auto const i = m_map.find(info_hash);
223 			if (i == m_map.end()) return int(m_map.size()) >= m_settings.max_torrents;
224 
225 			torrent_entry const& v = i->second;
226 			auto const& peersv = requester.is_v4() ? v.peers4 : v.peers6;
227 
228 			if (!v.name.empty()) peers["n"] = v.name;
229 
230 			if (scrape)
231 			{
232 				bloom_filter<256> downloaders;
233 				bloom_filter<256> seeds;
234 
235 				for (auto const& p : peersv)
236 				{
237 					sha1_hash const iphash = hash_address(p.addr.address());
238 					if (p.seed) seeds.set(iphash);
239 					else downloaders.set(iphash);
240 				}
241 
242 				peers["BFpe"] = downloaders.to_string();
243 				peers["BFsd"] = seeds.to_string();
244 			}
245 			else
246 			{
247 				tcp const protocol = requester.is_v4() ? tcp::v4() : tcp::v6();
248 				int to_pick = m_settings.max_peers_reply;
249 				TORRENT_ASSERT(to_pick >= 0);
250 				// if these are IPv6 peers their addresses are 4x the size of IPv4
251 				// so reduce the max peers 4 fold to compensate
252 				// max_peers_reply should probably be specified in bytes
253 				if (!peersv.empty() && protocol == tcp::v6())
254 					to_pick /= 4;
255 				entry::list_type& pe = peers["values"].list();
256 
257 				int candidates = int(std::count_if(peersv.begin(), peersv.end()
258 					, [=](peer_entry const& e) { return !(noseed && e.seed); }));
259 
260 				to_pick = std::min(to_pick, candidates);
261 
262 				for (auto iter = peersv.begin(); to_pick > 0; ++iter)
263 				{
264 					// if the node asking for peers is a seed, skip seeds from the
265 					// peer list
266 					if (noseed && iter->seed) continue;
267 
268 					TORRENT_ASSERT(candidates >= to_pick);
269 
270 					// pick this peer with probability
271 					// <peers left to pick> / <peers left in the set>
272 					if (random(std::uint32_t(candidates--)) > std::uint32_t(to_pick))
273 						continue;
274 
275 					pe.emplace_back();
276 					std::string& str = pe.back().string();
277 
278 					str.resize(18);
279 					std::string::iterator out = str.begin();
280 					detail::write_endpoint(iter->addr, out);
281 					str.resize(std::size_t(out - str.begin()));
282 
283 					--to_pick;
284 				}
285 			}
286 
287 			if (int(peersv.size()) < m_settings.max_peers)
288 				return false;
289 
290 			// we're at the max peers stored for this torrent
291 			// only send a write token if the requester is already in the set
292 			// only check for a match on IP because the peer may be announcing
293 			// a different port than the one it is using to send DHT messages
294 			peer_entry requester_entry;
295 			requester_entry.addr.address(requester);
296 			auto requester_iter = std::lower_bound(peersv.begin(), peersv.end(), requester_entry);
297 			return requester_iter == peersv.end()
298 				|| requester_iter->addr.address() != requester;
299 		}
300 
announce_peer(sha1_hash const & info_hash,tcp::endpoint const & endp,string_view name,bool const seed)301 		void announce_peer(sha1_hash const& info_hash
302 			, tcp::endpoint const& endp
303 			, string_view name, bool const seed) override
304 		{
305 			auto const ti = m_map.find(info_hash);
306 			torrent_entry* v;
307 			if (ti == m_map.end())
308 			{
309 				if (int(m_map.size()) >= m_settings.max_torrents)
310 				{
311 					// we're at capacity, drop the announce
312 					return;
313 				}
314 
315 				m_counters.torrents += 1;
316 				v = &m_map[info_hash];
317 			}
318 			else
319 			{
320 				v = &ti->second;
321 			}
322 
323 			// the peer announces a torrent name, and we don't have a name
324 			// for this torrent. Store it.
325 			if (!name.empty() && v->name.empty())
326 			{
327 				v->name = name.substr(0, 100).to_string();
328 			}
329 
330 			auto& peersv = is_v4(endp) ? v->peers4 : v->peers6;
331 
332 			peer_entry peer;
333 			peer.addr = endp;
334 			peer.added = aux::time_now();
335 			peer.seed = seed;
336 			auto i = std::lower_bound(peersv.begin(), peersv.end(), peer);
337 			if (i != peersv.end() && i->addr == endp)
338 			{
339 				*i = peer;
340 			}
341 			else if (int(peersv.size()) >= m_settings.max_peers)
342 			{
343 				// we're at capacity, drop the announce
344 				return;
345 			}
346 			else
347 			{
348 				peersv.insert(i, peer);
349 				m_counters.peers += 1;
350 			}
351 		}
352 
get_immutable_item(sha1_hash const & target,entry & item) const353 		bool get_immutable_item(sha1_hash const& target
354 			, entry& item) const override
355 		{
356 			auto const i = m_immutable_table.find(target);
357 			if (i == m_immutable_table.end()) return false;
358 
359 			error_code ec;
360 			item["v"] = bdecode({i->second.value.get(), i->second.size}, ec);
361 			return true;
362 		}
363 
put_immutable_item(sha1_hash const & target,span<char const> buf,address const & addr)364 		void put_immutable_item(sha1_hash const& target
365 			, span<char const> buf
366 			, address const& addr) override
367 		{
368 			TORRENT_ASSERT(!m_node_ids.empty());
369 			auto i = m_immutable_table.find(target);
370 			if (i == m_immutable_table.end())
371 			{
372 				// make sure we don't add too many items
373 				if (int(m_immutable_table.size()) >= m_settings.max_dht_items)
374 				{
375 					auto const j = pick_least_important_item(m_node_ids
376 						, m_immutable_table);
377 
378 					TORRENT_ASSERT(j != m_immutable_table.end());
379 					m_immutable_table.erase(j);
380 					m_counters.immutable_data -= 1;
381 				}
382 				dht_immutable_item to_add;
383 				set_value(to_add, buf);
384 
385 				std::tie(i, std::ignore) = m_immutable_table.insert(
386 					std::make_pair(target, std::move(to_add)));
387 				m_counters.immutable_data += 1;
388 			}
389 
390 //			std::fprintf(stderr, "added immutable item (%d)\n", int(m_immutable_table.size()));
391 
392 			touch_item(i->second, addr);
393 		}
394 
get_mutable_item_seq(sha1_hash const & target,sequence_number & seq) const395 		bool get_mutable_item_seq(sha1_hash const& target
396 			, sequence_number& seq) const override
397 		{
398 			auto const i = m_mutable_table.find(target);
399 			if (i == m_mutable_table.end()) return false;
400 
401 			seq = i->second.seq;
402 			return true;
403 		}
404 
get_mutable_item(sha1_hash const & target,sequence_number const seq,bool const force_fill,entry & item) const405 		bool get_mutable_item(sha1_hash const& target
406 			, sequence_number const seq, bool const force_fill
407 			, entry& item) const override
408 		{
409 			auto const i = m_mutable_table.find(target);
410 			if (i == m_mutable_table.end()) return false;
411 
412 			dht_mutable_item const& f = i->second;
413 			item["seq"] = f.seq.value;
414 			if (force_fill || (sequence_number(0) <= seq && seq < f.seq))
415 			{
416 				error_code ec;
417 				item["v"] = bdecode({f.value.get(), f.size}, ec);
418 				item["sig"] = f.sig.bytes;
419 				item["k"] = f.key.bytes;
420 			}
421 			return true;
422 		}
423 
put_mutable_item(sha1_hash const & target,span<char const> buf,signature const & sig,sequence_number const seq,public_key const & pk,span<char const> salt,address const & addr)424 		void put_mutable_item(sha1_hash const& target
425 			, span<char const> buf
426 			, signature const& sig
427 			, sequence_number const seq
428 			, public_key const& pk
429 			, span<char const> salt
430 			, address const& addr) override
431 		{
432 			TORRENT_ASSERT(!m_node_ids.empty());
433 			auto i = m_mutable_table.find(target);
434 			if (i == m_mutable_table.end())
435 			{
436 				// this is the case where we don't have an item in this slot
437 				// make sure we don't add too many items
438 				if (int(m_mutable_table.size()) >= m_settings.max_dht_items)
439 				{
440 					auto const j = pick_least_important_item(m_node_ids
441 						, m_mutable_table);
442 
443 					TORRENT_ASSERT(j != m_mutable_table.end());
444 					m_mutable_table.erase(j);
445 					m_counters.mutable_data -= 1;
446 				}
447 				dht_mutable_item to_add;
448 				set_value(to_add, buf);
449 				to_add.seq = seq;
450 				to_add.salt = {salt.begin(), salt.end()};
451 				to_add.sig = sig;
452 				to_add.key = pk;
453 
454 				std::tie(i, std::ignore) = m_mutable_table.insert(
455 					std::make_pair(target, std::move(to_add)));
456 				m_counters.mutable_data += 1;
457 			}
458 			else
459 			{
460 				// this is the case where we already have an item in this slot
461 				dht_mutable_item& item = i->second;
462 
463 				if (item.seq < seq)
464 				{
465 					set_value(item, buf);
466 					item.seq = seq;
467 					item.sig = sig;
468 				}
469 			}
470 
471 			touch_item(i->second, addr);
472 		}
473 
get_infohashes_sample(entry & item)474 		int get_infohashes_sample(entry& item) override
475 		{
476 			item["interval"] = aux::clamp(m_settings.sample_infohashes_interval
477 				, 0, sample_infohashes_interval_max);
478 			item["num"] = int(m_map.size());
479 
480 			refresh_infohashes_sample();
481 
482 			aux::vector<sha1_hash> const& samples = m_infohashes_sample.samples;
483 			item["samples"] = span<char const>(
484 				reinterpret_cast<char const*>(samples.data()), static_cast<std::ptrdiff_t>(samples.size()) * 20);
485 
486 			return m_infohashes_sample.count();
487 		}
488 
tick()489 		void tick() override
490 		{
491 			// look through all peers and see if any have timed out
492 			for (auto i = m_map.begin(), end(m_map.end()); i != end;)
493 			{
494 				torrent_entry& t = i->second;
495 				purge_peers(t.peers4);
496 				purge_peers(t.peers6);
497 
498 				if (!t.peers4.empty() || !t.peers6.empty())
499 				{
500 					++i;
501 					continue;
502 				}
503 
504 				// if there are no more peers, remove the entry altogether
505 				i = m_map.erase(i);
506 				m_counters.torrents -= 1;// peers is decreased by purge_peers
507 			}
508 
509 			if (0 == m_settings.item_lifetime) return;
510 
511 			time_point const now = aux::time_now();
512 			time_duration lifetime = seconds(m_settings.item_lifetime);
513 			// item lifetime must >= 120 minutes.
514 			if (lifetime < minutes(120)) lifetime = minutes(120);
515 
516 			for (auto i = m_immutable_table.begin(); i != m_immutable_table.end();)
517 			{
518 				if (i->second.last_seen + lifetime > now)
519 				{
520 					++i;
521 					continue;
522 				}
523 				i = m_immutable_table.erase(i);
524 				m_counters.immutable_data -= 1;
525 			}
526 
527 			for (auto i = m_mutable_table.begin(); i != m_mutable_table.end();)
528 			{
529 				if (i->second.last_seen + lifetime > now)
530 				{
531 					++i;
532 					continue;
533 				}
534 				i = m_mutable_table.erase(i);
535 				m_counters.mutable_data -= 1;
536 			}
537 		}
538 
counters() const539 		dht_storage_counters counters() const override
540 		{
541 			return m_counters;
542 		}
543 
544 	private:
545 		dht_settings const& m_settings;
546 		dht_storage_counters m_counters;
547 
548 		std::vector<node_id> m_node_ids;
549 		std::map<node_id, torrent_entry> m_map;
550 		std::map<node_id, dht_immutable_item> m_immutable_table;
551 		std::map<node_id, dht_mutable_item> m_mutable_table;
552 
553 		infohashes_sample m_infohashes_sample;
554 
purge_peers(std::vector<peer_entry> & peers)555 		void purge_peers(std::vector<peer_entry>& peers)
556 		{
557 			auto now = aux::time_now();
558 			auto new_end = std::remove_if(peers.begin(), peers.end()
559 				, [=](peer_entry const& e)
560 			{
561 				return e.added + announce_interval * 3 / 2 < now;
562 			});
563 
564 			m_counters.peers -= std::int32_t(std::distance(new_end, peers.end()));
565 			peers.erase(new_end, peers.end());
566 			// if we're using less than 1/4 of the capacity free up the excess
567 			if (!peers.empty() && peers.capacity() / peers.size() >= 4U)
568 				peers.shrink_to_fit();
569 		}
570 
refresh_infohashes_sample()571 		void refresh_infohashes_sample()
572 		{
573 			time_point const now = aux::time_now();
574 			int const interval = aux::clamp(m_settings.sample_infohashes_interval
575 				, 0, sample_infohashes_interval_max);
576 
577 			int const max_count = aux::clamp(m_settings.max_infohashes_sample_count
578 				, 0, infohashes_sample_count_max);
579 			int const count = std::min(max_count, int(m_map.size()));
580 
581 			if (interval > 0
582 				&& m_infohashes_sample.created + seconds(interval) > now
583 				&& m_infohashes_sample.count() >= max_count)
584 				return;
585 
586 			aux::vector<sha1_hash>& samples = m_infohashes_sample.samples;
587 			samples.clear();
588 			samples.reserve(count);
589 
590 			int to_pick = count;
591 			int candidates = int(m_map.size());
592 
593 			for (auto const& t : m_map)
594 			{
595 				if (to_pick == 0)
596 					break;
597 
598 				TORRENT_ASSERT(candidates >= to_pick);
599 
600 				// pick this key with probability
601 				// <keys left to pick> / <keys left in the set>
602 				if (random(std::uint32_t(candidates--)) > std::uint32_t(to_pick))
603 					continue;
604 
605 				samples.push_back(t.first);
606 				--to_pick;
607 			}
608 
609 			TORRENT_ASSERT(int(samples.size()) == count);
610 			m_infohashes_sample.created = now;
611 		}
612 	};
613 }
614 
reset()615 void dht_storage_counters::reset()
616 {
617 	torrents = 0;
618 	peers = 0;
619 	immutable_data = 0;
620 	mutable_data = 0;
621 }
622 
dht_default_storage_constructor(dht_settings const & settings)623 std::unique_ptr<dht_storage_interface> dht_default_storage_constructor(
624 	dht_settings const& settings)
625 {
626 	return std::unique_ptr<dht_default_storage>(new dht_default_storage(settings));
627 }
628 
629 } } // namespace libtorrent::dht
630