1 /* 2 3 Copyright (c) 2006-2018, Arvid Norberg 4 All rights reserved. 5 6 Redistribution and use in source and binary forms, with or without 7 modification, are permitted provided that the following conditions 8 are met: 9 10 * Redistributions of source code must retain the above copyright 11 notice, this list of conditions and the following disclaimer. 12 * Redistributions in binary form must reproduce the above copyright 13 notice, this list of conditions and the following disclaimer in 14 the documentation and/or other materials provided with the distribution. 15 * Neither the name of the author nor the names of its 16 contributors may be used to endorse or promote products derived 17 from this software without specific prior written permission. 18 19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 POSSIBILITY OF SUCH DAMAGE. 30 31 */ 32 33 #include "libtorrent/kademlia/dht_tracker.hpp" 34 35 #include <libtorrent/config.hpp> 36 37 #include <libtorrent/kademlia/msg.hpp> 38 #include <libtorrent/kademlia/dht_observer.hpp> 39 #include <libtorrent/kademlia/dht_settings.hpp> 40 41 #include <libtorrent/bencode.hpp> 42 #include <libtorrent/version.hpp> 43 #include <libtorrent/time.hpp> 44 #include <libtorrent/performance_counters.hpp> // for counters 45 #include <libtorrent/aux_/time.hpp> 46 #include <libtorrent/session_status.hpp> 47 #include <libtorrent/broadcast_socket.hpp> // for is_local 48 49 #ifndef TORRENT_DISABLE_LOGGING 50 #include <libtorrent/hex.hpp> // to_hex 51 #endif 52 53 using namespace std::placeholders; 54 55 namespace libtorrent { namespace dht { 56 57 namespace { 58 59 // generate a new write token key every 5 minutes 60 auto const key_refresh 61 = duration_cast<time_duration>(minutes(5)); 62 add_dht_counters(node const & dht,counters & c)63 void add_dht_counters(node const& dht, counters& c) 64 { 65 int nodes, replacements, allocated_observers; 66 std::tie(nodes, replacements, allocated_observers) = dht.get_stats_counters(); 67 68 c.inc_stats_counter(counters::dht_nodes, nodes); 69 c.inc_stats_counter(counters::dht_node_cache, replacements); 70 c.inc_stats_counter(counters::dht_allocated_observers, allocated_observers); 71 } 72 concat(std::vector<udp::endpoint> const & v1,std::vector<udp::endpoint> const & v2)73 std::vector<udp::endpoint> concat(std::vector<udp::endpoint> const& v1 74 , std::vector<udp::endpoint> const& v2) 75 { 76 std::vector<udp::endpoint> r = v1; 77 r.insert(r.end(), v2.begin(), v2.end()); 78 return r; 79 } 80 81 } // anonymous namespace 82 83 // class that puts the networking and the kademlia node in a single 84 // unit and connecting them together. dht_tracker(dht_observer * observer,io_service & ios,send_fun_t const & send_fun,dht::settings const & settings,counters & cnt,dht_storage_interface & storage,dht_state && state)85 dht_tracker::dht_tracker(dht_observer* observer 86 , io_service& ios 87 , send_fun_t const& send_fun 88 , dht::settings const& settings 89 , counters& cnt 90 , dht_storage_interface& storage 91 , dht_state&& state) 92 : m_counters(cnt) 93 , m_storage(storage) 94 , m_state(std::move(state)) 95 , m_send_fun(send_fun) 96 , m_log(observer) 97 , m_key_refresh_timer(ios) 98 , m_refresh_timer(ios) 99 , m_settings(settings) 100 , m_running(false) 101 , m_host_resolver(ios) 102 , m_send_quota(settings.upload_rate_limit) 103 , m_last_tick(aux::time_now()) 104 { 105 m_blocker.set_block_timer(m_settings.block_timeout); 106 m_blocker.set_rate_limit(m_settings.block_ratelimit); 107 } 108 update_node_id(aux::listen_socket_handle const & s)109 void dht_tracker::update_node_id(aux::listen_socket_handle const& s) 110 { 111 auto n = m_nodes.find(s); 112 if (n != m_nodes.end()) 113 n->second.dht.update_node_id(); 114 update_storage_node_ids(); 115 } 116 new_socket(aux::listen_socket_handle const & s)117 void dht_tracker::new_socket(aux::listen_socket_handle const& s) 118 { 119 address const local_address = s.get_local_endpoint().address(); 120 auto stored_nid = std::find_if(m_state.nids.begin(), m_state.nids.end() 121 , [&](node_ids_t::value_type const& nid) { return nid.first == local_address; }); 122 node_id const nid = stored_nid != m_state.nids.end() ? stored_nid->second : node_id(); 123 // must use piecewise construction because tracker_node::connection_timer 124 // is neither copyable nor movable 125 auto n = m_nodes.emplace(std::piecewise_construct_t(), std::forward_as_tuple(s) 126 , std::forward_as_tuple(get_io_service(m_key_refresh_timer) 127 , s, this, m_settings, nid, m_log, m_counters 128 , std::bind(&dht_tracker::get_node, this, _1, _2) 129 , m_storage)); 130 131 update_storage_node_ids(); 132 133 #ifndef TORRENT_DISABLE_LOGGING 134 if (m_log->should_log(dht_logger::tracker)) 135 { 136 m_log->log(dht_logger::tracker, "starting %s DHT tracker with node id: %s" 137 , local_address.is_v4() ? "IPv4" : "IPv6" 138 , aux::to_hex(n.first->second.dht.nid()).c_str()); 139 } 140 #endif 141 142 if (m_running && n.second) 143 { 144 ADD_OUTSTANDING_ASYNC("dht_tracker::connection_timeout"); 145 error_code ec; 146 n.first->second.connection_timer.expires_from_now(seconds(1), ec); 147 n.first->second.connection_timer.async_wait( 148 std::bind(&dht_tracker::connection_timeout, self(), n.first->first, _1)); 149 n.first->second.dht.bootstrap({}, find_data::nodes_callback()); 150 } 151 } 152 delete_socket(aux::listen_socket_handle const & s)153 void dht_tracker::delete_socket(aux::listen_socket_handle const& s) 154 { 155 m_nodes.erase(s); 156 157 update_storage_node_ids(); 158 } 159 start(find_data::nodes_callback const & f)160 void dht_tracker::start(find_data::nodes_callback const& f) 161 { 162 m_running = true; 163 error_code ec; 164 165 ADD_OUTSTANDING_ASYNC("dht_tracker::refresh_key"); 166 refresh_key(ec); 167 168 for (auto& n : m_nodes) 169 { 170 ADD_OUTSTANDING_ASYNC("dht_tracker::connection_timeout"); 171 n.second.connection_timer.expires_from_now(seconds(1), ec); 172 n.second.connection_timer.async_wait( 173 std::bind(&dht_tracker::connection_timeout, self(), n.first, _1)); 174 if (is_v6(n.first.get_local_endpoint())) 175 n.second.dht.bootstrap(concat(m_state.nodes6, m_state.nodes), f); 176 else 177 n.second.dht.bootstrap(concat(m_state.nodes, m_state.nodes6), f); 178 } 179 180 ADD_OUTSTANDING_ASYNC("dht_tracker::refresh_timeout"); 181 m_refresh_timer.expires_from_now(seconds(5), ec); 182 m_refresh_timer.async_wait(std::bind(&dht_tracker::refresh_timeout, self(), _1)); 183 184 m_state.clear(); 185 } 186 stop()187 void dht_tracker::stop() 188 { 189 m_running = false; 190 error_code ec; 191 m_key_refresh_timer.cancel(ec); 192 for (auto& n : m_nodes) 193 n.second.connection_timer.cancel(ec); 194 m_refresh_timer.cancel(ec); 195 m_host_resolver.cancel(); 196 } 197 198 #if TORRENT_ABI_VERSION == 1 dht_status(session_status & s)199 void dht_tracker::dht_status(session_status& s) 200 { 201 s.dht_torrents += int(m_storage.num_torrents()); 202 203 s.dht_nodes = 0; 204 s.dht_node_cache = 0; 205 s.dht_global_nodes = 0; 206 s.dht_torrents = 0; 207 s.active_requests.clear(); 208 s.dht_total_allocations = 0; 209 210 for (auto& n : m_nodes) 211 n.second.dht.status(s); 212 } 213 #endif 214 dht_status(std::vector<dht_routing_bucket> & table,std::vector<dht_lookup> & requests)215 void dht_tracker::dht_status(std::vector<dht_routing_bucket>& table 216 , std::vector<dht_lookup>& requests) 217 { 218 for (auto& n : m_nodes) 219 n.second.dht.status(table, requests); 220 } 221 update_stats_counters(counters & c) const222 void dht_tracker::update_stats_counters(counters& c) const 223 { 224 const dht_storage_counters& dht_cnt = m_storage.counters(); 225 c.set_value(counters::dht_torrents, dht_cnt.torrents); 226 c.set_value(counters::dht_peers, dht_cnt.peers); 227 c.set_value(counters::dht_immutable_data, dht_cnt.immutable_data); 228 c.set_value(counters::dht_mutable_data, dht_cnt.mutable_data); 229 230 c.set_value(counters::dht_nodes, 0); 231 c.set_value(counters::dht_node_cache, 0); 232 c.set_value(counters::dht_allocated_observers, 0); 233 234 for (auto& n : m_nodes) 235 add_dht_counters(n.second.dht, c); 236 } 237 connection_timeout(aux::listen_socket_handle const & s,error_code const & e)238 void dht_tracker::connection_timeout(aux::listen_socket_handle const& s, error_code const& e) 239 { 240 COMPLETE_ASYNC("dht_tracker::connection_timeout"); 241 if (e || !m_running) return; 242 243 auto const it = m_nodes.find(s); 244 // this could happen if the task is about to be executed (and not cancellable) and 245 // the socket is just removed 246 if (it == m_nodes.end()) return; // node already destroyed 247 248 tracker_node& n = it->second; 249 time_duration const d = n.dht.connection_timeout(); 250 error_code ec; 251 deadline_timer& timer = n.connection_timer; 252 timer.expires_from_now(d, ec); 253 ADD_OUTSTANDING_ASYNC("dht_tracker::connection_timeout"); 254 timer.async_wait(std::bind(&dht_tracker::connection_timeout, self(), s, _1)); 255 } 256 refresh_timeout(error_code const & e)257 void dht_tracker::refresh_timeout(error_code const& e) 258 { 259 COMPLETE_ASYNC("dht_tracker::refresh_timeout"); 260 if (e || !m_running) return; 261 262 for (auto& n : m_nodes) 263 n.second.dht.tick(); 264 265 // periodically update the DOS blocker's settings from the dht_settings 266 m_blocker.set_block_timer(m_settings.block_timeout); 267 m_blocker.set_rate_limit(m_settings.block_ratelimit); 268 269 error_code ec; 270 m_refresh_timer.expires_from_now(seconds(5), ec); 271 ADD_OUTSTANDING_ASYNC("dht_tracker::refresh_timeout"); 272 m_refresh_timer.async_wait( 273 std::bind(&dht_tracker::refresh_timeout, self(), _1)); 274 } 275 refresh_key(error_code const & e)276 void dht_tracker::refresh_key(error_code const& e) 277 { 278 COMPLETE_ASYNC("dht_tracker::refresh_key"); 279 if (e || !m_running) return; 280 281 ADD_OUTSTANDING_ASYNC("dht_tracker::refresh_key"); 282 error_code ec; 283 m_key_refresh_timer.expires_from_now(key_refresh, ec); 284 m_key_refresh_timer.async_wait(std::bind(&dht_tracker::refresh_key, self(), _1)); 285 286 for (auto& n : m_nodes) 287 n.second.dht.new_write_key(); 288 289 #ifndef TORRENT_DISABLE_LOGGING 290 m_log->log(dht_logger::tracker, "*** new write key***"); 291 #endif 292 } 293 update_storage_node_ids()294 void dht_tracker::update_storage_node_ids() 295 { 296 std::vector<sha1_hash> ids; 297 for (auto& n : m_nodes) 298 ids.push_back(n.second.dht.nid()); 299 m_storage.update_node_ids(ids); 300 } 301 get_node(node_id const & id,std::string const & family_name)302 node* dht_tracker::get_node(node_id const& id, std::string const& family_name) 303 { 304 TORRENT_UNUSED(id); 305 for (auto& n : m_nodes) 306 { 307 // TODO: pick the closest node rather than the first 308 if (n.second.dht.protocol_family_name() == family_name) 309 return &n.second.dht; 310 } 311 312 return nullptr; 313 } 314 get_peers(sha1_hash const & ih,std::function<void (std::vector<tcp::endpoint> const &)> f)315 void dht_tracker::get_peers(sha1_hash const& ih 316 , std::function<void(std::vector<tcp::endpoint> const&)> f) 317 { 318 for (auto& n : m_nodes) 319 n.second.dht.get_peers(ih, f, {}, {}); 320 } 321 announce(sha1_hash const & ih,int listen_port,announce_flags_t const flags,std::function<void (std::vector<tcp::endpoint> const &)> f)322 void dht_tracker::announce(sha1_hash const& ih, int listen_port 323 , announce_flags_t const flags 324 , std::function<void(std::vector<tcp::endpoint> const&)> f) 325 { 326 for (auto& n : m_nodes) 327 n.second.dht.announce(ih, listen_port, flags, f); 328 } 329 sample_infohashes(udp::endpoint const & ep,sha1_hash const & target,std::function<void (time_duration,int,std::vector<sha1_hash>,std::vector<std::pair<sha1_hash,udp::endpoint>>)> f)330 void dht_tracker::sample_infohashes(udp::endpoint const& ep, sha1_hash const& target 331 , std::function<void(time_duration 332 , int, std::vector<sha1_hash> 333 , std::vector<std::pair<sha1_hash, udp::endpoint>>)> f) 334 { 335 for (auto& n : m_nodes) 336 { 337 if (ep.protocol() != (n.first.get_external_address().is_v4() ? udp::v4() : udp::v6())) 338 continue; 339 n.second.dht.sample_infohashes(ep, target, f); 340 break; 341 } 342 } 343 344 namespace { 345 346 struct get_immutable_item_ctx 347 { get_immutable_item_ctxlibtorrent::dht::__anon40b16b180311::get_immutable_item_ctx348 explicit get_immutable_item_ctx(int traversals) 349 : active_traversals(traversals) 350 , item_posted(false) 351 {} 352 int active_traversals; 353 bool item_posted; 354 }; 355 356 // these functions provide a slightly higher level 357 // interface to the get/put functionality in the DHT get_immutable_item_callback(item const & it,std::shared_ptr<get_immutable_item_ctx> ctx,std::function<void (item const &)> f)358 void get_immutable_item_callback(item const& it 359 , std::shared_ptr<get_immutable_item_ctx> ctx 360 , std::function<void(item const&)> f) 361 { 362 // the reason to wrap here is to control the return value 363 // since it controls whether we re-put the content 364 TORRENT_ASSERT(!it.is_mutable()); 365 --ctx->active_traversals; 366 if (!ctx->item_posted && (!it.empty() || ctx->active_traversals == 0)) 367 { 368 ctx->item_posted = true; 369 f(it); 370 } 371 } 372 373 struct get_mutable_item_ctx 374 { get_mutable_item_ctxlibtorrent::dht::__anon40b16b180311::get_mutable_item_ctx375 explicit get_mutable_item_ctx(int traversals) : active_traversals(traversals) {} 376 int active_traversals; 377 item it; 378 }; 379 get_mutable_item_callback(item const & it,bool authoritative,std::shared_ptr<get_mutable_item_ctx> ctx,std::function<void (item const &,bool)> f)380 void get_mutable_item_callback(item const& it, bool authoritative 381 , std::shared_ptr<get_mutable_item_ctx> ctx 382 , std::function<void(item const&, bool)> f) 383 { 384 TORRENT_ASSERT(it.is_mutable()); 385 if (authoritative) --ctx->active_traversals; 386 authoritative = authoritative && ctx->active_traversals == 0; 387 if ((ctx->it.empty() && !it.empty()) || (ctx->it.seq() < it.seq())) 388 { 389 ctx->it = it; 390 f(it, authoritative); 391 } 392 else if (authoritative) 393 f(it, authoritative); 394 } 395 396 struct put_item_ctx 397 { put_item_ctxlibtorrent::dht::__anon40b16b180311::put_item_ctx398 explicit put_item_ctx(int traversals) 399 : active_traversals(traversals) 400 , response_count(0) 401 {} 402 403 int active_traversals; 404 int response_count; 405 }; 406 put_immutable_item_callback(int responses,std::shared_ptr<put_item_ctx> ctx,std::function<void (int)> f)407 void put_immutable_item_callback(int responses, std::shared_ptr<put_item_ctx> ctx 408 , std::function<void(int)> f) 409 { 410 ctx->response_count += responses; 411 if (--ctx->active_traversals == 0) 412 f(ctx->response_count); 413 } 414 put_mutable_item_callback(item const & it,int responses,std::shared_ptr<put_item_ctx> ctx,std::function<void (item const &,int)> cb)415 void put_mutable_item_callback(item const& it, int responses, std::shared_ptr<put_item_ctx> ctx 416 , std::function<void(item const&, int)> cb) 417 { 418 ctx->response_count += responses; 419 if (--ctx->active_traversals == 0) 420 cb(it, ctx->response_count); 421 } 422 423 } // anonymous namespace 424 get_item(sha1_hash const & target,std::function<void (item const &)> cb)425 void dht_tracker::get_item(sha1_hash const& target 426 , std::function<void(item const&)> cb) 427 { 428 auto ctx = std::make_shared<get_immutable_item_ctx>(int(m_nodes.size())); 429 for (auto& n : m_nodes) 430 n.second.dht.get_item(target, std::bind(&get_immutable_item_callback, _1, ctx, cb)); 431 } 432 433 // key is a 32-byte binary string, the public key to look up. 434 // the salt is optional get_item(public_key const & key,std::function<void (item const &,bool)> cb,std::string salt)435 void dht_tracker::get_item(public_key const& key 436 , std::function<void(item const&, bool)> cb 437 , std::string salt) 438 { 439 auto ctx = std::make_shared<get_mutable_item_ctx>(int(m_nodes.size())); 440 for (auto& n : m_nodes) 441 n.second.dht.get_item(key, salt, std::bind(&get_mutable_item_callback, _1, _2, ctx, cb)); 442 } 443 put_item(entry const & data,std::function<void (int)> cb)444 void dht_tracker::put_item(entry const& data 445 , std::function<void(int)> cb) 446 { 447 std::string flat_data; 448 bencode(std::back_inserter(flat_data), data); 449 sha1_hash const target = item_target_id(flat_data); 450 451 auto ctx = std::make_shared<put_item_ctx>(int(m_nodes.size())); 452 for (auto& n : m_nodes) 453 n.second.dht.put_item(target, data, std::bind(&put_immutable_item_callback 454 , _1, ctx, cb)); 455 } 456 put_item(public_key const & key,std::function<void (item const &,int)> cb,std::function<void (item &)> data_cb,std::string salt)457 void dht_tracker::put_item(public_key const& key 458 , std::function<void(item const&, int)> cb 459 , std::function<void(item&)> data_cb, std::string salt) 460 { 461 auto ctx = std::make_shared<put_item_ctx>(int(m_nodes.size())); 462 for (auto& n : m_nodes) 463 n.second.dht.put_item(key, salt, std::bind(&put_mutable_item_callback 464 , _1, _2, ctx, cb), data_cb); 465 } 466 direct_request(udp::endpoint const & ep,entry & e,std::function<void (msg const &)> f)467 void dht_tracker::direct_request(udp::endpoint const& ep, entry& e 468 , std::function<void(msg const&)> f) 469 { 470 for (auto& n : m_nodes) 471 { 472 if (ep.protocol() != (n.first.get_external_address().is_v4() ? udp::v4() : udp::v6())) 473 continue; 474 n.second.dht.direct_request(ep, e, f); 475 break; 476 } 477 } 478 incoming_error(error_code const & ec,udp::endpoint const & ep)479 void dht_tracker::incoming_error(error_code const& ec, udp::endpoint const& ep) 480 { 481 if (ec == boost::asio::error::connection_refused 482 || ec == boost::asio::error::connection_reset 483 || ec == boost::asio::error::connection_aborted 484 #ifdef _WIN32 485 || ec == error_code(ERROR_HOST_UNREACHABLE, system_category()) 486 || ec == error_code(ERROR_PORT_UNREACHABLE, system_category()) 487 || ec == error_code(ERROR_CONNECTION_REFUSED, system_category()) 488 || ec == error_code(ERROR_CONNECTION_ABORTED, system_category()) 489 #endif 490 ) 491 { 492 for (auto& n : m_nodes) 493 n.second.dht.unreachable(ep); 494 } 495 } 496 incoming_packet(aux::listen_socket_handle const & s,udp::endpoint const & ep,span<char const> const buf)497 bool dht_tracker::incoming_packet(aux::listen_socket_handle const& s 498 , udp::endpoint const& ep, span<char const> const buf) 499 { 500 int const buf_size = int(buf.size()); 501 if (buf_size <= 20 502 || buf.front() != 'd' 503 || buf.back() != 'e') return false; 504 505 m_counters.inc_stats_counter(counters::dht_bytes_in, buf_size); 506 // account for IP and UDP overhead 507 m_counters.inc_stats_counter(counters::recv_ip_overhead_bytes 508 , is_v6(ep) ? 48 : 28); 509 m_counters.inc_stats_counter(counters::dht_messages_in); 510 511 if (m_settings.ignore_dark_internet && is_v4(ep)) 512 { 513 address_v4::bytes_type b = ep.address().to_v4().to_bytes(); 514 515 // these are class A networks not available to the public 516 // if we receive messages from here, that seems suspicious 517 static std::uint8_t const class_a[] = { 3, 6, 7, 9, 11, 19, 21, 22, 25 518 , 26, 28, 29, 30, 33, 34, 48, 56 }; 519 520 if (std::find(std::begin(class_a), std::end(class_a), b[0]) != std::end(class_a)) 521 { 522 m_counters.inc_stats_counter(counters::dht_messages_in_dropped); 523 return true; 524 } 525 } 526 527 if (!m_blocker.incoming(ep.address(), clock_type::now(), m_log)) 528 { 529 m_counters.inc_stats_counter(counters::dht_messages_in_dropped); 530 return true; 531 } 532 533 TORRENT_ASSERT(buf_size > 0); 534 535 int pos; 536 error_code err; 537 int const ret = bdecode(buf.data(), buf.data() + buf_size, m_msg, err, &pos, 10, 500); 538 if (ret != 0) 539 { 540 m_counters.inc_stats_counter(counters::dht_messages_in_dropped); 541 #ifndef TORRENT_DISABLE_LOGGING 542 m_log->log_packet(dht_logger::incoming_message, buf, ep); 543 #endif 544 return false; 545 } 546 547 if (m_msg.type() != bdecode_node::dict_t) 548 { 549 m_counters.inc_stats_counter(counters::dht_messages_in_dropped); 550 #ifndef TORRENT_DISABLE_LOGGING 551 m_log->log_packet(dht_logger::incoming_message, buf, ep); 552 #endif 553 // it's not a good idea to send a response to an invalid messages 554 return false; 555 } 556 557 #ifndef TORRENT_DISABLE_LOGGING 558 m_log->log_packet(dht_logger::incoming_message, buf, ep); 559 #endif 560 561 libtorrent::dht::msg const m(m_msg, ep); 562 for (auto& n : m_nodes) 563 n.second.dht.incoming(s, m); 564 return true; 565 } 566 tracker_node(io_service & ios,aux::listen_socket_handle const & s,socket_manager * sock,dht::settings const & settings,node_id const & nid,dht_observer * observer,counters & cnt,get_foreign_node_t get_foreign_node,dht_storage_interface & storage)567 dht_tracker::tracker_node::tracker_node(io_service& ios 568 , aux::listen_socket_handle const& s, socket_manager* sock 569 , dht::settings const& settings 570 , node_id const& nid 571 , dht_observer* observer, counters& cnt 572 , get_foreign_node_t get_foreign_node 573 , dht_storage_interface& storage) 574 : dht(s, sock, settings, nid, observer, cnt, std::move(get_foreign_node), storage) 575 , connection_timer(ios) 576 {} 577 live_nodes(node_id const & nid)578 std::vector<std::pair<node_id, udp::endpoint>> dht_tracker::live_nodes(node_id const& nid) 579 { 580 std::vector<std::pair<node_id, udp::endpoint>> ret; 581 582 auto n = std::find_if(m_nodes.begin(), m_nodes.end() 583 , [&](tracker_nodes_t::value_type const& v) { return v.second.dht.nid() == nid; }); 584 585 if (n != m_nodes.end()) 586 { 587 n->second.dht.m_table.for_each_node([&ret](node_entry const& e) 588 { ret.emplace_back(e.id, e.endpoint); }, nullptr); 589 } 590 591 return ret; 592 } 593 594 namespace { 595 save_nodes(node const & dht)596 std::vector<udp::endpoint> save_nodes(node const& dht) 597 { 598 std::vector<udp::endpoint> ret; 599 600 dht.m_table.for_each_node([&ret](node_entry const& e) 601 { ret.push_back(e.ep()); }); 602 603 return ret; 604 } 605 606 } // anonymous namespace 607 state() const608 dht_state dht_tracker::state() const 609 { 610 dht_state ret; 611 for (auto& n : m_nodes) 612 { 613 // use the local rather than external address because if the user is behind NAT 614 // we won't know the external IP on startup 615 ret.nids.emplace_back(n.first.get_local_endpoint().address(), n.second.dht.nid()); 616 auto nodes = save_nodes(n.second.dht); 617 ret.nodes.insert(ret.nodes.end(), nodes.begin(), nodes.end()); 618 } 619 return ret; 620 } 621 add_node(udp::endpoint const & node)622 void dht_tracker::add_node(udp::endpoint const& node) 623 { 624 for (auto& n : m_nodes) 625 n.second.dht.add_node(node); 626 } 627 add_router_node(udp::endpoint const & node)628 void dht_tracker::add_router_node(udp::endpoint const& node) 629 { 630 for (auto& n : m_nodes) 631 n.second.dht.add_router_node(node); 632 } 633 has_quota()634 bool dht_tracker::has_quota() 635 { 636 time_point const now = clock_type::now(); 637 time_duration const delta = now - m_last_tick; 638 m_last_tick = now; 639 640 std::int64_t const limit = m_settings.upload_rate_limit; 641 642 // allow 3 seconds worth of burst 643 std::int64_t const max_accrue = std::min(3 * limit, std::int64_t(std::numeric_limits<int>::max())); 644 645 if (delta >= seconds(3) 646 || delta >= microseconds(std::numeric_limits<int>::max() / limit)) 647 { 648 m_send_quota = aux::numeric_cast<int>(max_accrue); 649 return true; 650 } 651 652 int const add = aux::numeric_cast<int>(limit * total_microseconds(delta) / 1000000); 653 654 if (max_accrue - m_send_quota < add) 655 { 656 m_send_quota = aux::numeric_cast<int>(max_accrue); 657 return true; 658 } 659 else 660 { 661 // add any new quota we've accrued since last time 662 m_send_quota += add; 663 } 664 TORRENT_ASSERT(m_send_quota <= max_accrue); 665 return m_send_quota > 0; 666 } 667 send_packet(aux::listen_socket_handle const & s,entry & e,udp::endpoint const & addr)668 bool dht_tracker::send_packet(aux::listen_socket_handle const& s, entry& e, udp::endpoint const& addr) 669 { 670 TORRENT_ASSERT(m_nodes.find(s) != m_nodes.end()); 671 672 static_assert(LIBTORRENT_VERSION_MINOR < 16, "version number not supported by DHT"); 673 static_assert(LIBTORRENT_VERSION_TINY < 16, "version number not supported by DHT"); 674 static char const version_str[] = {'L', 'T' 675 , LIBTORRENT_VERSION_MAJOR, (LIBTORRENT_VERSION_MINOR << 4) | LIBTORRENT_VERSION_TINY}; 676 e["v"] = std::string(version_str, version_str + 4); 677 678 m_send_buf.clear(); 679 bencode(std::back_inserter(m_send_buf), e); 680 681 // update the quota. We won't prevent the packet to be sent if we exceed 682 // the quota, we'll just (potentially) block the next incoming request. 683 684 m_send_quota -= int(m_send_buf.size()); 685 686 error_code ec; 687 if (s.get_local_endpoint().protocol().family() != addr.protocol().family()) 688 { 689 // the node is trying to send a packet to a different address family 690 // than its socket, this can happen during bootstrap 691 // pick a node with the right address family and use its socket 692 auto n = std::find_if(m_nodes.begin(), m_nodes.end() 693 , [&](tracker_nodes_t::value_type const& v) 694 { return v.first.get_local_endpoint().protocol().family() == addr.protocol().family(); }); 695 696 if (n != m_nodes.end()) 697 m_send_fun(n->first, addr, m_send_buf, ec, {}); 698 else 699 ec = boost::asio::error::address_family_not_supported; 700 } 701 else 702 { 703 m_send_fun(s, addr, m_send_buf, ec, {}); 704 } 705 706 if (ec) 707 { 708 m_counters.inc_stats_counter(counters::dht_messages_out_dropped); 709 #ifndef TORRENT_DISABLE_LOGGING 710 m_log->log_packet(dht_logger::outgoing_message, m_send_buf, addr); 711 #endif 712 return false; 713 } 714 715 m_counters.inc_stats_counter(counters::dht_bytes_out, int(m_send_buf.size())); 716 // account for IP and UDP overhead 717 m_counters.inc_stats_counter(counters::sent_ip_overhead_bytes 718 , is_v6(addr) ? 48 : 28); 719 m_counters.inc_stats_counter(counters::dht_messages_out); 720 #ifndef TORRENT_DISABLE_LOGGING 721 m_log->log_packet(dht_logger::outgoing_message, m_send_buf, addr); 722 #endif 723 return true; 724 } 725 726 }} 727