1 /*
2 
3 Copyright (c) 2006-2018, Arvid Norberg
4 All rights reserved.
5 
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9 
10     * Redistributions of source code must retain the above copyright
11       notice, this list of conditions and the following disclaimer.
12     * Redistributions in binary form must reproduce the above copyright
13       notice, this list of conditions and the following disclaimer in
14       the documentation and/or other materials provided with the distribution.
15     * Neither the name of the author nor the names of its
16       contributors may be used to endorse or promote products derived
17       from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30 
31 */
32 
33 #include "libtorrent/kademlia/dht_tracker.hpp"
34 
35 #include <libtorrent/config.hpp>
36 
37 #include <libtorrent/kademlia/msg.hpp>
38 #include <libtorrent/kademlia/dht_observer.hpp>
39 #include <libtorrent/kademlia/dht_settings.hpp>
40 
41 #include <libtorrent/bencode.hpp>
42 #include <libtorrent/version.hpp>
43 #include <libtorrent/time.hpp>
44 #include <libtorrent/performance_counters.hpp> // for counters
45 #include <libtorrent/aux_/time.hpp>
46 #include <libtorrent/session_status.hpp>
47 #include <libtorrent/broadcast_socket.hpp> // for is_local
48 
49 #ifndef TORRENT_DISABLE_LOGGING
50 #include <libtorrent/hex.hpp> // to_hex
51 #endif
52 
53 using namespace std::placeholders;
54 
55 namespace libtorrent { namespace dht {
56 
57 	namespace {
58 
59 	// generate a new write token key every 5 minutes
60 	auto const key_refresh
61 		= duration_cast<time_duration>(minutes(5));
62 
add_dht_counters(node const & dht,counters & c)63 	void add_dht_counters(node const& dht, counters& c)
64 	{
65 		int nodes, replacements, allocated_observers;
66 		std::tie(nodes, replacements, allocated_observers) = dht.get_stats_counters();
67 
68 		c.inc_stats_counter(counters::dht_nodes, nodes);
69 		c.inc_stats_counter(counters::dht_node_cache, replacements);
70 		c.inc_stats_counter(counters::dht_allocated_observers, allocated_observers);
71 	}
72 
concat(std::vector<udp::endpoint> const & v1,std::vector<udp::endpoint> const & v2)73 	std::vector<udp::endpoint> concat(std::vector<udp::endpoint> const& v1
74 		, std::vector<udp::endpoint> const& v2)
75 	{
76 		std::vector<udp::endpoint> r = v1;
77 		r.insert(r.end(), v2.begin(), v2.end());
78 		return r;
79 	}
80 
81 	} // anonymous namespace
82 
83 	// class that puts the networking and the kademlia node in a single
84 	// unit and connecting them together.
dht_tracker(dht_observer * observer,io_service & ios,send_fun_t const & send_fun,dht::settings const & settings,counters & cnt,dht_storage_interface & storage,dht_state && state)85 	dht_tracker::dht_tracker(dht_observer* observer
86 		, io_service& ios
87 		, send_fun_t const& send_fun
88 		, dht::settings const& settings
89 		, counters& cnt
90 		, dht_storage_interface& storage
91 		, dht_state&& state)
92 		: m_counters(cnt)
93 		, m_storage(storage)
94 		, m_state(std::move(state))
95 		, m_send_fun(send_fun)
96 		, m_log(observer)
97 		, m_key_refresh_timer(ios)
98 		, m_refresh_timer(ios)
99 		, m_settings(settings)
100 		, m_running(false)
101 		, m_host_resolver(ios)
102 		, m_send_quota(settings.upload_rate_limit)
103 		, m_last_tick(aux::time_now())
104 	{
105 		m_blocker.set_block_timer(m_settings.block_timeout);
106 		m_blocker.set_rate_limit(m_settings.block_ratelimit);
107 	}
108 
update_node_id(aux::listen_socket_handle const & s)109 	void dht_tracker::update_node_id(aux::listen_socket_handle const& s)
110 	{
111 		auto n = m_nodes.find(s);
112 		if (n != m_nodes.end())
113 			n->second.dht.update_node_id();
114 		update_storage_node_ids();
115 	}
116 
new_socket(aux::listen_socket_handle const & s)117 	void dht_tracker::new_socket(aux::listen_socket_handle const& s)
118 	{
119 		address const local_address = s.get_local_endpoint().address();
120 		auto stored_nid = std::find_if(m_state.nids.begin(), m_state.nids.end()
121 			, [&](node_ids_t::value_type const& nid) { return nid.first == local_address; });
122 		node_id const nid = stored_nid != m_state.nids.end() ? stored_nid->second : node_id();
123 		// must use piecewise construction because tracker_node::connection_timer
124 		// is neither copyable nor movable
125 		auto n = m_nodes.emplace(std::piecewise_construct_t(), std::forward_as_tuple(s)
126 			, std::forward_as_tuple(get_io_service(m_key_refresh_timer)
127 			, s, this, m_settings, nid, m_log, m_counters
128 			, std::bind(&dht_tracker::get_node, this, _1, _2)
129 			, m_storage));
130 
131 		update_storage_node_ids();
132 
133 #ifndef TORRENT_DISABLE_LOGGING
134 		if (m_log->should_log(dht_logger::tracker))
135 		{
136 			m_log->log(dht_logger::tracker, "starting %s DHT tracker with node id: %s"
137 				, local_address.is_v4() ? "IPv4" : "IPv6"
138 				, aux::to_hex(n.first->second.dht.nid()).c_str());
139 		}
140 #endif
141 
142 		if (m_running && n.second)
143 		{
144 			ADD_OUTSTANDING_ASYNC("dht_tracker::connection_timeout");
145 			error_code ec;
146 			n.first->second.connection_timer.expires_from_now(seconds(1), ec);
147 			n.first->second.connection_timer.async_wait(
148 				std::bind(&dht_tracker::connection_timeout, self(), n.first->first, _1));
149 			n.first->second.dht.bootstrap({}, find_data::nodes_callback());
150 		}
151 	}
152 
delete_socket(aux::listen_socket_handle const & s)153 	void dht_tracker::delete_socket(aux::listen_socket_handle const& s)
154 	{
155 		m_nodes.erase(s);
156 
157 		update_storage_node_ids();
158 	}
159 
start(find_data::nodes_callback const & f)160 	void dht_tracker::start(find_data::nodes_callback const& f)
161 	{
162 		m_running = true;
163 		error_code ec;
164 
165 		ADD_OUTSTANDING_ASYNC("dht_tracker::refresh_key");
166 		refresh_key(ec);
167 
168 		for (auto& n : m_nodes)
169 		{
170 			ADD_OUTSTANDING_ASYNC("dht_tracker::connection_timeout");
171 			n.second.connection_timer.expires_from_now(seconds(1), ec);
172 			n.second.connection_timer.async_wait(
173 				std::bind(&dht_tracker::connection_timeout, self(), n.first, _1));
174 			if (is_v6(n.first.get_local_endpoint()))
175 				n.second.dht.bootstrap(concat(m_state.nodes6, m_state.nodes), f);
176 			else
177 				n.second.dht.bootstrap(concat(m_state.nodes, m_state.nodes6), f);
178 		}
179 
180 		ADD_OUTSTANDING_ASYNC("dht_tracker::refresh_timeout");
181 		m_refresh_timer.expires_from_now(seconds(5), ec);
182 		m_refresh_timer.async_wait(std::bind(&dht_tracker::refresh_timeout, self(), _1));
183 
184 		m_state.clear();
185 	}
186 
stop()187 	void dht_tracker::stop()
188 	{
189 		m_running = false;
190 		error_code ec;
191 		m_key_refresh_timer.cancel(ec);
192 		for (auto& n : m_nodes)
193 			n.second.connection_timer.cancel(ec);
194 		m_refresh_timer.cancel(ec);
195 		m_host_resolver.cancel();
196 	}
197 
198 #if TORRENT_ABI_VERSION == 1
dht_status(session_status & s)199 	void dht_tracker::dht_status(session_status& s)
200 	{
201 		s.dht_torrents += int(m_storage.num_torrents());
202 
203 		s.dht_nodes = 0;
204 		s.dht_node_cache = 0;
205 		s.dht_global_nodes = 0;
206 		s.dht_torrents = 0;
207 		s.active_requests.clear();
208 		s.dht_total_allocations = 0;
209 
210 		for (auto& n : m_nodes)
211 			n.second.dht.status(s);
212 	}
213 #endif
214 
dht_status(std::vector<dht_routing_bucket> & table,std::vector<dht_lookup> & requests)215 	void dht_tracker::dht_status(std::vector<dht_routing_bucket>& table
216 		, std::vector<dht_lookup>& requests)
217 	{
218 		for (auto& n : m_nodes)
219 			n.second.dht.status(table, requests);
220 	}
221 
update_stats_counters(counters & c) const222 	void dht_tracker::update_stats_counters(counters& c) const
223 	{
224 		const dht_storage_counters& dht_cnt = m_storage.counters();
225 		c.set_value(counters::dht_torrents, dht_cnt.torrents);
226 		c.set_value(counters::dht_peers, dht_cnt.peers);
227 		c.set_value(counters::dht_immutable_data, dht_cnt.immutable_data);
228 		c.set_value(counters::dht_mutable_data, dht_cnt.mutable_data);
229 
230 		c.set_value(counters::dht_nodes, 0);
231 		c.set_value(counters::dht_node_cache, 0);
232 		c.set_value(counters::dht_allocated_observers, 0);
233 
234 		for (auto& n : m_nodes)
235 			add_dht_counters(n.second.dht, c);
236 	}
237 
connection_timeout(aux::listen_socket_handle const & s,error_code const & e)238 	void dht_tracker::connection_timeout(aux::listen_socket_handle const& s, error_code const& e)
239 	{
240 		COMPLETE_ASYNC("dht_tracker::connection_timeout");
241 		if (e || !m_running) return;
242 
243 		auto const it = m_nodes.find(s);
244 		// this could happen if the task is about to be executed (and not cancellable) and
245 		// the socket is just removed
246 		if (it == m_nodes.end()) return; // node already destroyed
247 
248 		tracker_node& n = it->second;
249 		time_duration const d = n.dht.connection_timeout();
250 		error_code ec;
251 		deadline_timer& timer = n.connection_timer;
252 		timer.expires_from_now(d, ec);
253 		ADD_OUTSTANDING_ASYNC("dht_tracker::connection_timeout");
254 		timer.async_wait(std::bind(&dht_tracker::connection_timeout, self(), s, _1));
255 	}
256 
refresh_timeout(error_code const & e)257 	void dht_tracker::refresh_timeout(error_code const& e)
258 	{
259 		COMPLETE_ASYNC("dht_tracker::refresh_timeout");
260 		if (e || !m_running) return;
261 
262 		for (auto& n : m_nodes)
263 			n.second.dht.tick();
264 
265 		// periodically update the DOS blocker's settings from the dht_settings
266 		m_blocker.set_block_timer(m_settings.block_timeout);
267 		m_blocker.set_rate_limit(m_settings.block_ratelimit);
268 
269 		error_code ec;
270 		m_refresh_timer.expires_from_now(seconds(5), ec);
271 		ADD_OUTSTANDING_ASYNC("dht_tracker::refresh_timeout");
272 		m_refresh_timer.async_wait(
273 			std::bind(&dht_tracker::refresh_timeout, self(), _1));
274 	}
275 
refresh_key(error_code const & e)276 	void dht_tracker::refresh_key(error_code const& e)
277 	{
278 		COMPLETE_ASYNC("dht_tracker::refresh_key");
279 		if (e || !m_running) return;
280 
281 		ADD_OUTSTANDING_ASYNC("dht_tracker::refresh_key");
282 		error_code ec;
283 		m_key_refresh_timer.expires_from_now(key_refresh, ec);
284 		m_key_refresh_timer.async_wait(std::bind(&dht_tracker::refresh_key, self(), _1));
285 
286 		for (auto& n : m_nodes)
287 			n.second.dht.new_write_key();
288 
289 #ifndef TORRENT_DISABLE_LOGGING
290 		m_log->log(dht_logger::tracker, "*** new write key***");
291 #endif
292 	}
293 
update_storage_node_ids()294 	void dht_tracker::update_storage_node_ids()
295 	{
296 		std::vector<sha1_hash> ids;
297 		for (auto& n : m_nodes)
298 			ids.push_back(n.second.dht.nid());
299 		m_storage.update_node_ids(ids);
300 	}
301 
get_node(node_id const & id,std::string const & family_name)302 	node* dht_tracker::get_node(node_id const& id, std::string const& family_name)
303 	{
304 		TORRENT_UNUSED(id);
305 		for (auto& n : m_nodes)
306 		{
307 			// TODO: pick the closest node rather than the first
308 			if (n.second.dht.protocol_family_name() == family_name)
309 				return &n.second.dht;
310 		}
311 
312 		return nullptr;
313 	}
314 
get_peers(sha1_hash const & ih,std::function<void (std::vector<tcp::endpoint> const &)> f)315 	void dht_tracker::get_peers(sha1_hash const& ih
316 		, std::function<void(std::vector<tcp::endpoint> const&)> f)
317 	{
318 		for (auto& n : m_nodes)
319 			n.second.dht.get_peers(ih, f, {}, {});
320 	}
321 
announce(sha1_hash const & ih,int listen_port,announce_flags_t const flags,std::function<void (std::vector<tcp::endpoint> const &)> f)322 	void dht_tracker::announce(sha1_hash const& ih, int listen_port
323 		, announce_flags_t const flags
324 		, std::function<void(std::vector<tcp::endpoint> const&)> f)
325 	{
326 		for (auto& n : m_nodes)
327 			n.second.dht.announce(ih, listen_port, flags, f);
328 	}
329 
sample_infohashes(udp::endpoint const & ep,sha1_hash const & target,std::function<void (time_duration,int,std::vector<sha1_hash>,std::vector<std::pair<sha1_hash,udp::endpoint>>)> f)330 	void dht_tracker::sample_infohashes(udp::endpoint const& ep, sha1_hash const& target
331 		, std::function<void(time_duration
332 			, int, std::vector<sha1_hash>
333 			, std::vector<std::pair<sha1_hash, udp::endpoint>>)> f)
334 	{
335 		for (auto& n : m_nodes)
336 		{
337 			if (ep.protocol() != (n.first.get_external_address().is_v4() ? udp::v4() : udp::v6()))
338 				continue;
339 			n.second.dht.sample_infohashes(ep, target, f);
340 			break;
341 		}
342 	}
343 
344 	namespace {
345 
// bookkeeping shared by the callbacks of one immutable item lookup
struct get_immutable_item_ctx
{
	explicit get_immutable_item_ctx(int traversals)
		: active_traversals(traversals)
	{}

	// initialized from the constructor argument
	int active_traversals;

	// starts out false
	bool item_posted = false;
};
355 
356 	// these functions provide a slightly higher level
357 	// interface to the get/put functionality in the DHT
get_immutable_item_callback(item const & it,std::shared_ptr<get_immutable_item_ctx> ctx,std::function<void (item const &)> f)358 	void get_immutable_item_callback(item const& it
359 		, std::shared_ptr<get_immutable_item_ctx> ctx
360 		, std::function<void(item const&)> f)
361 	{
362 		// the reason to wrap here is to control the return value
363 		// since it controls whether we re-put the content
364 		TORRENT_ASSERT(!it.is_mutable());
365 		--ctx->active_traversals;
366 		if (!ctx->item_posted && (!it.empty() || ctx->active_traversals == 0))
367 		{
368 			ctx->item_posted = true;
369 			f(it);
370 		}
371 	}
372 
373 	struct get_mutable_item_ctx
374 	{
get_mutable_item_ctxlibtorrent::dht::__anon40b16b180311::get_mutable_item_ctx375 		explicit get_mutable_item_ctx(int traversals) : active_traversals(traversals) {}
376 		int active_traversals;
377 		item it;
378 	};
379 
get_mutable_item_callback(item const & it,bool authoritative,std::shared_ptr<get_mutable_item_ctx> ctx,std::function<void (item const &,bool)> f)380 	void get_mutable_item_callback(item const& it, bool authoritative
381 		, std::shared_ptr<get_mutable_item_ctx> ctx
382 		, std::function<void(item const&, bool)> f)
383 	{
384 		TORRENT_ASSERT(it.is_mutable());
385 		if (authoritative) --ctx->active_traversals;
386 		authoritative = authoritative && ctx->active_traversals == 0;
387 		if ((ctx->it.empty() && !it.empty()) || (ctx->it.seq() < it.seq()))
388 		{
389 			ctx->it = it;
390 			f(it, authoritative);
391 		}
392 		else if (authoritative)
393 			f(it, authoritative);
394 	}
395 
// bookkeeping shared by the callbacks of one put operation
struct put_item_ctx
{
	explicit put_item_ctx(int traversals)
		: active_traversals(traversals)
	{}

	// initialized from the constructor argument
	int active_traversals;

	// starts at zero
	int response_count = 0;
};
406 
put_immutable_item_callback(int responses,std::shared_ptr<put_item_ctx> ctx,std::function<void (int)> f)407 	void put_immutable_item_callback(int responses, std::shared_ptr<put_item_ctx> ctx
408 		, std::function<void(int)> f)
409 	{
410 		ctx->response_count += responses;
411 		if (--ctx->active_traversals == 0)
412 			f(ctx->response_count);
413 	}
414 
put_mutable_item_callback(item const & it,int responses,std::shared_ptr<put_item_ctx> ctx,std::function<void (item const &,int)> cb)415 	void put_mutable_item_callback(item const& it, int responses, std::shared_ptr<put_item_ctx> ctx
416 		, std::function<void(item const&, int)> cb)
417 	{
418 		ctx->response_count += responses;
419 		if (--ctx->active_traversals == 0)
420 			cb(it, ctx->response_count);
421 	}
422 
423 	} // anonymous namespace
424 
get_item(sha1_hash const & target,std::function<void (item const &)> cb)425 	void dht_tracker::get_item(sha1_hash const& target
426 		, std::function<void(item const&)> cb)
427 	{
428 		auto ctx = std::make_shared<get_immutable_item_ctx>(int(m_nodes.size()));
429 		for (auto& n : m_nodes)
430 			n.second.dht.get_item(target, std::bind(&get_immutable_item_callback, _1, ctx, cb));
431 	}
432 
433 	// key is a 32-byte binary string, the public key to look up.
434 	// the salt is optional
get_item(public_key const & key,std::function<void (item const &,bool)> cb,std::string salt)435 	void dht_tracker::get_item(public_key const& key
436 		, std::function<void(item const&, bool)> cb
437 		, std::string salt)
438 	{
439 		auto ctx = std::make_shared<get_mutable_item_ctx>(int(m_nodes.size()));
440 		for (auto& n : m_nodes)
441 			n.second.dht.get_item(key, salt, std::bind(&get_mutable_item_callback, _1, _2, ctx, cb));
442 	}
443 
put_item(entry const & data,std::function<void (int)> cb)444 	void dht_tracker::put_item(entry const& data
445 		, std::function<void(int)> cb)
446 	{
447 		std::string flat_data;
448 		bencode(std::back_inserter(flat_data), data);
449 		sha1_hash const target = item_target_id(flat_data);
450 
451 		auto ctx = std::make_shared<put_item_ctx>(int(m_nodes.size()));
452 		for (auto& n : m_nodes)
453 			n.second.dht.put_item(target, data, std::bind(&put_immutable_item_callback
454 			, _1, ctx, cb));
455 	}
456 
put_item(public_key const & key,std::function<void (item const &,int)> cb,std::function<void (item &)> data_cb,std::string salt)457 	void dht_tracker::put_item(public_key const& key
458 		, std::function<void(item const&, int)> cb
459 		, std::function<void(item&)> data_cb, std::string salt)
460 	{
461 		auto ctx = std::make_shared<put_item_ctx>(int(m_nodes.size()));
462 		for (auto& n : m_nodes)
463 			n.second.dht.put_item(key, salt, std::bind(&put_mutable_item_callback
464 				, _1, _2, ctx, cb), data_cb);
465 	}
466 
direct_request(udp::endpoint const & ep,entry & e,std::function<void (msg const &)> f)467 	void dht_tracker::direct_request(udp::endpoint const& ep, entry& e
468 		, std::function<void(msg const&)> f)
469 	{
470 		for (auto& n : m_nodes)
471 		{
472 			if (ep.protocol() != (n.first.get_external_address().is_v4() ? udp::v4() : udp::v6()))
473 				continue;
474 			n.second.dht.direct_request(ep, e, f);
475 			break;
476 		}
477 	}
478 
incoming_error(error_code const & ec,udp::endpoint const & ep)479 	void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep)
480 	{
481 		if (ec == boost::asio::error::connection_refused
482 			|| ec == boost::asio::error::connection_reset
483 			|| ec == boost::asio::error::connection_aborted
484 #ifdef _WIN32
485 			|| ec == error_code(ERROR_HOST_UNREACHABLE, system_category())
486 			|| ec == error_code(ERROR_PORT_UNREACHABLE, system_category())
487 			|| ec == error_code(ERROR_CONNECTION_REFUSED, system_category())
488 			|| ec == error_code(ERROR_CONNECTION_ABORTED, system_category())
489 #endif
490 			)
491 		{
492 			for (auto& n : m_nodes)
493 				n.second.dht.unreachable(ep);
494 		}
495 	}
496 
incoming_packet(aux::listen_socket_handle const & s,udp::endpoint const & ep,span<char const> const buf)497 	bool dht_tracker::incoming_packet(aux::listen_socket_handle const& s
498 		, udp::endpoint const& ep, span<char const> const buf)
499 	{
500 		int const buf_size = int(buf.size());
501 		if (buf_size <= 20
502 			|| buf.front() != 'd'
503 			|| buf.back() != 'e') return false;
504 
505 		m_counters.inc_stats_counter(counters::dht_bytes_in, buf_size);
506 		// account for IP and UDP overhead
507 		m_counters.inc_stats_counter(counters::recv_ip_overhead_bytes
508 			, is_v6(ep) ? 48 : 28);
509 		m_counters.inc_stats_counter(counters::dht_messages_in);
510 
511 		if (m_settings.ignore_dark_internet && is_v4(ep))
512 		{
513 			address_v4::bytes_type b = ep.address().to_v4().to_bytes();
514 
515 			// these are class A networks not available to the public
516 			// if we receive messages from here, that seems suspicious
517 			static std::uint8_t const class_a[] = { 3, 6, 7, 9, 11, 19, 21, 22, 25
518 				, 26, 28, 29, 30, 33, 34, 48, 56 };
519 
520 			if (std::find(std::begin(class_a), std::end(class_a), b[0]) != std::end(class_a))
521 			{
522 				m_counters.inc_stats_counter(counters::dht_messages_in_dropped);
523 				return true;
524 			}
525 		}
526 
527 		if (!m_blocker.incoming(ep.address(), clock_type::now(), m_log))
528 		{
529 			m_counters.inc_stats_counter(counters::dht_messages_in_dropped);
530 			return true;
531 		}
532 
533 		TORRENT_ASSERT(buf_size > 0);
534 
535 		int pos;
536 		error_code err;
537 		int const ret = bdecode(buf.data(), buf.data() + buf_size, m_msg, err, &pos, 10, 500);
538 		if (ret != 0)
539 		{
540 			m_counters.inc_stats_counter(counters::dht_messages_in_dropped);
541 #ifndef TORRENT_DISABLE_LOGGING
542 			m_log->log_packet(dht_logger::incoming_message, buf, ep);
543 #endif
544 			return false;
545 		}
546 
547 		if (m_msg.type() != bdecode_node::dict_t)
548 		{
549 			m_counters.inc_stats_counter(counters::dht_messages_in_dropped);
550 #ifndef TORRENT_DISABLE_LOGGING
551 			m_log->log_packet(dht_logger::incoming_message, buf, ep);
552 #endif
553 			// it's not a good idea to send a response to an invalid messages
554 			return false;
555 		}
556 
557 #ifndef TORRENT_DISABLE_LOGGING
558 		m_log->log_packet(dht_logger::incoming_message, buf, ep);
559 #endif
560 
561 		libtorrent::dht::msg const m(m_msg, ep);
562 		for (auto& n : m_nodes)
563 			n.second.dht.incoming(s, m);
564 		return true;
565 	}
566 
tracker_node(io_service & ios,aux::listen_socket_handle const & s,socket_manager * sock,dht::settings const & settings,node_id const & nid,dht_observer * observer,counters & cnt,get_foreign_node_t get_foreign_node,dht_storage_interface & storage)567 	dht_tracker::tracker_node::tracker_node(io_service& ios
568 		, aux::listen_socket_handle const& s, socket_manager* sock
569 		, dht::settings const& settings
570 		, node_id const& nid
571 		, dht_observer* observer, counters& cnt
572 		, get_foreign_node_t get_foreign_node
573 		, dht_storage_interface& storage)
574 		: dht(s, sock, settings, nid, observer, cnt, std::move(get_foreign_node), storage)
575 		, connection_timer(ios)
576 	{}
577 
live_nodes(node_id const & nid)578 	std::vector<std::pair<node_id, udp::endpoint>> dht_tracker::live_nodes(node_id const& nid)
579 	{
580 		std::vector<std::pair<node_id, udp::endpoint>> ret;
581 
582 		auto n = std::find_if(m_nodes.begin(), m_nodes.end()
583 			, [&](tracker_nodes_t::value_type const& v) { return v.second.dht.nid() == nid; });
584 
585 		if (n != m_nodes.end())
586 		{
587 			n->second.dht.m_table.for_each_node([&ret](node_entry const& e)
588 				{ ret.emplace_back(e.id, e.endpoint); }, nullptr);
589 		}
590 
591 		return ret;
592 	}
593 
594 namespace {
595 
save_nodes(node const & dht)596 	std::vector<udp::endpoint> save_nodes(node const& dht)
597 	{
598 		std::vector<udp::endpoint> ret;
599 
600 		dht.m_table.for_each_node([&ret](node_entry const& e)
601 		{ ret.push_back(e.ep()); });
602 
603 		return ret;
604 	}
605 
606 } // anonymous namespace
607 
state() const608 	dht_state dht_tracker::state() const
609 	{
610 		dht_state ret;
611 		for (auto& n : m_nodes)
612 		{
613 			// use the local rather than external address because if the user is behind NAT
614 			// we won't know the external IP on startup
615 			ret.nids.emplace_back(n.first.get_local_endpoint().address(), n.second.dht.nid());
616 			auto nodes = save_nodes(n.second.dht);
617 			ret.nodes.insert(ret.nodes.end(), nodes.begin(), nodes.end());
618 		}
619 		return ret;
620 	}
621 
add_node(udp::endpoint const & node)622 	void dht_tracker::add_node(udp::endpoint const& node)
623 	{
624 		for (auto& n : m_nodes)
625 			n.second.dht.add_node(node);
626 	}
627 
add_router_node(udp::endpoint const & node)628 	void dht_tracker::add_router_node(udp::endpoint const& node)
629 	{
630 		for (auto& n : m_nodes)
631 			n.second.dht.add_router_node(node);
632 	}
633 
has_quota()634 	bool dht_tracker::has_quota()
635 	{
636 		time_point const now = clock_type::now();
637 		time_duration const delta = now - m_last_tick;
638 		m_last_tick = now;
639 
640 		std::int64_t const limit = m_settings.upload_rate_limit;
641 
642 		// allow 3 seconds worth of burst
643 		std::int64_t const max_accrue = std::min(3 * limit, std::int64_t(std::numeric_limits<int>::max()));
644 
645 		if (delta >= seconds(3)
646 			|| delta >= microseconds(std::numeric_limits<int>::max() / limit))
647 		{
648 			m_send_quota = aux::numeric_cast<int>(max_accrue);
649 			return true;
650 		}
651 
652 		int const add = aux::numeric_cast<int>(limit * total_microseconds(delta) / 1000000);
653 
654 		if (max_accrue - m_send_quota < add)
655 		{
656 			m_send_quota = aux::numeric_cast<int>(max_accrue);
657 			return true;
658 		}
659 		else
660 		{
661 			// add any new quota we've accrued since last time
662 			m_send_quota += add;
663 		}
664 		TORRENT_ASSERT(m_send_quota <= max_accrue);
665 		return m_send_quota > 0;
666 	}
667 
send_packet(aux::listen_socket_handle const & s,entry & e,udp::endpoint const & addr)668 	bool dht_tracker::send_packet(aux::listen_socket_handle const& s, entry& e, udp::endpoint const& addr)
669 	{
670 		TORRENT_ASSERT(m_nodes.find(s) != m_nodes.end());
671 
672 		static_assert(LIBTORRENT_VERSION_MINOR < 16, "version number not supported by DHT");
673 		static_assert(LIBTORRENT_VERSION_TINY < 16, "version number not supported by DHT");
674 		static char const version_str[] = {'L', 'T'
675 			, LIBTORRENT_VERSION_MAJOR, (LIBTORRENT_VERSION_MINOR << 4) | LIBTORRENT_VERSION_TINY};
676 		e["v"] = std::string(version_str, version_str + 4);
677 
678 		m_send_buf.clear();
679 		bencode(std::back_inserter(m_send_buf), e);
680 
681 		// update the quota. We won't prevent the packet to be sent if we exceed
682 		// the quota, we'll just (potentially) block the next incoming request.
683 
684 		m_send_quota -= int(m_send_buf.size());
685 
686 		error_code ec;
687 		if (s.get_local_endpoint().protocol().family() != addr.protocol().family())
688 		{
689 			// the node is trying to send a packet to a different address family
690 			// than its socket, this can happen during bootstrap
691 			// pick a node with the right address family and use its socket
692 			auto n = std::find_if(m_nodes.begin(), m_nodes.end()
693 				, [&](tracker_nodes_t::value_type const& v)
694 					{ return v.first.get_local_endpoint().protocol().family() == addr.protocol().family(); });
695 
696 			if (n != m_nodes.end())
697 				m_send_fun(n->first, addr, m_send_buf, ec, {});
698 			else
699 				ec = boost::asio::error::address_family_not_supported;
700 		}
701 		else
702 		{
703 			m_send_fun(s, addr, m_send_buf, ec, {});
704 		}
705 
706 		if (ec)
707 		{
708 			m_counters.inc_stats_counter(counters::dht_messages_out_dropped);
709 #ifndef TORRENT_DISABLE_LOGGING
710 			m_log->log_packet(dht_logger::outgoing_message, m_send_buf, addr);
711 #endif
712 			return false;
713 		}
714 
715 		m_counters.inc_stats_counter(counters::dht_bytes_out, int(m_send_buf.size()));
716 		// account for IP and UDP overhead
717 		m_counters.inc_stats_counter(counters::sent_ip_overhead_bytes
718 			, is_v6(addr) ? 48 : 28);
719 		m_counters.inc_stats_counter(counters::dht_messages_out);
720 #ifndef TORRENT_DISABLE_LOGGING
721 		m_log->log_packet(dht_logger::outgoing_message, m_send_buf, addr);
722 #endif
723 		return true;
724 	}
725 
726 }}
727