1 /* 2 3 Copyright (c) 2003-2018, Arvid Norberg 4 All rights reserved. 5 6 Redistribution and use in source and binary forms, with or without 7 modification, are permitted provided that the following conditions 8 are met: 9 10 * Redistributions of source code must retain the above copyright 11 notice, this list of conditions and the following disclaimer. 12 * Redistributions in binary form must reproduce the above copyright 13 notice, this list of conditions and the following disclaimer in 14 the documentation and/or other materials provided with the distribution. 15 * Neither the name of the author nor the names of its 16 contributors may be used to endorse or promote products derived 17 from this software without specific prior written permission. 18 19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 POSSIBILITY OF SUCH DAMAGE. 30 31 */ 32 33 #include <vector> 34 #include <functional> 35 #include <cstdint> 36 37 #include "libtorrent/config.hpp" 38 #include "libtorrent/peer_connection.hpp" 39 #include "libtorrent/entry.hpp" 40 #include "libtorrent/bencode.hpp" 41 #include "libtorrent/alert_types.hpp" 42 #include "libtorrent/invariant_check.hpp" 43 #include "libtorrent/io.hpp" 44 #include "libtorrent/extensions.hpp" 45 #include "libtorrent/aux_/session_interface.hpp" 46 #include "libtorrent/peer_list.hpp" 47 #include "libtorrent/aux_/socket_type.hpp" 48 #include "libtorrent/hasher.hpp" 49 #include "libtorrent/assert.hpp" 50 #include "libtorrent/broadcast_socket.hpp" 51 #include "libtorrent/torrent.hpp" 52 #include "libtorrent/peer_info.hpp" 53 #include "libtorrent/bt_peer_connection.hpp" 54 #include "libtorrent/error.hpp" 55 #include "libtorrent/aux_/alloca.hpp" 56 #include "libtorrent/disk_interface.hpp" 57 #include "libtorrent/bandwidth_manager.hpp" 58 #include "libtorrent/request_blocks.hpp" // for request_a_block 59 #include "libtorrent/performance_counters.hpp" // for counters 60 #include "libtorrent/alert_manager.hpp" // for alert_manager 61 #include "libtorrent/ip_filter.hpp" 62 #include "libtorrent/ip_voter.hpp" 63 #include "libtorrent/kademlia/node_id.hpp" 64 #include "libtorrent/close_reason.hpp" 65 #include "libtorrent/aux_/has_block.hpp" 66 #include "libtorrent/aux_/time.hpp" 67 #include "libtorrent/buffer.hpp" 68 #include "libtorrent/aux_/array.hpp" 69 #include "libtorrent/aux_/set_socket_buffer.hpp" 70 71 #if TORRENT_USE_ASSERTS 72 #include <set> 73 #endif 74 75 #ifdef TORRENT_USE_OPENSSL 76 #include <openssl/rand.h> 77 #endif 78 79 #ifndef TORRENT_DISABLE_LOGGING 80 #include <cstdarg> // for va_start, va_end 81 #include <cstdio> // for vsnprintf 82 #include "libtorrent/socket_io.hpp" 83 #include "libtorrent/hex.hpp" // to_hex 84 #endif 85 86 #include "libtorrent/aux_/torrent_impl.hpp" 87 88 //#define TORRENT_CORRUPT_DATA 89 90 using namespace std::placeholders; 91 92 namespace libtorrent { 93 94 constexpr request_flags_t peer_connection::time_critical; 95 constexpr request_flags_t peer_connection::busy; 96 97 namespace { 98 99 // the limits of the download queue size 100 constexpr int min_request_queue = 2; 101 pending_block_in_buffer(pending_block const & pb)102 bool pending_block_in_buffer(pending_block const& pb) 103 { 104 return pb.send_buffer_offset != pending_block::not_in_buffer; 105 } 106 107 } 108 109 constexpr piece_index_t piece_block_progress::invalid_index; 110 111 constexpr disconnect_severity_t peer_connection_interface::normal; 112 constexpr disconnect_severity_t peer_connection_interface::failure; 113 constexpr disconnect_severity_t peer_connection_interface::peer_error; 114 115 #if TORRENT_USE_ASSERTS is_single_thread() const116 bool peer_connection::is_single_thread() const 117 { 118 #ifdef TORRENT_USE_INVARIANT_CHECKS 119 std::shared_ptr<torrent> t = m_torrent.lock(); 120 if (!t) return true; 121 return t->is_single_thread(); 122 #else 123 return true; 124 #endif 125 } 126 #endif 127 peer_connection(peer_connection_args const & pack)128 peer_connection::peer_connection(peer_connection_args const& pack) 129 : peer_connection_hot_members(pack.tor, *pack.ses, *pack.sett) 130 , m_socket(pack.s) 131 , m_peer_info(pack.peerinfo) 132 , m_counters(*pack.stats_counters) 133 , m_num_pieces(0) 134 , m_max_out_request_queue(m_settings.get_int(settings_pack::max_out_request_queue)) 135 , m_remote(pack.endp) 136 , m_disk_thread(*pack.disk_thread) 137 , m_ios(*pack.ios) 138 , m_work(m_ios) 139 , m_outstanding_piece_verification(0) 140 , m_outgoing(!pack.tor.expired()) 141 , m_received_listen_port(false) 142 , m_fast_reconnect(false) 143 , m_failed(false) 144 , m_connected(pack.tor.expired()) 145 , m_request_large_blocks(false) 146 #ifndef TORRENT_DISABLE_SHARE_MODE 147 , m_share_mode(false) 148 #endif 149 , m_upload_only(false) 150 , m_bitfield_received(false) 151 , m_no_download(false) 152 , m_holepunch_mode(false) 153 , m_peer_choked(true) 154 , m_have_all(false) 155 , m_peer_interested(false) 156 , m_need_interest_update(false) 157 , m_has_metadata(true) 158 , m_exceeded_limit(false) 159 , m_slow_start(true) 160 { 161 m_counters.inc_stats_counter(counters::num_tcp_peers + m_socket->type() - 1); 162 std::shared_ptr<torrent> t = m_torrent.lock(); 163 164 if (m_connected) 165 m_counters.inc_stats_counter(counters::num_peers_connected); 166 else if (m_connecting) 167 m_counters.inc_stats_counter(counters::num_peers_half_open); 168 169 // if t is nullptr, we better not be connecting, since 170 // we can't decrement the connecting counter 171 TORRENT_ASSERT(t || !m_connecting); 172 #if TORRENT_ABI_VERSION == 1 173 m_est_reciprocation_rate = m_settings.get_int(settings_pack::default_est_reciprocation_rate); 174 #endif 175 176 m_channel_state[upload_channel] = peer_info::bw_idle; 177 m_channel_state[download_channel] = peer_info::bw_idle; 178 179 m_quota[0] = 0; 180 m_quota[1] = 0; 181 182 TORRENT_ASSERT(pack.peerinfo == nullptr || pack.peerinfo->banned == false); 183 #ifndef TORRENT_DISABLE_LOGGING 184 if (should_log(m_outgoing ? peer_log_alert::outgoing : peer_log_alert::incoming)) 185 { 186 error_code ec; 187 TORRENT_ASSERT(m_socket->remote_endpoint(ec) == m_remote || ec); 188 tcp::endpoint local_ep = m_socket->local_endpoint(ec); 189 190 peer_log(m_outgoing ? peer_log_alert::outgoing : peer_log_alert::incoming 191 , m_outgoing ? "OUTGOING_CONNECTION" : "INCOMING_CONNECTION" 192 , "ep: %s type: %s seed: %d p: %p local: %s" 193 , print_endpoint(m_remote).c_str() 194 , m_socket->type_name() 195 , m_peer_info ? m_peer_info->seed : 0 196 , static_cast<void*>(m_peer_info) 197 , print_endpoint(local_ep).c_str()); 198 } 199 #endif 200 201 // this counter should not be incremented until we know constructing this 202 // peer object can't fail anymore 203 if (m_connecting && t) t->inc_num_connecting(m_peer_info); 204 205 #if TORRENT_USE_ASSERTS 206 piece_failed = false; 207 m_in_constructor = false; 208 #endif 209 } 210 211 template <typename Fun, typename... Args> wrap(Fun f,Args &&...a)212 void peer_connection::wrap(Fun f, Args&&... a) 213 #ifndef BOOST_NO_EXCEPTIONS 214 try 215 #endif 216 { 217 (this->*f)(std::forward<Args>(a)...); 218 } 219 #ifndef BOOST_NO_EXCEPTIONS 220 catch (std::bad_alloc const&) { 221 #ifndef TORRENT_DISABLE_LOGGING 222 peer_log(peer_log_alert::info, "EXCEPTION", "bad_alloc"); 223 #endif 224 disconnect(make_error_code(boost::system::errc::not_enough_memory) 225 , operation_t::unknown); 226 } 227 catch (system_error const& e) { 228 #ifndef TORRENT_DISABLE_LOGGING 229 peer_log(peer_log_alert::info, "EXCEPTION", "(%d %s) %s" 230 , e.code().value() 231 , e.code().message().c_str() 232 , e.what()); 233 #endif 234 disconnect(e.code(), operation_t::unknown); 235 } 236 catch (std::exception const& e) { 237 TORRENT_UNUSED(e); 238 #ifndef TORRENT_DISABLE_LOGGING 239 peer_log(peer_log_alert::info, "EXCEPTION", "%s", e.what()); 240 #endif 241 disconnect(make_error_code(boost::system::errc::not_enough_memory) 242 , operation_t::sock_write); 243 } 244 #endif // BOOST_NO_EXCEPTIONS 245 timeout() const246 int peer_connection::timeout() const 247 { 248 TORRENT_ASSERT(is_single_thread()); 249 int ret = m_settings.get_int(settings_pack::peer_timeout); 250 #if TORRENT_USE_I2P 251 if (m_peer_info && m_peer_info->is_i2p_addr) 252 { 253 // quadruple the timeout for i2p peers 254 ret *= 4; 255 } 256 #endif 257 return ret; 258 } 259 on_exception(std::exception const & e)260 void peer_connection::on_exception(std::exception const& e) 261 { 262 TORRENT_UNUSED(e); 263 #ifndef TORRENT_DISABLE_LOGGING 264 peer_log(peer_log_alert::info, "PEER_ERROR", "ERROR: %s" 265 , e.what()); 266 #endif 267 disconnect(error_code(), operation_t::unknown, peer_error); 268 } 269 on_error(error_code const & ec)270 void peer_connection::on_error(error_code const& ec) 271 { 272 disconnect(ec, operation_t::unknown, peer_error); 273 } 274 275 #if TORRENT_ABI_VERSION == 1 increase_est_reciprocation_rate()276 void peer_connection::increase_est_reciprocation_rate() 277 { 278 TORRENT_ASSERT(is_single_thread()); 279 m_est_reciprocation_rate += m_est_reciprocation_rate 280 * m_settings.get_int(settings_pack::increase_est_reciprocation_rate) / 100; 281 } 282 decrease_est_reciprocation_rate()283 void peer_connection::decrease_est_reciprocation_rate() 284 { 285 TORRENT_ASSERT(is_single_thread()); 286 m_est_reciprocation_rate -= m_est_reciprocation_rate 287 * m_settings.get_int(settings_pack::decrease_est_reciprocation_rate) / 100; 288 } 289 #endif 290 get_priority(int const channel) const291 int peer_connection::get_priority(int const channel) const 292 { 293 TORRENT_ASSERT(is_single_thread()); 294 TORRENT_ASSERT(channel >= 0 && channel < 2); 295 int prio = 1; 296 for (int i = 0; i < num_classes(); ++i) 297 { 298 int class_prio = m_ses.peer_classes().at(class_at(i))->priority[channel]; 299 if (prio < class_prio) prio = class_prio; 300 } 301 302 std::shared_ptr<torrent> t = associated_torrent().lock(); 303 304 if (t) 305 { 306 for (int i = 0; i < t->num_classes(); ++i) 307 { 308 int class_prio = m_ses.peer_classes().at(t->class_at(i))->priority[channel]; 309 if (prio < class_prio) prio = class_prio; 310 } 311 } 312 return prio; 313 } 314 reset_choke_counters()315 void peer_connection::reset_choke_counters() 316 { 317 TORRENT_ASSERT(is_single_thread()); 318 m_downloaded_at_last_round= m_statistics.total_payload_download(); 319 m_uploaded_at_last_round = m_statistics.total_payload_upload(); 320 } 321 start()322 void peer_connection::start() 323 { 324 TORRENT_ASSERT(is_single_thread()); 325 TORRENT_ASSERT(m_peer_info == nullptr || m_peer_info->connection == this); 326 std::shared_ptr<torrent> t = m_torrent.lock(); 327 328 if (!m_outgoing) 329 { 330 error_code ec; 331 m_socket->non_blocking(true, ec); 332 if (ec) 333 { 334 disconnect(ec, operation_t::iocontrol); 335 return; 336 } 337 m_remote = m_socket->remote_endpoint(ec); 338 if (ec) 339 { 340 disconnect(ec, operation_t::getpeername); 341 return; 342 } 343 m_local = m_socket->local_endpoint(ec); 344 if (ec) 345 { 346 disconnect(ec, operation_t::getname); 347 return; 348 } 349 if (is_v4(m_remote) && m_settings.get_int(settings_pack::peer_tos) != 0) 350 { 351 m_socket->set_option(type_of_service(char(m_settings.get_int(settings_pack::peer_tos))), ec); 352 #ifndef TORRENT_DISABLE_LOGGING 353 if (should_log(peer_log_alert::outgoing)) 354 { 355 peer_log(peer_log_alert::outgoing, "SET_TOS", "tos: %d e: %s" 356 , m_settings.get_int(settings_pack::peer_tos), ec.message().c_str()); 357 } 358 #endif 359 } 360 #if defined IPV6_TCLASS 361 else if (is_v6(m_remote) && m_settings.get_int(settings_pack::peer_tos) != 0) 362 { 363 m_socket->set_option(traffic_class(char(m_settings.get_int(settings_pack::peer_tos))), ec); 364 } 365 #endif 366 } 367 368 #ifndef TORRENT_DISABLE_LOGGING 369 if (should_log(peer_log_alert::info)) 370 { 371 peer_log(peer_log_alert::info, "SET_PEER_CLASS", "a: %s" 372 , print_address(m_remote.address()).c_str()); 373 } 374 #endif 375 376 m_ses.set_peer_classes(this, m_remote.address(), m_socket->type()); 377 378 #ifndef TORRENT_DISABLE_LOGGING 379 if (should_log(peer_log_alert::info)) 380 { 381 std::string classes; 382 for (int i = 0; i < num_classes(); ++i) 383 { 384 classes += m_ses.peer_classes().at(class_at(i))->label; 385 classes += ' '; 386 } 387 peer_log(peer_log_alert::info, "CLASS", "%s" 388 , classes.c_str()); 389 } 390 #endif 391 392 if (t && t->ready_for_connections()) 393 { 394 init(); 395 } 396 397 // if this is an incoming connection, we're done here 398 if (!m_connecting) 399 { 400 error_code err; 401 aux::set_socket_buffer_size(*m_socket, m_settings, err); 402 #ifndef TORRENT_DISABLE_LOGGING 403 if (err && should_log(peer_log_alert::incoming)) 404 { 405 peer_log(peer_log_alert::incoming, "SOCKET_BUFFER", "%s %s" 406 , print_endpoint(m_remote).c_str() 407 , print_error(err).c_str()); 408 } 409 #endif 410 411 return; 412 } 413 414 #ifndef TORRENT_DISABLE_LOGGING 415 if (should_log(peer_log_alert::outgoing)) 416 { 417 peer_log(peer_log_alert::outgoing, "OPEN", "protocol: %s" 418 , (is_v4(m_remote) ? "IPv4" : "IPv6")); 419 } 420 #endif 421 error_code ec; 422 m_socket->open(m_remote.protocol(), ec); 423 if (ec) 424 { 425 disconnect(ec, operation_t::sock_open); 426 return; 427 } 428 429 tcp::endpoint const bound_ip = m_ses.bind_outgoing_socket(*m_socket 430 , m_remote.address(), ec); 431 #ifndef TORRENT_DISABLE_LOGGING 432 if (should_log(peer_log_alert::outgoing)) 433 { 434 peer_log(peer_log_alert::outgoing, "BIND", "dst: %s ec: %s" 435 , print_endpoint(bound_ip).c_str() 436 , ec.message().c_str()); 437 } 438 #else 439 TORRENT_UNUSED(bound_ip); 440 #endif 441 if (ec) 442 { 443 disconnect(ec, operation_t::sock_bind); 444 return; 445 } 446 447 { 448 error_code err; 449 aux::set_socket_buffer_size(*m_socket, m_settings, err); 450 #ifndef TORRENT_DISABLE_LOGGING 451 if (err && should_log(peer_log_alert::outgoing)) 452 { 453 peer_log(peer_log_alert::outgoing, "SOCKET_BUFFER", "%s %s" 454 , print_endpoint(m_remote).c_str() 455 , print_error(err).c_str()); 456 } 457 #endif 458 } 459 460 #ifndef TORRENT_DISABLE_LOGGING 461 if (should_log(peer_log_alert::outgoing)) 462 { 463 peer_log(peer_log_alert::outgoing, "ASYNC_CONNECT", "dst: %s" 464 , print_endpoint(m_remote).c_str()); 465 } 466 #endif 467 ADD_OUTSTANDING_ASYNC("peer_connection::on_connection_complete"); 468 469 auto conn = self(); 470 m_socket->async_connect(m_remote 471 , [conn](error_code const& e) { conn->wrap(&peer_connection::on_connection_complete, e); }); 472 m_connect = aux::time_now(); 473 474 sent_syn(is_v6(m_remote)); 475 476 if (t && t->alerts().should_post<peer_connect_alert>()) 477 { 478 t->alerts().emplace_alert<peer_connect_alert>( 479 t->get_handle(), remote(), pid(), m_socket->type()); 480 } 481 #ifndef TORRENT_DISABLE_LOGGING 482 if (should_log(peer_log_alert::info)) 483 { 484 peer_log(peer_log_alert::info, "LOCAL ENDPOINT", "e: %s" 485 , print_endpoint(m_socket->local_endpoint(ec)).c_str()); 486 } 487 #endif 488 } 489 update_interest()490 void peer_connection::update_interest() 491 { 492 TORRENT_ASSERT(is_single_thread()); 493 if (!m_need_interest_update) 494 { 495 // we're the first to request an interest update 496 // post a message in order to delay it enough for 497 // any potential other messages already in the queue 498 // to not trigger another one. This effectively defer 499 // the update until the current message queue is 500 // flushed 501 auto conn = self(); 502 m_ios.post([conn] { conn->wrap(&peer_connection::do_update_interest); }); 503 } 504 m_need_interest_update = true; 505 } 506 do_update_interest()507 void peer_connection::do_update_interest() 508 { 509 TORRENT_ASSERT(is_single_thread()); 510 TORRENT_ASSERT(m_need_interest_update); 511 m_need_interest_update = false; 512 513 std::shared_ptr<torrent> t = m_torrent.lock(); 514 if (!t) return; 515 516 // if m_have_piece is 0, it means the connections 517 // have not been initialized yet. The interested 518 // flag will be updated once they are. 519 if (m_have_piece.empty()) 520 { 521 #ifndef TORRENT_DISABLE_LOGGING 522 peer_log(peer_log_alert::info, "UPDATE_INTEREST", "connections not initialized"); 523 #endif 524 return; 525 } 526 if (!t->ready_for_connections()) 527 { 528 #ifndef TORRENT_DISABLE_LOGGING 529 peer_log(peer_log_alert::info, "UPDATE_INTEREST", "not ready for connections"); 530 #endif 531 return; 532 } 533 534 bool interested = false; 535 if (!t->is_upload_only()) 536 { 537 t->need_picker(); 538 piece_picker const& p = t->picker(); 539 piece_index_t const end_piece(p.num_pieces()); 540 for (piece_index_t j(0); j != end_piece; ++j) 541 { 542 if (m_have_piece[j] 543 && t->piece_priority(j) > dont_download 544 && !p.has_piece_passed(j)) 545 { 546 interested = true; 547 #ifndef TORRENT_DISABLE_LOGGING 548 peer_log(peer_log_alert::info, "UPDATE_INTEREST", "interesting, piece: %d" 549 , static_cast<int>(j)); 550 #endif 551 break; 552 } 553 } 554 } 555 556 #ifndef TORRENT_DISABLE_LOGGING 557 if (!interested) 558 peer_log(peer_log_alert::info, "UPDATE_INTEREST", "not interesting"); 559 #endif 560 561 if (!interested) send_not_interested(); 562 else t->peer_is_interesting(*this); 563 564 TORRENT_ASSERT(in_handshake() || is_interesting() == interested); 565 566 disconnect_if_redundant(); 567 } 568 569 #ifndef TORRENT_DISABLE_LOGGING should_log(peer_log_alert::direction_t) const570 bool peer_connection::should_log(peer_log_alert::direction_t) const 571 { 572 return m_ses.alerts().should_post<peer_log_alert>(); 573 } 574 peer_log(peer_log_alert::direction_t direction,char const * event) const575 void peer_connection::peer_log(peer_log_alert::direction_t direction 576 , char const* event) const noexcept 577 { 578 peer_log(direction, event, ""); 579 } 580 581 TORRENT_FORMAT(4,5) peer_log(peer_log_alert::direction_t direction,char const * event,char const * fmt,...) const582 void peer_connection::peer_log(peer_log_alert::direction_t direction 583 , char const* event, char const* fmt, ...) const noexcept try 584 { 585 TORRENT_ASSERT(is_single_thread()); 586 587 if (!m_ses.alerts().should_post<peer_log_alert>()) return; 588 589 va_list v; 590 va_start(v, fmt); 591 592 torrent_handle h; 593 std::shared_ptr<torrent> t = m_torrent.lock(); 594 if (t) h = t->get_handle(); 595 596 m_ses.alerts().emplace_alert<peer_log_alert>( 597 h, m_remote, m_peer_id, direction, event, fmt, v); 598 599 va_end(v); 600 601 } 602 catch (std::exception const&) {} 603 #endif 604 605 #ifndef TORRENT_DISABLE_EXTENSIONS add_extension(std::shared_ptr<peer_plugin> ext)606 void peer_connection::add_extension(std::shared_ptr<peer_plugin> ext) 607 { 608 TORRENT_ASSERT(is_single_thread()); 609 m_extensions.push_back(ext); 610 } 611 find_plugin(string_view type)612 peer_plugin const* peer_connection::find_plugin(string_view type) 613 { 614 TORRENT_ASSERT(is_single_thread()); 615 auto p = std::find_if(m_extensions.begin(), m_extensions.end() 616 , [&](std::shared_ptr<peer_plugin> const& e) { return e->type() == type; }); 617 return p != m_extensions.end() ? p->get() : nullptr; 618 } 619 #endif 620 send_allowed_set()621 void peer_connection::send_allowed_set() 622 { 623 TORRENT_ASSERT(is_single_thread()); 624 INVARIANT_CHECK; 625 626 std::shared_ptr<torrent> t = m_torrent.lock(); 627 TORRENT_ASSERT(t); 628 629 if (!t->valid_metadata()) 630 { 631 #ifndef TORRENT_DISABLE_LOGGING 632 peer_log(peer_log_alert::info, "ALLOWED", "skipping allowed set because we don't have metadata"); 633 #endif 634 return; 635 } 636 637 #ifndef TORRENT_DISABLE_SUPERSEEDING 638 if (t->super_seeding()) 639 { 640 #ifndef TORRENT_DISABLE_LOGGING 641 peer_log(peer_log_alert::info, "ALLOWED", "skipping allowed set because of super seeding"); 642 #endif 643 return; 644 } 645 #endif 646 647 if (upload_only()) 648 { 649 #ifndef TORRENT_DISABLE_LOGGING 650 peer_log(peer_log_alert::info, "ALLOWED", "skipping allowed set because peer is upload only"); 651 #endif 652 return; 653 } 654 655 int const num_allowed_pieces = m_settings.get_int(settings_pack::allowed_fast_set_size); 656 if (num_allowed_pieces <= 0) return; 657 658 if (!t->valid_metadata()) return; 659 660 int const num_pieces = t->torrent_file().num_pieces(); 661 662 if (num_allowed_pieces >= num_pieces) 663 { 664 // this is a special case where we have more allowed 665 // fast pieces than pieces in the torrent. Just send 666 // an allowed fast message for every single piece 667 for (auto const i : t->torrent_file().piece_range()) 668 { 669 // there's no point in offering fast pieces 670 // that the peer already has 671 if (has_piece(i)) continue; 672 673 write_allow_fast(i); 674 TORRENT_ASSERT(std::find(m_accept_fast.begin() 675 , m_accept_fast.end(), i) 676 == m_accept_fast.end()); 677 if (m_accept_fast.empty()) 678 { 679 m_accept_fast.reserve(10); 680 m_accept_fast_piece_cnt.reserve(10); 681 } 682 m_accept_fast.push_back(i); 683 m_accept_fast_piece_cnt.push_back(0); 684 } 685 return; 686 } 687 688 std::string x; 689 address const& addr = m_remote.address(); 690 if (addr.is_v4()) 691 { 692 address_v4::bytes_type bytes = addr.to_v4().to_bytes(); 693 x.assign(reinterpret_cast<char*>(bytes.data()), bytes.size()); 694 } 695 else 696 { 697 address_v6::bytes_type bytes = addr.to_v6().to_bytes(); 698 x.assign(reinterpret_cast<char*>(bytes.data()), bytes.size()); 699 } 700 x.append(t->torrent_file().info_hash().data(), 20); 701 702 sha1_hash hash = hasher(x).final(); 703 int attempts = 0; 704 int loops = 0; 705 for (;;) 706 { 707 char const* p = hash.data(); 708 for (int i = 0; i < int(hash.size() / sizeof(std::uint32_t)); ++i) 709 { 710 ++loops; 711 TORRENT_ASSERT(num_pieces > 0); 712 piece_index_t const piece(int(detail::read_uint32(p) % std::uint32_t(num_pieces))); 713 if (std::find(m_accept_fast.begin(), m_accept_fast.end(), piece) 714 != m_accept_fast.end()) 715 { 716 // this is our safety-net to make sure this loop terminates, even 717 // under the worst conditions 718 if (++loops > 500) return; 719 continue; 720 } 721 722 if (!has_piece(piece)) 723 { 724 write_allow_fast(piece); 725 if (m_accept_fast.empty()) 726 { 727 m_accept_fast.reserve(10); 728 m_accept_fast_piece_cnt.reserve(10); 729 } 730 m_accept_fast.push_back(piece); 731 m_accept_fast_piece_cnt.push_back(0); 732 } 733 if (++attempts >= num_allowed_pieces) return; 734 } 735 hash = hasher(hash).final(); 736 } 737 } 738 on_metadata_impl()739 void peer_connection::on_metadata_impl() 740 { 741 TORRENT_ASSERT(is_single_thread()); 742 std::shared_ptr<torrent> t = associated_torrent().lock(); 743 m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all); 744 m_num_pieces = m_have_piece.count(); 745 746 piece_index_t const limit(m_num_pieces); 747 748 // now that we know how many pieces there are 749 // remove any invalid allowed_fast and suggest pieces 750 // now that we know what the number of pieces are 751 m_allowed_fast.erase(std::remove_if(m_allowed_fast.begin(), m_allowed_fast.end() 752 , [=](piece_index_t const p) { return p >= limit; }) 753 , m_allowed_fast.end()); 754 755 // remove any piece suggested to us whose index is invalid 756 // now that we know how many pieces there are 757 m_suggested_pieces.erase( 758 std::remove_if(m_suggested_pieces.begin(), m_suggested_pieces.end() 759 , [=](piece_index_t const p) { return p >= limit; }) 760 , m_suggested_pieces.end()); 761 762 on_metadata(); 763 if (m_disconnecting) return; 764 } 765 init()766 void peer_connection::init() 767 { 768 TORRENT_ASSERT(is_single_thread()); 769 INVARIANT_CHECK; 770 771 std::shared_ptr<torrent> t = m_torrent.lock(); 772 TORRENT_ASSERT(t); 773 TORRENT_ASSERT(t->valid_metadata()); 774 TORRENT_ASSERT(t->ready_for_connections()); 775 776 m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all); 777 778 if (m_have_all) 779 { 780 m_num_pieces = t->torrent_file().num_pieces(); 781 m_have_piece.set_all(); 782 } 783 784 #if TORRENT_USE_ASSERTS 785 TORRENT_ASSERT(!m_initialized); 786 m_initialized = true; 787 #endif 788 // now that we have a piece_picker, 789 // update it with this peer's pieces 790 791 TORRENT_ASSERT(m_num_pieces == m_have_piece.count()); 792 793 if (m_num_pieces == m_have_piece.size()) 794 { 795 #ifndef TORRENT_DISABLE_LOGGING 796 peer_log(peer_log_alert::info, "INIT", "this is a seed p: %p" 797 , static_cast<void*>(m_peer_info)); 798 #endif 799 800 TORRENT_ASSERT(m_have_piece.all_set()); 801 TORRENT_ASSERT(m_have_piece.count() == m_have_piece.size()); 802 TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces()); 803 804 // if this is a web seed. we don't have a peer_info struct 805 t->set_seed(m_peer_info, true); 806 TORRENT_ASSERT(is_seed()); 807 m_upload_only = true; 808 809 t->peer_has_all(this); 810 811 #if TORRENT_USE_INVARIANT_CHECKS 812 if (t && t->has_picker()) 813 t->picker().check_peer_invariant(m_have_piece, peer_info_struct()); 814 #endif 815 if (t->is_upload_only()) send_not_interested(); 816 else t->peer_is_interesting(*this); 817 disconnect_if_redundant(); 818 return; 819 } 820 821 TORRENT_ASSERT(!is_seed()); 822 823 // if we're a seed, we don't keep track of piece availability 824 if (t->has_picker()) 825 { 826 TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces()); 827 t->peer_has(m_have_piece, this); 828 bool interesting = false; 829 for (auto const i : m_have_piece.range()) 830 { 831 if (!m_have_piece[i]) continue; 832 // if the peer has a piece and we don't, the peer is interesting 833 if (!t->have_piece(i) 834 && t->picker().piece_priority(i) != dont_download) 835 interesting = true; 836 } 837 if (interesting) t->peer_is_interesting(*this); 838 else send_not_interested(); 839 } 840 else 841 { 842 update_interest(); 843 } 844 } 845 ~peer_connection()846 peer_connection::~peer_connection() 847 { 848 m_counters.inc_stats_counter(counters::num_tcp_peers + m_socket->type() - 1, -1); 849 850 // INVARIANT_CHECK; 851 TORRENT_ASSERT(!m_in_constructor); 852 TORRENT_ASSERT(!m_destructed); 853 #if TORRENT_USE_ASSERTS 854 m_destructed = true; 855 #endif 856 857 #if TORRENT_USE_ASSERTS 858 m_in_use = 0; 859 #endif 860 861 // decrement the stats counter 862 set_endgame(false); 863 864 if (m_interesting) 865 m_counters.inc_stats_counter(counters::num_peers_down_interested, -1); 866 if (m_peer_interested) 867 m_counters.inc_stats_counter(counters::num_peers_up_interested, -1); 868 if (!m_choked) 869 { 870 m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1); 871 if (!ignore_unchoke_slots()) 872 m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1); 873 } 874 if (!m_peer_choked) 875 m_counters.inc_stats_counter(counters::num_peers_down_unchoked, -1); 876 if (m_connected) 877 m_counters.inc_stats_counter(counters::num_peers_connected, -1); 878 m_connected = false; 879 if (!m_download_queue.empty()) 880 m_counters.inc_stats_counter(counters::num_peers_down_requests, -1); 881 882 // defensive 883 std::shared_ptr<torrent> t = m_torrent.lock(); 884 // if t is nullptr, we better not be connecting, since 885 // we can't decrement the connecting counter 886 TORRENT_ASSERT(t || !m_connecting); 887 888 // we should really have dealt with this already 889 if (m_connecting) 890 { 891 m_counters.inc_stats_counter(counters::num_peers_half_open, -1); 892 if (t) t->dec_num_connecting(m_peer_info); 893 m_connecting = false; 894 } 895 896 #ifndef TORRENT_DISABLE_EXTENSIONS 897 m_extensions.clear(); 898 #endif 899 900 #ifndef TORRENT_DISABLE_LOGGING 901 peer_log(peer_log_alert::info, "CONNECTION CLOSED"); 902 #endif 903 TORRENT_ASSERT(m_request_queue.empty()); 904 TORRENT_ASSERT(m_download_queue.empty()); 905 } 906 on_parole() const907 bool peer_connection::on_parole() const 908 { return peer_info_struct() && peer_info_struct()->on_parole; } 909 picker_options() const910 picker_options_t peer_connection::picker_options() const 911 { 912 TORRENT_ASSERT(is_single_thread()); 913 picker_options_t ret = m_picker_options; 914 915 std::shared_ptr<torrent> t = m_torrent.lock(); 916 TORRENT_ASSERT(t); 917 if (!t) return {}; 918 919 if (t->num_time_critical_pieces() > 0) 920 { 921 ret |= piece_picker::time_critical_mode; 922 } 923 924 if (t->is_sequential_download()) 925 { 926 ret |= piece_picker::sequential; 927 } 928 else if (t->num_have() < m_settings.get_int(settings_pack::initial_picker_threshold)) 929 { 930 // if we have fewer pieces than a certain threshold 931 // don't pick rare pieces, just pick random ones, 932 // and prioritize finishing them 933 ret |= piece_picker::prioritize_partials; 934 } 935 else 936 { 937 ret |= piece_picker::rarest_first; 938 939 if (m_snubbed) 940 { 941 // snubbed peers should request 942 // the common pieces first, just to make 943 // it more likely for all snubbed peers to 944 // request blocks from the same piece 945 ret |= piece_picker::reverse; 946 } 947 else 948 { 949 if (m_settings.get_bool(settings_pack::piece_extent_affinity) 950 && t->num_time_critical_pieces() == 0) 951 ret |= piece_picker::piece_extent_affinity; 952 } 953 } 954 955 if (m_settings.get_bool(settings_pack::prioritize_partial_pieces)) 956 ret |= piece_picker::prioritize_partials; 957 958 if (on_parole()) ret |= piece_picker::on_parole 959 | piece_picker::prioritize_partials; 960 961 // only one of rarest_first and sequential can be set. i.e. the sum of 962 // whether the bit is set or not may only be 0 or 1 (never 2) 963 TORRENT_ASSERT(((ret & piece_picker::rarest_first) ? 1 : 0) 964 + ((ret & piece_picker::sequential) ? 1 : 0) <= 1); 965 return ret; 966 } 967 fast_reconnect(bool r)968 void peer_connection::fast_reconnect(bool r) 969 { 970 TORRENT_ASSERT(is_single_thread()); 971 if (!peer_info_struct() || peer_info_struct()->fast_reconnects > 1) 972 return; 973 m_fast_reconnect = r; 974 peer_info_struct()->last_connected = std::uint16_t(m_ses.session_time()); 975 int const rewind = m_settings.get_int(settings_pack::min_reconnect_time) 976 * m_settings.get_int(settings_pack::max_failcount); 977 if (int(peer_info_struct()->last_connected) < rewind) peer_info_struct()->last_connected = 0; 978 else peer_info_struct()->last_connected -= std::uint16_t(rewind); 979 980 if (peer_info_struct()->fast_reconnects < 15) 981 ++peer_info_struct()->fast_reconnects; 982 } 983 received_piece(piece_index_t const index)984 void peer_connection::received_piece(piece_index_t const index) 985 { 986 TORRENT_ASSERT(is_single_thread()); 987 // dont announce during handshake 988 if (in_handshake()) return; 989 990 #ifndef TORRENT_DISABLE_LOGGING 991 peer_log(peer_log_alert::incoming, "RECEIVED", "piece: %d" 992 , static_cast<int>(index)); 993 #endif 994 995 // remove suggested pieces once we have them 996 auto i = std::find(m_suggested_pieces.begin(), m_suggested_pieces.end(), index); 997 if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i); 998 999 // remove allowed fast pieces 1000 i = std::find(m_allowed_fast.begin(), m_allowed_fast.end(), index); 1001 if (i != m_allowed_fast.end()) m_allowed_fast.erase(i); 1002 1003 if (has_piece(index)) 1004 { 1005 // if we got a piece that this peer has 1006 // it might have been the last interesting 1007 // piece this peer had. We might not be 1008 // interested anymore 1009 update_interest(); 1010 if (is_disconnecting()) return; 1011 } 1012 1013 if (disconnect_if_redundant()) return; 1014 1015 #if TORRENT_USE_ASSERTS 1016 std::shared_ptr<torrent> t = m_torrent.lock(); 1017 TORRENT_ASSERT(t); 1018 #endif 1019 } 1020 announce_piece(piece_index_t const index)1021 void peer_connection::announce_piece(piece_index_t const index) 1022 { 1023 TORRENT_ASSERT(is_single_thread()); 1024 // dont announce during handshake 1025 if (in_handshake()) return; 1026 1027 // optimization, don't send have messages 1028 // to peers that already have the piece 1029 if (!m_settings.get_bool(settings_pack::send_redundant_have) 1030 && has_piece(index)) 1031 { 1032 #ifndef TORRENT_DISABLE_LOGGING 1033 peer_log(peer_log_alert::outgoing_message, "HAVE", "piece: %d SUPPRESSED" 1034 , static_cast<int>(index)); 1035 #endif 1036 return; 1037 } 1038 1039 if (disconnect_if_redundant()) return; 1040 1041 #ifndef TORRENT_DISABLE_LOGGING 1042 peer_log(peer_log_alert::outgoing_message, "HAVE", "piece: %d" 1043 , static_cast<int>(index)); 1044 #endif 1045 write_have(index); 1046 #if TORRENT_USE_ASSERTS 1047 std::shared_ptr<torrent> t = m_torrent.lock(); 1048 TORRENT_ASSERT(t); 1049 #endif 1050 } 1051 has_piece(piece_index_t const i) const1052 bool peer_connection::has_piece(piece_index_t const i) const 1053 { 1054 TORRENT_ASSERT(is_single_thread()); 1055 #if TORRENT_USE_ASSERTS 1056 std::shared_ptr<torrent> t = m_torrent.lock(); 1057 TORRENT_ASSERT(t); 1058 TORRENT_ASSERT(t->valid_metadata()); 1059 TORRENT_ASSERT(i >= piece_index_t(0)); 1060 TORRENT_ASSERT(i < t->torrent_file().end_piece()); 1061 #endif 1062 if (m_have_piece.empty()) return false; 1063 return m_have_piece[i]; 1064 } 1065 request_queue() const1066 std::vector<pending_block> const& peer_connection::request_queue() const 1067 { 1068 TORRENT_ASSERT(is_single_thread()); 1069 return m_request_queue; 1070 } 1071 download_queue() const1072 std::vector<pending_block> const& peer_connection::download_queue() const 1073 { 1074 TORRENT_ASSERT(is_single_thread()); 1075 return m_download_queue; 1076 } 1077 upload_queue() const1078 std::vector<peer_request> const& peer_connection::upload_queue() const 1079 { 1080 TORRENT_ASSERT(is_single_thread()); 1081 return m_requests; 1082 } 1083 download_queue_time(int const extra_bytes) const1084 time_duration peer_connection::download_queue_time(int const extra_bytes) const 1085 { 1086 TORRENT_ASSERT(is_single_thread()); 1087 std::shared_ptr<torrent> t = m_torrent.lock(); 1088 TORRENT_ASSERT(t); 1089 1090 int rate = 0; 1091 1092 // if we haven't received any data recently, the current download rate 1093 // is not representative 1094 if (aux::time_now() - m_last_piece > seconds(30) && m_download_rate_peak > 0) 1095 { 1096 rate = m_download_rate_peak; 1097 } 1098 else if (aux::time_now() - m_last_unchoked < seconds(5) 1099 && m_statistics.total_payload_upload() < 2 * 0x4000) 1100 { 1101 // if we're have only been unchoked for a short period of time, 1102 // we don't know what rate we can get from this peer. Instead of assuming 1103 // the lowest possible rate, assume the average. 1104 1105 int peers_with_requests = int(stats_counters()[counters::num_peers_down_requests]); 1106 // avoid division by 0 1107 if (peers_with_requests == 0) peers_with_requests = 1; 1108 1109 // TODO: this should be the global download rate 1110 rate = t->statistics().transfer_rate(stat::download_payload) / peers_with_requests; 1111 } 1112 else 1113 { 1114 // current download rate in bytes per seconds 1115 rate = m_statistics.transfer_rate(stat::download_payload); 1116 } 1117 1118 // avoid division by zero 1119 if (rate < 50) rate = 50; 1120 1121 // average of current rate and peak 1122 // rate = (rate + m_download_rate_peak) / 2; 1123 1124 return milliseconds((m_outstanding_bytes + extra_bytes 1125 + m_queued_time_critical * t->block_size() * 1000) / rate); 1126 } 1127 add_stat(std::int64_t const downloaded,std::int64_t const uploaded)1128 void peer_connection::add_stat(std::int64_t const downloaded, std::int64_t const uploaded) 1129 { 1130 TORRENT_ASSERT(is_single_thread()); 1131 m_statistics.add_stat(downloaded, uploaded); 1132 } 1133 received_bytes(int const bytes_payload,int const bytes_protocol)1134 void peer_connection::received_bytes(int const bytes_payload, int const bytes_protocol) 1135 { 1136 TORRENT_ASSERT(is_single_thread()); 1137 m_statistics.received_bytes(bytes_payload, bytes_protocol); 1138 if (m_ignore_stats) return; 1139 std::shared_ptr<torrent> t = m_torrent.lock(); 1140 if (!t) return; 1141 t->received_bytes(bytes_payload, bytes_protocol); 1142 } 1143 sent_bytes(int const bytes_payload,int const bytes_protocol)1144 void peer_connection::sent_bytes(int const bytes_payload, int const bytes_protocol) 1145 { 1146 TORRENT_ASSERT(is_single_thread()); 1147 m_statistics.sent_bytes(bytes_payload, bytes_protocol); 1148 #ifndef TORRENT_DISABLE_EXTENSIONS 1149 if (bytes_payload) 1150 { 1151 for (auto const& e : m_extensions) 1152 { 1153 e->sent_payload(bytes_payload); 1154 } 1155 } 1156 #endif 1157 if (bytes_payload > 0) m_last_sent_payload = clock_type::now(); 1158 if (m_ignore_stats) return; 1159 std::shared_ptr<torrent> t = m_torrent.lock(); 1160 if (!t) return; 1161 t->sent_bytes(bytes_payload, bytes_protocol); 1162 } 1163 trancieve_ip_packet(int const bytes,bool const ipv6)1164 void peer_connection::trancieve_ip_packet(int const bytes, bool const ipv6) 1165 { 1166 TORRENT_ASSERT(is_single_thread()); 1167 m_statistics.trancieve_ip_packet(bytes, ipv6); 1168 if (m_ignore_stats) return; 1169 std::shared_ptr<torrent> t = m_torrent.lock(); 1170 if (!t) return; 1171 t->trancieve_ip_packet(bytes, ipv6); 1172 } 1173 sent_syn(bool const ipv6)1174 void peer_connection::sent_syn(bool const ipv6) 1175 { 1176 TORRENT_ASSERT(is_single_thread()); 1177 m_statistics.sent_syn(ipv6); 1178 if (m_ignore_stats) return; 1179 std::shared_ptr<torrent> t = m_torrent.lock(); 1180 if (!t) return; 1181 t->sent_syn(ipv6); 1182 } 1183 received_synack(bool const ipv6)1184 void peer_connection::received_synack(bool const ipv6) 1185 { 1186 TORRENT_ASSERT(is_single_thread()); 1187 m_statistics.received_synack(ipv6); 1188 if (m_ignore_stats) return; 1189 std::shared_ptr<torrent> t = m_torrent.lock(); 1190 if (!t) return; 1191 t->received_synack(ipv6); 1192 } 1193 get_bitfield() const1194 typed_bitfield<piece_index_t> const& peer_connection::get_bitfield() const 1195 { 1196 TORRENT_ASSERT(is_single_thread()); 1197 return m_have_piece; 1198 } 1199 received_valid_data(piece_index_t const index)1200 void peer_connection::received_valid_data(piece_index_t const index) 1201 { 1202 TORRENT_ASSERT(is_single_thread()); 1203 // this fails because we haven't had time to disconnect 1204 // seeds yet, and we might have just become one 1205 // INVARIANT_CHECK; 1206 1207 #ifndef TORRENT_DISABLE_EXTENSIONS 1208 for (auto const& e : m_extensions) 1209 { 1210 e->on_piece_pass(index); 1211 } 1212 #else 1213 TORRENT_UNUSED(index); 1214 #endif 1215 } 1216 1217 // single_peer is true if the entire piece was received by a single 1218 // peer received_invalid_data(piece_index_t const index,bool single_peer)1219 bool peer_connection::received_invalid_data(piece_index_t const index, bool single_peer) 1220 { 1221 TORRENT_ASSERT(is_single_thread()); 1222 INVARIANT_CHECK; 1223 TORRENT_UNUSED(single_peer); 1224 1225 #ifndef TORRENT_DISABLE_EXTENSIONS 1226 for (auto const& e : m_extensions) 1227 { 1228 e->on_piece_failed(index); 1229 } 1230 #else 1231 TORRENT_UNUSED(index); 1232 #endif 1233 return true; 1234 } 1235 1236 // verifies a piece to see if it is valid (is within a valid range) 1237 // and if it can correspond to a request generated by libtorrent. verify_piece(peer_request const & p) const1238 bool peer_connection::verify_piece(peer_request const& p) const 1239 { 1240 TORRENT_ASSERT(is_single_thread()); 1241 std::shared_ptr<torrent> t = m_torrent.lock(); 1242 TORRENT_ASSERT(t); 1243 1244 TORRENT_ASSERT(t->valid_metadata()); 1245 torrent_info const& ti = t->torrent_file(); 1246 1247 return p.piece >= piece_index_t(0) 1248 && p.piece < ti.end_piece() 1249 && p.start >= 0 1250 && p.start < ti.piece_length() 1251 && t->to_req(piece_block(p.piece, p.start / t->block_size())) == p; 1252 } 1253 attach_to_torrent(sha1_hash const & ih)1254 void peer_connection::attach_to_torrent(sha1_hash const& ih) 1255 { 1256 TORRENT_ASSERT(is_single_thread()); 1257 INVARIANT_CHECK; 1258 1259 #ifndef TORRENT_DISABLE_LOGGING 1260 peer_log(peer_log_alert::info, "ATTACH", "attached to torrent"); 1261 #endif 1262 1263 TORRENT_ASSERT(!m_disconnecting); 1264 TORRENT_ASSERT(m_torrent.expired()); 1265 std::weak_ptr<torrent> wpt = m_ses.find_torrent(ih); 1266 std::shared_ptr<torrent> t = wpt.lock(); 1267 1268 if (t && t->is_aborted()) 1269 { 1270 #ifndef TORRENT_DISABLE_LOGGING 1271 peer_log(peer_log_alert::info, "ATTACH", "the torrent has been aborted"); 1272 #endif 1273 t.reset(); 1274 } 1275 1276 if (!t) 1277 { 1278 t = m_ses.delay_load_torrent(ih, this); 1279 #ifndef TORRENT_DISABLE_LOGGING 1280 if (t && should_log(peer_log_alert::info)) 1281 { 1282 peer_log(peer_log_alert::info, "ATTACH" 1283 , "Delay loaded torrent: %s:", aux::to_hex(ih).c_str()); 1284 } 1285 #endif 1286 } 1287 1288 if (!t) 1289 { 1290 // we couldn't find the torrent! 1291 #ifndef TORRENT_DISABLE_LOGGING 1292 if (should_log(peer_log_alert::info)) 1293 { 1294 peer_log(peer_log_alert::info, "ATTACH" 1295 , "couldn't find a torrent with the given info_hash: %s torrents:" 1296 , aux::to_hex(ih).c_str()); 1297 } 1298 #endif 1299 1300 #ifndef TORRENT_DISABLE_DHT 1301 if (dht::verify_secret_id(ih)) 1302 { 1303 // this means the hash was generated from our generate_secret_id() 1304 // as part of DHT traffic. The fact that we got an incoming 1305 // connection on this info-hash, means the other end, making this 1306 // connection fished it out of the DHT chatter. That's suspicious. 1307 m_ses.ban_ip(m_remote.address()); 1308 } 1309 #endif 1310 disconnect(errors::invalid_info_hash, operation_t::bittorrent, failure); 1311 return; 1312 } 1313 1314 if (t->is_paused() 1315 && t->is_auto_managed() 1316 && m_settings.get_bool(settings_pack::incoming_starts_queued_torrents) 1317 && !t->is_aborted()) 1318 { 1319 t->resume(); 1320 } 1321 1322 if (t->is_paused() || t->is_aborted() || t->graceful_pause()) 1323 { 1324 // paused torrents will not accept 1325 // incoming connections unless they are auto managed 1326 // and incoming_starts_queued_torrents is true 1327 // torrents that have errors should always reject 1328 // incoming peers 1329 #ifndef TORRENT_DISABLE_LOGGING 1330 peer_log(peer_log_alert::info, "ATTACH", "rejected connection to paused torrent"); 1331 #endif 1332 disconnect(errors::torrent_paused, operation_t::bittorrent, peer_error); 1333 return; 1334 } 1335 1336 #if TORRENT_USE_I2P 1337 auto* i2ps = m_socket->get<i2p_stream>(); 1338 if (!i2ps && t->torrent_file().is_i2p() 1339 && !m_settings.get_bool(settings_pack::allow_i2p_mixed)) 1340 { 1341 // the torrent is an i2p torrent, the peer is a regular peer 1342 // and we don't allow mixed mode. Disconnect the peer. 1343 #ifndef TORRENT_DISABLE_LOGGING 1344 peer_log(peer_log_alert::info, "ATTACH", "rejected regular connection to i2p torrent"); 1345 #endif 1346 disconnect(errors::peer_banned, operation_t::bittorrent, peer_error); 1347 return; 1348 } 1349 #endif // TORRENT_USE_I2P 1350 1351 TORRENT_ASSERT(m_torrent.expired()); 1352 1353 // check to make sure we don't have another connection with the same 1354 // info_hash and peer_id. If we do. close this connection. 1355 t->attach_peer(this); 1356 if (m_disconnecting) return; 1357 // it's important to assign the torrent after successfully attaching. 1358 // if the peer disconnects while attaching, it's not a proper member 1359 // of the torrent and peer_connection::disconnect() will fail if it 1360 // think it is 1361 m_torrent = t; 1362 1363 if (m_exceeded_limit) 1364 { 1365 // find a peer in some torrent (presumably the one with most peers) 1366 // and disconnect the lowest ranking peer 1367 std::weak_ptr<torrent> torr = m_ses.find_disconnect_candidate_torrent(); 1368 std::shared_ptr<torrent> other_t = torr.lock(); 1369 1370 if (other_t) 1371 { 1372 if (other_t->num_peers() <= t->num_peers()) 1373 { 1374 disconnect(errors::too_many_connections, operation_t::bittorrent); 1375 return; 1376 } 1377 // find the lowest ranking peer and disconnect that 1378 peer_connection* p = other_t->find_lowest_ranking_peer(); 1379 if (p != nullptr) 1380 { 1381 p->disconnect(errors::too_many_connections, operation_t::bittorrent); 1382 peer_disconnected_other(); 1383 } 1384 else 1385 { 1386 disconnect(errors::too_many_connections, operation_t::bittorrent); 1387 return; 1388 } 1389 } 1390 else 1391 { 1392 disconnect(errors::too_many_connections, operation_t::bittorrent); 1393 return; 1394 } 1395 } 1396 1397 TORRENT_ASSERT(!m_torrent.expired()); 1398 1399 // if the torrent isn't ready to accept 1400 // connections yet, we'll have to wait with 1401 // our initialization 1402 if (t->ready_for_connections()) init(); 1403 1404 TORRENT_ASSERT(!m_torrent.expired()); 1405 1406 // assume the other end has no pieces 1407 // if we don't have valid metadata yet, 1408 // leave the vector unallocated 1409 TORRENT_ASSERT(m_num_pieces == 0); 1410 m_have_piece.clear_all(); 1411 TORRENT_ASSERT(!m_torrent.expired()); 1412 } 1413 peer_rank() const1414 std::uint32_t peer_connection::peer_rank() const 1415 { 1416 TORRENT_ASSERT(is_single_thread()); 1417 return m_peer_info == nullptr ? 0 1418 : m_peer_info->rank(m_ses.external_address(), m_ses.listen_port()); 1419 } 1420 1421 // message handlers 1422 1423 // ----------------------------- 1424 // --------- KEEPALIVE --------- 1425 // ----------------------------- 1426 incoming_keepalive()1427 void peer_connection::incoming_keepalive() 1428 { 1429 TORRENT_ASSERT(is_single_thread()); 1430 INVARIANT_CHECK; 1431 1432 #ifndef TORRENT_DISABLE_LOGGING 1433 peer_log(peer_log_alert::incoming_message, "KEEPALIVE"); 1434 #endif 1435 } 1436 1437 // ----------------------------- 1438 // ----------- CHOKE ----------- 1439 // ----------------------------- 1440 set_endgame(bool b)1441 void peer_connection::set_endgame(bool b) 1442 { 1443 TORRENT_ASSERT(is_single_thread()); 1444 if (m_endgame_mode == b) return; 1445 m_endgame_mode = b; 1446 if (m_endgame_mode) 1447 m_counters.inc_stats_counter(counters::num_peers_end_game); 1448 else 1449 m_counters.inc_stats_counter(counters::num_peers_end_game, -1); 1450 } 1451 incoming_choke()1452 void peer_connection::incoming_choke() 1453 { 1454 TORRENT_ASSERT(is_single_thread()); 1455 INVARIANT_CHECK; 1456 1457 #ifndef TORRENT_DISABLE_EXTENSIONS 1458 for (auto const& e : m_extensions) 1459 { 1460 if (e->on_choke()) return; 1461 } 1462 #endif 1463 if (is_disconnecting()) return; 1464 1465 #ifndef TORRENT_DISABLE_LOGGING 1466 peer_log(peer_log_alert::incoming_message, "CHOKE"); 1467 #endif 1468 if (m_peer_choked == false) 1469 m_counters.inc_stats_counter(counters::num_peers_down_unchoked, -1); 1470 1471 m_peer_choked = true; 1472 set_endgame(false); 1473 1474 clear_request_queue(); 1475 } 1476 clear_request_queue()1477 void peer_connection::clear_request_queue() 1478 { 1479 TORRENT_ASSERT(is_single_thread()); 1480 std::shared_ptr<torrent> t = m_torrent.lock(); 1481 TORRENT_ASSERT(t); 1482 if (!t->has_picker()) 1483 { 1484 m_request_queue.clear(); 1485 return; 1486 } 1487 1488 // clear the requests that haven't been sent yet 1489 if (peer_info_struct() == nullptr || !peer_info_struct()->on_parole) 1490 { 1491 // if the peer is not in parole mode, clear the queued 1492 // up block requests 1493 piece_picker& p = t->picker(); 1494 for (auto const& r : m_request_queue) 1495 { 1496 p.abort_download(r.block, peer_info_struct()); 1497 } 1498 m_request_queue.clear(); 1499 m_queued_time_critical = 0; 1500 } 1501 } 1502 clear_download_queue()1503 void peer_connection::clear_download_queue() 1504 { 1505 std::shared_ptr<torrent> t = m_torrent.lock(); 1506 piece_picker& picker = t->picker(); 1507 torrent_peer* self_peer = peer_info_struct(); 1508 while (!m_download_queue.empty()) 1509 { 1510 pending_block& qe = m_download_queue.back(); 1511 if (!qe.timed_out && !qe.not_wanted) 1512 picker.abort_download(qe.block, self_peer); 1513 m_outstanding_bytes -= t->to_req(qe.block).length; 1514 if (m_outstanding_bytes < 0) m_outstanding_bytes = 0; 1515 m_download_queue.pop_back(); 1516 } 1517 } 1518 1519 // ----------------------------- 1520 // -------- REJECT PIECE ------- 1521 // ----------------------------- 1522 incoming_reject_request(peer_request const & r)1523 void peer_connection::incoming_reject_request(peer_request const& r) 1524 { 1525 TORRENT_ASSERT(is_single_thread()); 1526 INVARIANT_CHECK; 1527 1528 std::shared_ptr<torrent> t = m_torrent.lock(); 1529 TORRENT_ASSERT(t); 1530 1531 #ifndef TORRENT_DISABLE_LOGGING 1532 peer_log(peer_log_alert::incoming_message, "REJECT_PIECE", "piece: %d s: %x l: %x" 1533 , static_cast<int>(r.piece), r.start, r.length); 1534 #endif 1535 1536 #ifndef TORRENT_DISABLE_EXTENSIONS 1537 for (auto const& e : m_extensions) 1538 { 1539 if (e->on_reject(r)) return; 1540 } 1541 #endif 1542 1543 if (is_disconnecting()) return; 1544 1545 int const block_size = t->block_size(); 1546 if (r.piece < piece_index_t{} 1547 || r.piece >= t->torrent_file().files().end_piece() 1548 || r.start < 0 1549 || r.start >= t->torrent_file().piece_length() 1550 || (r.start % block_size) != 0 1551 || r.length != std::min(t->torrent_file().piece_size(r.piece) - r.start, block_size)) 1552 { 1553 #ifndef TORRENT_DISABLE_LOGGING 1554 peer_log(peer_log_alert::info, "REJECT_PIECE", "invalid reject message (%d, %d, %d)" 1555 , int(r.piece), int(r.start), int(r.length)); 1556 #endif 1557 return; 1558 } 1559 1560 auto const dlq_iter = std::find_if( 1561 m_download_queue.begin(), m_download_queue.end() 1562 , [&r, block_size](pending_block const& pb) 1563 { 1564 auto const& b = pb.block; 1565 if (b.piece_index != r.piece) return false; 1566 if (b.block_index != r.start / block_size) return false; 1567 return true; 1568 }); 1569 1570 if (dlq_iter != m_download_queue.end()) 1571 { 1572 pending_block const b = *dlq_iter; 1573 bool const remove_from_picker = !dlq_iter->timed_out && !dlq_iter->not_wanted; 1574 m_download_queue.erase(dlq_iter); 1575 TORRENT_ASSERT(m_outstanding_bytes >= r.length); 1576 m_outstanding_bytes -= r.length; 1577 if (m_outstanding_bytes < 0) m_outstanding_bytes = 0; 1578 1579 if (m_download_queue.empty()) 1580 m_counters.inc_stats_counter(counters::num_peers_down_requests, -1); 1581 1582 // if the peer is in parole mode, keep the request 1583 if (peer_info_struct() && peer_info_struct()->on_parole) 1584 { 1585 // we should only add it if the block is marked as 1586 // busy in the piece-picker 1587 if (remove_from_picker) 1588 m_request_queue.insert(m_request_queue.begin(), b); 1589 } 1590 else if (!t->is_seed() && remove_from_picker) 1591 { 1592 piece_picker& p = t->picker(); 1593 p.abort_download(b.block, peer_info_struct()); 1594 } 1595 #if TORRENT_USE_INVARIANT_CHECKS 1596 check_invariant(); 1597 #endif 1598 } 1599 #ifndef TORRENT_DISABLE_LOGGING 1600 else 1601 { 1602 peer_log(peer_log_alert::info, "REJECT_PIECE", "piece not in request queue (%d, %d, %d)" 1603 , int(r.piece), int(r.start), int(r.length)); 1604 } 1605 #endif 1606 if (has_peer_choked()) 1607 { 1608 // if we're choked and we got a rejection of 1609 // a piece in the allowed fast set, remove it 1610 // from the allow fast set. 1611 auto const i = std::find(m_allowed_fast.begin(), m_allowed_fast.end(), r.piece); 1612 if (i != m_allowed_fast.end()) m_allowed_fast.erase(i); 1613 } 1614 else 1615 { 1616 auto const i = std::find(m_suggested_pieces.begin(), m_suggested_pieces.end(), r.piece); 1617 if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i); 1618 } 1619 1620 check_graceful_pause(); 1621 if (is_disconnecting()) return; 1622 1623 if (m_request_queue.empty() && m_download_queue.size() < 2) 1624 { 1625 if (request_a_block(*t, *this)) 1626 m_counters.inc_stats_counter(counters::reject_piece_picks); 1627 } 1628 1629 send_block_requests(); 1630 } 1631 1632 // ----------------------------- 1633 // ------- SUGGEST PIECE ------- 1634 // ----------------------------- 1635 incoming_suggest(piece_index_t const index)1636 void peer_connection::incoming_suggest(piece_index_t const index) 1637 { 1638 TORRENT_ASSERT(is_single_thread()); 1639 INVARIANT_CHECK; 1640 1641 #ifndef TORRENT_DISABLE_LOGGING 1642 peer_log(peer_log_alert::incoming_message, "SUGGEST_PIECE" 1643 , "piece: %d", static_cast<int>(index)); 1644 #endif 1645 std::shared_ptr<torrent> t = m_torrent.lock(); 1646 if (!t) return; 1647 1648 #ifndef TORRENT_DISABLE_EXTENSIONS 1649 for (auto const& e : m_extensions) 1650 { 1651 if (e->on_suggest(index)) return; 1652 } 1653 #endif 1654 1655 if (is_disconnecting()) return; 1656 if (index < piece_index_t(0)) 1657 { 1658 #ifndef TORRENT_DISABLE_LOGGING 1659 peer_log(peer_log_alert::incoming_message, "INVALID_SUGGEST_PIECE" 1660 , "%d", static_cast<int>(index)); 1661 #endif 1662 return; 1663 } 1664 1665 if (t->valid_metadata()) 1666 { 1667 if (index >= m_have_piece.end_index()) 1668 { 1669 #ifndef TORRENT_DISABLE_LOGGING 1670 peer_log(peer_log_alert::incoming_message, "INVALID_SUGGEST" 1671 , "%d s: %d", static_cast<int>(index), m_have_piece.size()); 1672 #endif 1673 return; 1674 } 1675 1676 // if we already have the piece, we can 1677 // ignore this message 1678 if (t->have_piece(index)) 1679 return; 1680 } 1681 1682 // the piece picker will prioritize the pieces from the beginning to end. 1683 // the later the suggestion is received, the higher priority we should 1684 // ascribe to it, so we need to insert suggestions at the front of the 1685 // queue. 1686 if (m_suggested_pieces.end_index() > m_settings.get_int(settings_pack::max_suggest_pieces)) 1687 m_suggested_pieces.resize(m_settings.get_int(settings_pack::max_suggest_pieces) - 1); 1688 1689 m_suggested_pieces.insert(m_suggested_pieces.begin(), index); 1690 1691 #ifndef TORRENT_DISABLE_LOGGING 1692 peer_log(peer_log_alert::info, "SUGGEST_PIECE", "piece: %d added to set: %d" 1693 , static_cast<int>(index), m_suggested_pieces.end_index()); 1694 #endif 1695 } 1696 1697 // ----------------------------- 1698 // ---------- UNCHOKE ---------- 1699 // ----------------------------- 1700 incoming_unchoke()1701 void peer_connection::incoming_unchoke() 1702 { 1703 TORRENT_ASSERT(is_single_thread()); 1704 INVARIANT_CHECK; 1705 1706 std::shared_ptr<torrent> t = m_torrent.lock(); 1707 TORRENT_ASSERT(t); 1708 1709 #ifndef TORRENT_DISABLE_EXTENSIONS 1710 for (auto const& e : m_extensions) 1711 { 1712 if (e->on_unchoke()) return; 1713 } 1714 #endif 1715 1716 #ifndef TORRENT_DISABLE_LOGGING 1717 peer_log(peer_log_alert::incoming_message, "UNCHOKE"); 1718 #endif 1719 if (m_peer_choked) 1720 m_counters.inc_stats_counter(counters::num_peers_down_unchoked); 1721 1722 m_peer_choked = false; 1723 m_last_unchoked = aux::time_now(); 1724 if (is_disconnecting()) return; 1725 1726 if (is_interesting()) 1727 { 1728 if (request_a_block(*t, *this)) 1729 m_counters.inc_stats_counter(counters::unchoke_piece_picks); 1730 send_block_requests(); 1731 } 1732 } 1733 1734 // ----------------------------- 1735 // -------- INTERESTED --------- 1736 // ----------------------------- 1737 incoming_interested()1738 void peer_connection::incoming_interested() 1739 { 1740 TORRENT_ASSERT(is_single_thread()); 1741 INVARIANT_CHECK; 1742 1743 std::shared_ptr<torrent> t = m_torrent.lock(); 1744 TORRENT_ASSERT(t); 1745 1746 #ifndef TORRENT_DISABLE_EXTENSIONS 1747 for (auto const& e : m_extensions) 1748 { 1749 if (e->on_interested()) return; 1750 } 1751 #endif 1752 1753 #ifndef TORRENT_DISABLE_LOGGING 1754 peer_log(peer_log_alert::incoming_message, "INTERESTED"); 1755 #endif 1756 if (m_peer_interested == false) 1757 { 1758 m_counters.inc_stats_counter(counters::num_peers_up_interested); 1759 m_peer_interested = true; 1760 } 1761 if (is_disconnecting()) return; 1762 1763 // if the peer is ready to download stuff, it must have metadata 1764 m_has_metadata = true; 1765 1766 disconnect_if_redundant(); 1767 if (is_disconnecting()) return; 1768 1769 if (t->graceful_pause()) 1770 { 1771 #ifndef TORRENT_DISABLE_LOGGING 1772 peer_log(peer_log_alert::info, "UNCHOKE" 1773 , "did not unchoke, graceful pause mode"); 1774 #endif 1775 return; 1776 } 1777 1778 if (!is_choked()) 1779 { 1780 // the reason to send an extra unchoke message here is that 1781 // because of the handshake-round-trip optimization, we may 1782 // end up sending an unchoke before the other end sends us 1783 // an interested message. This may confuse clients, not reacting 1784 // to the first unchoke, and then not check whether it's unchoked 1785 // when sending the interested message. If the other end's client 1786 // has this problem, sending another unchoke here will kick it 1787 // to react to the fact that it's unchoked. 1788 #ifndef TORRENT_DISABLE_LOGGING 1789 peer_log(peer_log_alert::info, "UNCHOKE", "sending redundant unchoke"); 1790 #endif 1791 write_unchoke(); 1792 return; 1793 } 1794 1795 maybe_unchoke_this_peer(); 1796 } 1797 maybe_unchoke_this_peer()1798 void peer_connection::maybe_unchoke_this_peer() 1799 { 1800 TORRENT_ASSERT(is_single_thread()); 1801 if (ignore_unchoke_slots()) 1802 { 1803 #ifndef TORRENT_DISABLE_LOGGING 1804 peer_log(peer_log_alert::info, "UNCHOKE", "about to unchoke, peer ignores unchoke slots"); 1805 #endif 1806 // if this peer is exempted from the choker 1807 // just unchoke it immediately 1808 send_unchoke(); 1809 } 1810 else if (m_ses.preemptive_unchoke()) 1811 { 1812 // if the peer is choked and we have upload slots left, 1813 // then unchoke it. 1814 1815 std::shared_ptr<torrent> t = m_torrent.lock(); 1816 TORRENT_ASSERT(t); 1817 1818 t->unchoke_peer(*this); 1819 } 1820 #ifndef TORRENT_DISABLE_LOGGING 1821 else if (should_log(peer_log_alert::info)) 1822 { 1823 peer_log(peer_log_alert::info, "UNCHOKE", "did not unchoke, the number of uploads (%d) " 1824 "is more than or equal to the available slots (%d), limit (%d)" 1825 , int(m_counters[counters::num_peers_up_unchoked]) 1826 , int(m_counters[counters::num_unchoke_slots]) 1827 , m_settings.get_int(settings_pack::unchoke_slots_limit)); 1828 } 1829 #endif 1830 } 1831 1832 // ----------------------------- 1833 // ------ NOT INTERESTED ------- 1834 // ----------------------------- 1835 incoming_not_interested()1836 void peer_connection::incoming_not_interested() 1837 { 1838 TORRENT_ASSERT(is_single_thread()); 1839 INVARIANT_CHECK; 1840 1841 #ifndef TORRENT_DISABLE_EXTENSIONS 1842 for (auto const& e : m_extensions) 1843 { 1844 if (e->on_not_interested()) return; 1845 } 1846 #endif 1847 1848 #ifndef TORRENT_DISABLE_LOGGING 1849 peer_log(peer_log_alert::incoming_message, "NOT_INTERESTED"); 1850 #endif 1851 if (m_peer_interested) 1852 { 1853 m_counters.inc_stats_counter(counters::num_peers_up_interested, -1); 1854 m_became_uninterested = aux::time_now(); 1855 m_peer_interested = false; 1856 } 1857 1858 if (is_disconnecting()) return; 1859 1860 std::shared_ptr<torrent> t = m_torrent.lock(); 1861 TORRENT_ASSERT(t); 1862 1863 choke_this_peer(); 1864 } 1865 choke_this_peer()1866 void peer_connection::choke_this_peer() 1867 { 1868 TORRENT_ASSERT(is_single_thread()); 1869 if (is_choked()) return; 1870 if (ignore_unchoke_slots()) 1871 { 1872 send_choke(); 1873 return; 1874 } 1875 1876 std::shared_ptr<torrent> t = m_torrent.lock(); 1877 TORRENT_ASSERT(t); 1878 1879 if (m_peer_info && m_peer_info->optimistically_unchoked) 1880 { 1881 m_peer_info->optimistically_unchoked = false; 1882 m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); 1883 t->trigger_optimistic_unchoke(); 1884 } 1885 t->choke_peer(*this); 1886 t->trigger_unchoke(); 1887 } 1888 1889 // ----------------------------- 1890 // ----------- HAVE ------------ 1891 // ----------------------------- 1892 incoming_have(piece_index_t const index)1893 void peer_connection::incoming_have(piece_index_t const index) 1894 { 1895 TORRENT_ASSERT(is_single_thread()); 1896 INVARIANT_CHECK; 1897 1898 std::shared_ptr<torrent> t = m_torrent.lock(); 1899 TORRENT_ASSERT(t); 1900 1901 #ifndef TORRENT_DISABLE_EXTENSIONS 1902 for (auto const& e : m_extensions) 1903 { 1904 if (e->on_have(index)) return; 1905 } 1906 #endif 1907 1908 if (is_disconnecting()) return; 1909 1910 // if we haven't received a bitfield, it was 1911 // probably omitted, which is the same as 'have_none' 1912 if (!m_bitfield_received) incoming_have_none(); 1913 1914 // if this peer is choked, there's no point in sending suggest messages to 1915 // it. They would just be out-of-date by the time we unchoke the peer 1916 // anyway. 1917 if (m_settings.get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache 1918 && !is_choked() 1919 && std::any_of(m_suggest_pieces.begin(), m_suggest_pieces.end() 1920 , [=](piece_index_t const idx) { return idx == index; })) 1921 { 1922 send_piece_suggestions(2); 1923 } 1924 1925 #ifndef TORRENT_DISABLE_LOGGING 1926 peer_log(peer_log_alert::incoming_message, "HAVE", "piece: %d" 1927 , static_cast<int>(index)); 1928 #endif 1929 1930 if (is_disconnecting()) return; 1931 1932 if (!t->valid_metadata() && index >= m_have_piece.end_index()) 1933 { 1934 if (index < piece_index_t(0x200000)) 1935 { 1936 // if we don't have metadata 1937 // and we might not have received a bitfield 1938 // extend the bitmask to fit the new 1939 // have message 1940 m_have_piece.resize(static_cast<int>(index) + 1, false); 1941 } 1942 else 1943 { 1944 // unless the index > 64k, in which case 1945 // we just ignore it 1946 return; 1947 } 1948 } 1949 1950 // if we got an invalid message, abort 1951 if (index >= m_have_piece.end_index() || index < piece_index_t(0)) 1952 { 1953 #ifndef TORRENT_DISABLE_LOGGING 1954 peer_log(peer_log_alert::info, "ERROR", "have-metadata have_piece: %d size: %d" 1955 , static_cast<int>(index), m_have_piece.size()); 1956 #endif 1957 disconnect(errors::invalid_have, operation_t::bittorrent, peer_error); 1958 return; 1959 } 1960 1961 #ifndef TORRENT_DISABLE_SUPERSEEDING 1962 if (t->super_seeding() 1963 #if TORRENT_ABI_VERSION == 1 1964 && !m_settings.get_bool(settings_pack::strict_super_seeding) 1965 #endif 1966 ) 1967 { 1968 // if we're super-seeding and the peer just told 1969 // us that it completed the piece we're super-seeding 1970 // to it, change the super-seeding piece for this peer 1971 // if the peer optimizes out redundant have messages 1972 // this will be handled when the peer sends not-interested 1973 // instead. 1974 if (super_seeded_piece(index)) 1975 { 1976 superseed_piece(index, t->get_piece_to_super_seed(m_have_piece)); 1977 } 1978 } 1979 #endif 1980 1981 if (m_have_piece[index]) 1982 { 1983 #ifndef TORRENT_DISABLE_LOGGING 1984 peer_log(peer_log_alert::incoming, "HAVE" 1985 , "got redundant HAVE message for index: %d" 1986 , static_cast<int>(index)); 1987 #endif 1988 return; 1989 } 1990 1991 m_have_piece.set_bit(index); 1992 ++m_num_pieces; 1993 1994 // if the peer is downloading stuff, it must have metadata 1995 m_has_metadata = true; 1996 1997 // only update the piece_picker if 1998 // we have the metadata and if 1999 // we're not a seed (in which case 2000 // we won't have a piece picker) 2001 if (!t->valid_metadata()) return; 2002 2003 t->peer_has(index, this); 2004 2005 // it's important to not disconnect before we have 2006 // updated the piece picker, otherwise we will incorrectly 2007 // decrement the piece count without first incrementing it 2008 if (is_seed()) 2009 { 2010 #ifndef TORRENT_DISABLE_LOGGING 2011 peer_log(peer_log_alert::info, "SEED", "this is a seed. p: %p" 2012 , static_cast<void*>(m_peer_info)); 2013 #endif 2014 2015 TORRENT_ASSERT(t->ready_for_connections()); 2016 TORRENT_ASSERT(m_have_piece.all_set()); 2017 TORRENT_ASSERT(m_have_piece.count() == m_have_piece.size()); 2018 TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces()); 2019 2020 t->seen_complete(); 2021 t->set_seed(m_peer_info, true); 2022 TORRENT_ASSERT(is_seed()); 2023 m_upload_only = true; 2024 2025 #if TORRENT_USE_INVARIANT_CHECKS 2026 if (t && t->has_picker()) 2027 t->picker().check_peer_invariant(m_have_piece, peer_info_struct()); 2028 #endif 2029 if (disconnect_if_redundant()) return; 2030 } 2031 2032 // it's important to update whether we're interested in this peer before 2033 // calling disconnect_if_redundant, otherwise we may disconnect even if 2034 // we are interested 2035 if (!t->has_piece_passed(index) 2036 && !t->is_upload_only() 2037 && !is_interesting() 2038 && (!t->has_picker() || t->picker().piece_priority(index) != dont_download)) 2039 t->peer_is_interesting(*this); 2040 2041 disconnect_if_redundant(); 2042 if (is_disconnecting()) return; 2043 2044 #ifndef TORRENT_DISABLE_SUPERSEEDING 2045 #if TORRENT_ABI_VERSION == 1 2046 // if we're super seeding, this might mean that somebody 2047 // forwarded this piece. In which case we need to give 2048 // a new piece to that peer 2049 if (t->super_seeding() 2050 && m_settings.get_bool(settings_pack::strict_super_seeding) 2051 && (!super_seeded_piece(index) || t->num_peers() == 1)) 2052 { 2053 for (auto& p : *t) 2054 { 2055 if (!p->super_seeded_piece(index)) continue; 2056 if (!p->has_piece(index)) continue; 2057 p->superseed_piece(index, t->get_piece_to_super_seed(p->get_bitfield())); 2058 } 2059 } 2060 #endif // TORRENT_ABI_VERSION 2061 #endif // TORRENT_DISABLE_SUPERSEEDING 2062 } 2063 2064 // ----------------------------- 2065 // -------- DONT HAVE ---------- 2066 // ----------------------------- 2067 incoming_dont_have(piece_index_t const index)2068 void peer_connection::incoming_dont_have(piece_index_t const index) 2069 { 2070 TORRENT_ASSERT(is_single_thread()); 2071 INVARIANT_CHECK; 2072 2073 std::shared_ptr<torrent> t = m_torrent.lock(); 2074 TORRENT_ASSERT(t); 2075 2076 if (index < piece_index_t{} 2077 || index >= t->torrent_file().end_piece()) 2078 { 2079 #ifndef TORRENT_DISABLE_LOGGING 2080 peer_log(peer_log_alert::incoming, "DONT_HAVE" 2081 , "invalid piece: %d", static_cast<int>(index)); 2082 #endif 2083 return; 2084 } 2085 2086 #ifndef TORRENT_DISABLE_EXTENSIONS 2087 for (auto const& e : m_extensions) 2088 { 2089 if (e->on_dont_have(index)) return; 2090 } 2091 #endif 2092 2093 if (is_disconnecting()) return; 2094 2095 #ifndef TORRENT_DISABLE_LOGGING 2096 peer_log(peer_log_alert::incoming_message, "DONT_HAVE", "piece: %d" 2097 , static_cast<int>(index)); 2098 #endif 2099 2100 // if we got an invalid message, abort 2101 if (index >= m_have_piece.end_index() || index < piece_index_t(0)) 2102 { 2103 disconnect(errors::invalid_dont_have, operation_t::bittorrent, peer_error); 2104 return; 2105 } 2106 2107 if (!m_have_piece[index]) 2108 { 2109 #ifndef TORRENT_DISABLE_LOGGING 2110 peer_log(peer_log_alert::incoming, "DONT_HAVE" 2111 , "got redundant DONT_HAVE message for index: %d" 2112 , static_cast<int>(index)); 2113 #endif 2114 return; 2115 } 2116 2117 bool const was_seed = is_seed(); 2118 m_have_piece.clear_bit(index); 2119 TORRENT_ASSERT(m_num_pieces > 0); 2120 --m_num_pieces; 2121 m_have_all = false; 2122 2123 // only update the piece_picker if 2124 // we have the metadata and if 2125 // we're not a seed (in which case 2126 // we won't have a piece picker) 2127 if (!t->valid_metadata()) return; 2128 2129 t->peer_lost(index, this); 2130 2131 if (was_seed) 2132 { 2133 t->set_seed(m_peer_info, false); 2134 TORRENT_ASSERT(!is_seed()); 2135 } 2136 } 2137 2138 // ----------------------------- 2139 // --------- BITFIELD ---------- 2140 // ----------------------------- 2141 incoming_bitfield(typed_bitfield<piece_index_t> const & bits)2142 void peer_connection::incoming_bitfield(typed_bitfield<piece_index_t> const& bits) 2143 { 2144 TORRENT_ASSERT(is_single_thread()); 2145 INVARIANT_CHECK; 2146 2147 std::shared_ptr<torrent> t = m_torrent.lock(); 2148 TORRENT_ASSERT(t); 2149 2150 #ifndef TORRENT_DISABLE_EXTENSIONS 2151 for (auto const& e : m_extensions) 2152 { 2153 if (e->on_bitfield(bits)) return; 2154 } 2155 #endif 2156 2157 if (is_disconnecting()) return; 2158 2159 #ifndef TORRENT_DISABLE_LOGGING 2160 if (should_log(peer_log_alert::incoming_message)) 2161 { 2162 std::string bitfield_str; 2163 bitfield_str.resize(aux::numeric_cast<std::size_t>(bits.size())); 2164 for (auto const i : bits.range()) 2165 bitfield_str[std::size_t(static_cast<int>(i))] = bits[i] ? '1' : '0'; 2166 peer_log(peer_log_alert::incoming_message, "BITFIELD" 2167 , "%s", bitfield_str.c_str()); 2168 } 2169 #endif 2170 2171 // if we don't have the metadata, we cannot 2172 // verify the bitfield size 2173 if (t->valid_metadata() 2174 && bits.size() != m_have_piece.size()) 2175 { 2176 #ifndef TORRENT_DISABLE_LOGGING 2177 if (should_log(peer_log_alert::incoming_message)) 2178 { 2179 peer_log(peer_log_alert::incoming_message, "BITFIELD" 2180 , "invalid size: %d expected %d", bits.size() 2181 , m_have_piece.size()); 2182 } 2183 #endif 2184 disconnect(errors::invalid_bitfield_size, operation_t::bittorrent, peer_error); 2185 return; 2186 } 2187 2188 if (m_bitfield_received) 2189 { 2190 // if we've already received a bitfield message 2191 // we first need to count down all the pieces 2192 // we believe the peer has first 2193 t->peer_lost(m_have_piece, this); 2194 } 2195 2196 m_bitfield_received = true; 2197 2198 // if we don't have metadata yet 2199 // just remember the bitmask 2200 // don't update the piecepicker 2201 // (since it doesn't exist yet) 2202 if (!t->ready_for_connections()) 2203 { 2204 #ifndef TORRENT_DISABLE_LOGGING 2205 if (m_num_pieces == bits.size()) 2206 peer_log(peer_log_alert::info, "SEED", "this is a seed. p: %p" 2207 , static_cast<void*>(m_peer_info)); 2208 #endif 2209 m_have_piece = bits; 2210 m_num_pieces = bits.count(); 2211 t->set_seed(m_peer_info, m_num_pieces == bits.size()); 2212 TORRENT_ASSERT(is_seed() == (m_num_pieces == bits.size())); 2213 2214 #if TORRENT_USE_INVARIANT_CHECKS 2215 if (t && t->has_picker()) 2216 t->picker().check_peer_invariant(m_have_piece, peer_info_struct()); 2217 #endif 2218 return; 2219 } 2220 2221 TORRENT_ASSERT(t->valid_metadata()); 2222 2223 int const num_pieces = bits.count(); 2224 t->set_seed(m_peer_info, num_pieces == m_have_piece.size()); 2225 if (num_pieces == m_have_piece.size()) 2226 { 2227 #ifndef TORRENT_DISABLE_LOGGING 2228 peer_log(peer_log_alert::info, "SEED", "this is a seed. p: %p" 2229 , static_cast<void*>(m_peer_info)); 2230 #endif 2231 m_upload_only = true; 2232 2233 m_have_piece.set_all(); 2234 m_num_pieces = num_pieces; 2235 t->peer_has_all(this); 2236 TORRENT_ASSERT(is_seed()); 2237 2238 TORRENT_ASSERT(m_have_piece.all_set()); 2239 TORRENT_ASSERT(m_have_piece.count() == m_have_piece.size()); 2240 TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces()); 2241 2242 #if TORRENT_USE_INVARIANT_CHECKS 2243 if (t && t->has_picker()) 2244 t->picker().check_peer_invariant(m_have_piece, peer_info_struct()); 2245 #endif 2246 2247 // this will cause us to send the INTERESTED message 2248 if (!t->is_upload_only()) 2249 t->peer_is_interesting(*this); 2250 2251 disconnect_if_redundant(); 2252 2253 return; 2254 } 2255 2256 // let the torrent know which pieces the peer has if we're a seed, we 2257 // don't keep track of piece availability 2258 t->peer_has(bits, this); 2259 2260 m_have_piece = bits; 2261 m_num_pieces = num_pieces; 2262 2263 update_interest(); 2264 } 2265 disconnect_if_redundant()2266 bool peer_connection::disconnect_if_redundant() 2267 { 2268 TORRENT_ASSERT(is_single_thread()); 2269 if (m_disconnecting) return false; 2270 if (m_need_interest_update) return false; 2271 2272 // we cannot disconnect in a constructor 2273 TORRENT_ASSERT(m_in_constructor == false); 2274 if (!m_settings.get_bool(settings_pack::close_redundant_connections)) return false; 2275 2276 std::shared_ptr<torrent> t = m_torrent.lock(); 2277 if (!t) return false; 2278 2279 // if we don't have the metadata yet, don't disconnect 2280 // also, if the peer doesn't have metadata we shouldn't 2281 // disconnect it, since it may want to request the 2282 // metadata from us 2283 if (!t->valid_metadata() || !has_metadata()) return false; 2284 2285 #ifndef TORRENT_DISABLE_SHARE_MODE 2286 // don't close connections in share mode, we don't know if we need them 2287 if (t->share_mode()) return false; 2288 #endif 2289 2290 if (m_upload_only && t->is_upload_only() 2291 && can_disconnect(errors::upload_upload_connection)) 2292 { 2293 #ifndef TORRENT_DISABLE_LOGGING 2294 peer_log(peer_log_alert::info, "UPLOAD_ONLY", "the peer is upload-only and our torrent is also upload-only"); 2295 #endif 2296 disconnect(errors::upload_upload_connection, operation_t::bittorrent); 2297 return true; 2298 } 2299 2300 if (m_upload_only 2301 && !m_interesting 2302 && m_bitfield_received 2303 && t->are_files_checked() 2304 && can_disconnect(errors::uninteresting_upload_peer)) 2305 { 2306 #ifndef TORRENT_DISABLE_LOGGING 2307 peer_log(peer_log_alert::info, "UPLOAD_ONLY", "the peer is upload-only and we're not interested in it"); 2308 #endif 2309 disconnect(errors::uninteresting_upload_peer, operation_t::bittorrent); 2310 return true; 2311 } 2312 2313 return false; 2314 } 2315 can_disconnect(error_code const & ec) const2316 bool peer_connection::can_disconnect(error_code const& ec) const 2317 { 2318 TORRENT_ASSERT(is_single_thread()); 2319 #ifndef TORRENT_DISABLE_EXTENSIONS 2320 for (auto const& e : m_extensions) 2321 { 2322 if (!e->can_disconnect(ec)) return false; 2323 } 2324 #else 2325 TORRENT_UNUSED(ec); 2326 #endif 2327 return true; 2328 } 2329 2330 // ----------------------------- 2331 // ---------- REQUEST ---------- 2332 // ----------------------------- 2333 incoming_request(peer_request const & r)2334 void peer_connection::incoming_request(peer_request const& r) 2335 { 2336 TORRENT_ASSERT(is_single_thread()); 2337 INVARIANT_CHECK; 2338 2339 std::shared_ptr<torrent> t = m_torrent.lock(); 2340 TORRENT_ASSERT(t); 2341 torrent_info const& ti = t->torrent_file(); 2342 2343 m_counters.inc_stats_counter(counters::piece_requests); 2344 2345 #ifndef TORRENT_DISABLE_LOGGING 2346 const bool valid_piece_index 2347 = r.piece >= piece_index_t(0) 2348 && r.piece < t->torrent_file().end_piece(); 2349 2350 peer_log(peer_log_alert::incoming_message, "REQUEST" 2351 , "piece: %d s: %x l: %x", static_cast<int>(r.piece), r.start, r.length); 2352 #endif 2353 2354 #ifndef TORRENT_DISABLE_SUPERSEEDING 2355 if (t->super_seeding() 2356 && !super_seeded_piece(r.piece)) 2357 { 2358 m_counters.inc_stats_counter(counters::invalid_piece_requests); 2359 ++m_num_invalid_requests; 2360 #ifndef TORRENT_DISABLE_LOGGING 2361 if (should_log(peer_log_alert::info)) 2362 { 2363 peer_log(peer_log_alert::info, "INVALID_REQUEST", "piece not super-seeded " 2364 "i: %d t: %d n: %d h: %d ss1: %d ss2: %d" 2365 , m_peer_interested 2366 , valid_piece_index 2367 ? t->torrent_file().piece_size(r.piece) : -1 2368 , t->torrent_file().num_pieces() 2369 , valid_piece_index ? t->has_piece_passed(r.piece) : 0 2370 , static_cast<int>(m_superseed_piece[0]) 2371 , static_cast<int>(m_superseed_piece[1])); 2372 } 2373 #endif 2374 2375 write_reject_request(r); 2376 2377 if (t->alerts().should_post<invalid_request_alert>()) 2378 { 2379 // msvc 12 appears to deduce the rvalue reference template 2380 // incorrectly for bool temporaries. So, create a dummy instance 2381 bool const peer_interested = bool(m_peer_interested); 2382 t->alerts().emplace_alert<invalid_request_alert>( 2383 t->get_handle(), m_remote, m_peer_id, r 2384 , t->has_piece_passed(r.piece), peer_interested, true); 2385 } 2386 return; 2387 } 2388 #endif // TORRENT_DISABLE_SUPERSEEDING 2389 2390 // if we haven't received a bitfield, it was 2391 // probably omitted, which is the same as 'have_none' 2392 if (!m_bitfield_received) incoming_have_none(); 2393 if (is_disconnecting()) return; 2394 2395 #ifndef TORRENT_DISABLE_EXTENSIONS 2396 for (auto const& e : m_extensions) 2397 { 2398 if (e->on_request(r)) return; 2399 } 2400 if (is_disconnecting()) return; 2401 #endif 2402 2403 if (!t->valid_metadata()) 2404 { 2405 m_counters.inc_stats_counter(counters::invalid_piece_requests); 2406 // if we don't have valid metadata yet, 2407 // we shouldn't get a request 2408 #ifndef TORRENT_DISABLE_LOGGING 2409 peer_log(peer_log_alert::info, "INVALID_REQUEST", "we don't have metadata yet"); 2410 peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE", "piece: %d s: %x l: %x no metadata" 2411 , static_cast<int>(r.piece), r.start, r.length); 2412 #endif 2413 write_reject_request(r); 2414 return; 2415 } 2416 2417 if (int(m_requests.size()) > m_settings.get_int(settings_pack::max_allowed_in_request_queue)) 2418 { 2419 m_counters.inc_stats_counter(counters::max_piece_requests); 2420 // don't allow clients to abuse our 2421 // memory consumption. 2422 // ignore requests if the client 2423 // is making too many of them. 2424 #ifndef TORRENT_DISABLE_LOGGING 2425 peer_log(peer_log_alert::info, "INVALID_REQUEST", "incoming request queue full %d" 2426 , int(m_requests.size())); 2427 peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE", "piece: %d s: %x l: %x too many requests" 2428 , static_cast<int>(r.piece), r.start, r.length); 2429 #endif 2430 write_reject_request(r); 2431 return; 2432 } 2433 2434 int fast_idx = -1; 2435 auto const fast_iter = std::find(m_accept_fast.begin() 2436 , m_accept_fast.end(), r.piece); 2437 if (fast_iter != m_accept_fast.end()) fast_idx = int(fast_iter - m_accept_fast.begin()); 2438 2439 if (!m_peer_interested) 2440 { 2441 #ifndef TORRENT_DISABLE_LOGGING 2442 if (should_log(peer_log_alert::info)) 2443 { 2444 peer_log(peer_log_alert::info, "INVALID_REQUEST", "peer is not interested " 2445 " t: %d n: %d block_limit: %d" 2446 , valid_piece_index 2447 ? t->torrent_file().piece_size(r.piece) : -1 2448 , t->torrent_file().num_pieces() 2449 , t->block_size()); 2450 peer_log(peer_log_alert::info, "INTERESTED", "artificial incoming INTERESTED message"); 2451 } 2452 #endif 2453 if (t->alerts().should_post<invalid_request_alert>()) 2454 { 2455 t->alerts().emplace_alert<invalid_request_alert>( 2456 t->get_handle(), m_remote, m_peer_id, r 2457 , t->has_piece_passed(r.piece) 2458 , false, false); 2459 } 2460 2461 // be lenient and pretend that the peer said it was interested 2462 incoming_interested(); 2463 } 2464 2465 // make sure this request 2466 // is legal and that the peer 2467 // is not choked 2468 if (r.piece < piece_index_t(0) 2469 || r.piece >= t->torrent_file().end_piece() 2470 || (!t->has_piece_passed(r.piece) 2471 #ifndef TORRENT_DISABLE_PREDICTIVE_PIECES 2472 && !t->is_predictive_piece(r.piece) 2473 #endif 2474 && !t->seed_mode()) 2475 || r.start < 0 2476 || r.start >= ti.piece_size(r.piece) 2477 || r.length <= 0 2478 || r.length + r.start > ti.piece_size(r.piece) 2479 || r.length > t->block_size()) 2480 { 2481 m_counters.inc_stats_counter(counters::invalid_piece_requests); 2482 2483 #ifndef TORRENT_DISABLE_LOGGING 2484 if (should_log(peer_log_alert::info)) 2485 { 2486 peer_log(peer_log_alert::info, "INVALID_REQUEST" 2487 , "i: %d t: %d n: %d h: %d block_limit: %d" 2488 , m_peer_interested 2489 , valid_piece_index 2490 ? t->torrent_file().piece_size(r.piece) : -1 2491 , ti.num_pieces() 2492 , t->has_piece_passed(r.piece) 2493 , t->block_size()); 2494 } 2495 2496 peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE" 2497 , "piece: %d s: %d l: %d invalid request" 2498 , static_cast<int>(r.piece), r.start , r.length); 2499 #endif 2500 2501 write_reject_request(r); 2502 ++m_num_invalid_requests; 2503 2504 if (t->alerts().should_post<invalid_request_alert>()) 2505 { 2506 // msvc 12 appears to deduce the rvalue reference template 2507 // incorrectly for bool temporaries. So, create a dummy instance 2508 bool const peer_interested = bool(m_peer_interested); 2509 t->alerts().emplace_alert<invalid_request_alert>( 2510 t->get_handle(), m_remote, m_peer_id, r 2511 , t->has_piece_passed(r.piece), peer_interested, false); 2512 } 2513 2514 // every ten invalid request, remind the peer that it's choked 2515 if (!m_peer_interested && m_num_invalid_requests % 10 == 0 && m_choked) 2516 { 2517 // TODO: 2 this should probably be based on time instead of number 2518 // of request messages. For a very high throughput connection, 300 2519 // may be a legitimate number of requests to have in flight when 2520 // getting choked 2521 if (m_num_invalid_requests > 300 && !m_peer_choked 2522 && can_disconnect(errors::too_many_requests_when_choked)) 2523 { 2524 disconnect(errors::too_many_requests_when_choked, operation_t::bittorrent, peer_error); 2525 return; 2526 } 2527 #ifndef TORRENT_DISABLE_LOGGING 2528 peer_log(peer_log_alert::outgoing_message, "CHOKE"); 2529 #endif 2530 write_choke(); 2531 } 2532 2533 return; 2534 } 2535 2536 // if we have choked the client 2537 // ignore the request 2538 int const blocks_per_piece = 2539 (ti.piece_length() + t->block_size() - 1) / t->block_size(); 2540 2541 // disconnect peers that downloads more than foo times an allowed 2542 // fast piece 2543 if (m_choked && fast_idx != -1 && m_accept_fast_piece_cnt[fast_idx] >= 3 * blocks_per_piece 2544 && can_disconnect(errors::too_many_requests_when_choked)) 2545 { 2546 disconnect(errors::too_many_requests_when_choked, operation_t::bittorrent, peer_error); 2547 return; 2548 } 2549 2550 if (m_choked && fast_idx == -1) 2551 { 2552 #ifndef TORRENT_DISABLE_LOGGING 2553 peer_log(peer_log_alert::info, "REJECTING REQUEST", "peer choked and piece not in allowed fast set"); 2554 peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE", "piece: %d s: %d l: %d peer choked" 2555 , static_cast<int>(r.piece), r.start, r.length); 2556 #endif 2557 m_counters.inc_stats_counter(counters::choked_piece_requests); 2558 write_reject_request(r); 2559 2560 // allow peers to send request up to 2 seconds after getting choked, 2561 // then disconnect them 2562 if (aux::time_now() - seconds(2) > m_last_choke 2563 && can_disconnect(errors::too_many_requests_when_choked)) 2564 { 2565 disconnect(errors::too_many_requests_when_choked, operation_t::bittorrent, peer_error); 2566 return; 2567 } 2568 } 2569 else 2570 { 2571 // increase the allowed fast set counter 2572 if (fast_idx != -1) 2573 ++m_accept_fast_piece_cnt[fast_idx]; 2574 2575 if (m_requests.empty()) 2576 m_counters.inc_stats_counter(counters::num_peers_up_requests); 2577 2578 TORRENT_ASSERT(t->valid_metadata()); 2579 TORRENT_ASSERT(r.piece >= piece_index_t(0)); 2580 TORRENT_ASSERT(r.piece < t->torrent_file().end_piece()); 2581 2582 m_requests.push_back(r); 2583 2584 if (t->alerts().should_post<incoming_request_alert>()) 2585 { 2586 t->alerts().emplace_alert<incoming_request_alert>(r, t->get_handle() 2587 , m_remote, m_peer_id); 2588 } 2589 2590 m_last_incoming_request = aux::time_now(); 2591 fill_send_buffer(); 2592 } 2593 } 2594 2595 // reject all requests to this piece reject_piece(piece_index_t const index)2596 void peer_connection::reject_piece(piece_index_t const index) 2597 { 2598 TORRENT_ASSERT(is_single_thread()); 2599 for (auto i = m_requests.begin(), end(m_requests.end()); i != end; ++i) 2600 { 2601 peer_request const& r = *i; 2602 if (r.piece != index) continue; 2603 write_reject_request(r); 2604 i = m_requests.erase(i); 2605 2606 if (m_requests.empty()) 2607 m_counters.inc_stats_counter(counters::num_peers_up_requests, -1); 2608 } 2609 } 2610 incoming_piece_fragment(int const bytes)2611 void peer_connection::incoming_piece_fragment(int const bytes) 2612 { 2613 TORRENT_ASSERT(is_single_thread()); 2614 m_last_piece = aux::time_now(); 2615 TORRENT_ASSERT_VAL(m_outstanding_bytes >= bytes, m_outstanding_bytes - bytes); 2616 m_outstanding_bytes -= bytes; 2617 if (m_outstanding_bytes < 0) m_outstanding_bytes = 0; 2618 std::shared_ptr<torrent> t = associated_torrent().lock(); 2619 #if TORRENT_USE_ASSERTS 2620 TORRENT_ASSERT(m_received_in_piece + bytes <= t->block_size()); 2621 m_received_in_piece += bytes; 2622 #endif 2623 2624 // progress of this torrent increased 2625 t->state_updated(); 2626 2627 #if TORRENT_USE_INVARIANT_CHECKS 2628 check_invariant(); 2629 #endif 2630 } 2631 start_receive_piece(peer_request const & r)2632 void peer_connection::start_receive_piece(peer_request const& r) 2633 { 2634 TORRENT_ASSERT(is_single_thread()); 2635 #if TORRENT_USE_INVARIANT_CHECKS 2636 check_invariant(); 2637 #endif 2638 #if TORRENT_USE_ASSERTS 2639 span<char const> recv_buffer = m_recv_buffer.get(); 2640 int recv_pos = int(recv_buffer.end() - recv_buffer.begin()); 2641 TORRENT_ASSERT(recv_pos >= 9); 2642 #endif 2643 2644 std::shared_ptr<torrent> t = associated_torrent().lock(); 2645 TORRENT_ASSERT(t); 2646 2647 if (!verify_piece(r)) 2648 { 2649 #ifndef TORRENT_DISABLE_LOGGING 2650 peer_log(peer_log_alert::info, "INVALID_PIECE", "piece: %d s: %d l: %d" 2651 , static_cast<int>(r.piece), r.start, r.length); 2652 #endif 2653 disconnect(errors::invalid_piece, operation_t::bittorrent, peer_error); 2654 return; 2655 } 2656 2657 piece_block const b(r.piece, r.start / t->block_size()); 2658 m_receiving_block = b; 2659 2660 bool in_req_queue = false; 2661 for (auto const& pb : m_download_queue) 2662 { 2663 if (pb.block != b) continue; 2664 in_req_queue = true; 2665 break; 2666 } 2667 2668 // if this is not in the request queue, we have to 2669 // assume our outstanding bytes includes this piece too 2670 // if we're disconnecting, we shouldn't add pieces 2671 if (!in_req_queue && !m_disconnecting) 2672 { 2673 for (auto i = m_request_queue.begin() 2674 , end(m_request_queue.end()); i != end; ++i) 2675 { 2676 if (i->block != b) continue; 2677 in_req_queue = true; 2678 if (i - m_request_queue.begin() < m_queued_time_critical) 2679 --m_queued_time_critical; 2680 m_request_queue.erase(i); 2681 break; 2682 } 2683 2684 if (m_download_queue.empty()) 2685 m_counters.inc_stats_counter(counters::num_peers_down_requests); 2686 2687 m_download_queue.insert(m_download_queue.begin(), b); 2688 if (!in_req_queue) 2689 { 2690 if (t->alerts().should_post<unwanted_block_alert>()) 2691 { 2692 t->alerts().emplace_alert<unwanted_block_alert>(t->get_handle() 2693 , m_remote, m_peer_id, b.block_index, b.piece_index); 2694 } 2695 #ifndef TORRENT_DISABLE_LOGGING 2696 peer_log(peer_log_alert::info, "INVALID_REQUEST" 2697 , "The block we just got was not in the request queue"); 2698 #endif 2699 TORRENT_ASSERT(m_download_queue.front().block == b); 2700 m_download_queue.front().not_wanted = true; 2701 } 2702 m_outstanding_bytes += r.length; 2703 } 2704 } 2705 2706 #if TORRENT_USE_INVARIANT_CHECKS 2707 struct check_postcondition 2708 { check_postconditionlibtorrent::check_postcondition2709 explicit check_postcondition(std::shared_ptr<torrent> const& t_ 2710 , bool init_check = true): t(t_) { if (init_check) check(); } 2711 ~check_postconditionlibtorrent::check_postcondition2712 ~check_postcondition() { check(); } 2713 checklibtorrent::check_postcondition2714 void check() 2715 { 2716 if (!t->is_seed()) 2717 { 2718 const int blocks_per_piece = static_cast<int>( 2719 (t->torrent_file().piece_length() + t->block_size() - 1) / t->block_size()); 2720 2721 std::vector<piece_picker::downloading_piece> const& dl_queue 2722 = t->picker().get_download_queue(); 2723 2724 for (std::vector<piece_picker::downloading_piece>::const_iterator i = 2725 dl_queue.begin(); i != dl_queue.end(); ++i) 2726 { 2727 TORRENT_ASSERT(i->finished <= blocks_per_piece); 2728 } 2729 } 2730 } 2731 2732 std::shared_ptr<torrent> t; 2733 }; 2734 #endif 2735 2736 2737 // ----------------------------- 2738 // ----------- PIECE ----------- 2739 // ----------------------------- 2740 incoming_piece(peer_request const & p,char const * data)2741 void peer_connection::incoming_piece(peer_request const& p, char const* data) 2742 { 2743 TORRENT_ASSERT(is_single_thread()); 2744 INVARIANT_CHECK; 2745 2746 std::shared_ptr<torrent> t = m_torrent.lock(); 2747 TORRENT_ASSERT(t); 2748 2749 // we're not receiving any block right now 2750 m_receiving_block = piece_block::invalid; 2751 2752 #ifdef TORRENT_CORRUPT_DATA 2753 // corrupt all pieces from certain peers 2754 if (is_v4(m_remote) 2755 && (m_remote.address().to_v4().to_ulong() & 0xf) == 0) 2756 { 2757 data[0] = ~data[0]; 2758 } 2759 #endif 2760 2761 // if we haven't received a bitfield, it was 2762 // probably omitted, which is the same as 'have_none' 2763 if (!m_bitfield_received) incoming_have_none(); 2764 if (is_disconnecting()) return; 2765 2766 // slow-start 2767 if (m_slow_start) 2768 m_desired_queue_size += 1; 2769 2770 update_desired_queue_size(); 2771 2772 #ifndef TORRENT_DISABLE_EXTENSIONS 2773 for (auto const& e : m_extensions) 2774 { 2775 if (e->on_piece(p, {data, p.length})) 2776 { 2777 #if TORRENT_USE_ASSERTS 2778 TORRENT_ASSERT(m_received_in_piece == p.length); 2779 m_received_in_piece = 0; 2780 #endif 2781 return; 2782 } 2783 } 2784 #endif 2785 if (is_disconnecting()) return; 2786 2787 #if TORRENT_USE_INVARIANT_CHECKS 2788 check_postcondition post_checker_(t); 2789 #if defined TORRENT_EXPENSIVE_INVARIANT_CHECKS 2790 t->check_invariant(); 2791 #endif 2792 #endif 2793 2794 #ifndef TORRENT_DISABLE_LOGGING 2795 if (should_log(peer_log_alert::incoming_message)) 2796 { 2797 peer_log(peer_log_alert::incoming_message, "PIECE", "piece: %d s: %x l: %x ds: %d qs: %d q: %d" 2798 , static_cast<int>(p.piece), p.start, p.length, statistics().download_rate() 2799 , int(m_desired_queue_size), int(m_download_queue.size())); 2800 } 2801 #endif 2802 2803 if (p.length == 0) 2804 { 2805 if (t->alerts().should_post<peer_error_alert>()) 2806 { 2807 t->alerts().emplace_alert<peer_error_alert>(t->get_handle(), m_remote 2808 , m_peer_id, operation_t::bittorrent, errors::peer_sent_empty_piece); 2809 } 2810 // This is used as a reject-request by bitcomet 2811 incoming_reject_request(p); 2812 return; 2813 } 2814 2815 // if we're already seeding, don't bother, 2816 // just ignore it 2817 if (t->is_seed()) 2818 { 2819 #if TORRENT_USE_ASSERTS 2820 TORRENT_ASSERT(m_received_in_piece == p.length); 2821 m_received_in_piece = 0; 2822 #endif 2823 if (!m_download_queue.empty()) 2824 { 2825 m_download_queue.erase(m_download_queue.begin()); 2826 if (m_download_queue.empty()) 2827 m_counters.inc_stats_counter(counters::num_peers_down_requests, -1); 2828 } 2829 t->add_redundant_bytes(p.length, waste_reason::piece_seed); 2830 return; 2831 } 2832 2833 time_point const now = clock_type::now(); 2834 2835 t->need_picker(); 2836 2837 piece_picker& picker = t->picker(); 2838 2839 piece_block block_finished(p.piece, p.start / t->block_size()); 2840 TORRENT_ASSERT(verify_piece(p)); 2841 2842 auto const b = std::find_if(m_download_queue.begin() 2843 , m_download_queue.end(), aux::has_block(block_finished)); 2844 2845 if (b == m_download_queue.end()) 2846 { 2847 if (t->alerts().should_post<unwanted_block_alert>()) 2848 { 2849 t->alerts().emplace_alert<unwanted_block_alert>(t->get_handle() 2850 , m_remote, m_peer_id, block_finished.block_index 2851 , block_finished.piece_index); 2852 } 2853 #ifndef TORRENT_DISABLE_LOGGING 2854 peer_log(peer_log_alert::info, "INVALID_REQUEST", "The block we just got was not in the request queue"); 2855 #endif 2856 #if TORRENT_USE_ASSERTS 2857 TORRENT_ASSERT_VAL(m_received_in_piece == p.length, m_received_in_piece); 2858 m_received_in_piece = 0; 2859 #endif 2860 t->add_redundant_bytes(p.length, waste_reason::piece_unknown); 2861 2862 // the bytes of the piece we just completed have been deducted from 2863 // m_outstanding_bytes as we received it, in incoming_piece_fragment. 2864 // however, it now turns out the piece we received wasn't in the 2865 // download queue, so we still have the same number of pieces in the 2866 // download queue, which is why we need to add the bytes back. 2867 m_outstanding_bytes += p.length; 2868 #if TORRENT_USE_INVARIANT_CHECKS 2869 check_invariant(); 2870 #endif 2871 return; 2872 } 2873 2874 #if TORRENT_USE_ASSERTS 2875 TORRENT_ASSERT_VAL(m_received_in_piece == p.length, m_received_in_piece); 2876 m_received_in_piece = 0; 2877 #endif 2878 // if the block we got is already finished, then ignore it 2879 if (picker.is_downloaded(block_finished)) 2880 { 2881 waste_reason const reason 2882 = (b->timed_out) ? waste_reason::piece_timed_out 2883 : (b->not_wanted) ? waste_reason::piece_cancelled 2884 : (b->busy) ? waste_reason::piece_end_game 2885 : waste_reason::piece_unknown; 2886 2887 t->add_redundant_bytes(p.length, reason); 2888 2889 m_download_queue.erase(b); 2890 if (m_download_queue.empty()) 2891 m_counters.inc_stats_counter(counters::num_peers_down_requests, -1); 2892 2893 if (m_disconnecting) return; 2894 2895 m_request_time.add_sample(int(total_milliseconds(now - m_requested))); 2896 #ifndef TORRENT_DISABLE_LOGGING 2897 if (should_log(peer_log_alert::info)) 2898 { 2899 peer_log(peer_log_alert::info, "REQUEST_TIME", "%d +- %d ms" 2900 , m_request_time.mean(), m_request_time.avg_deviation()); 2901 } 2902 #endif 2903 2904 // we completed an incoming block, and there are still outstanding 2905 // requests. The next block we expect to receive now has another 2906 // timeout period until we time out. So, reset the timer. 2907 if (!m_download_queue.empty()) 2908 m_requested = now; 2909 2910 if (request_a_block(*t, *this)) 2911 m_counters.inc_stats_counter(counters::incoming_redundant_piece_picks); 2912 send_block_requests(); 2913 return; 2914 } 2915 2916 // we received a request within the timeout, make sure this peer is 2917 // not snubbed anymore 2918 if (total_seconds(now - m_requested) < request_timeout() 2919 && m_snubbed) 2920 { 2921 m_snubbed = false; 2922 if (t->alerts().should_post<peer_unsnubbed_alert>()) 2923 { 2924 t->alerts().emplace_alert<peer_unsnubbed_alert>(t->get_handle() 2925 , m_remote, m_peer_id); 2926 } 2927 } 2928 2929 #ifndef TORRENT_DISABLE_LOGGING 2930 peer_log(peer_log_alert::info, "FILE_ASYNC_WRITE", "piece: %d s: %x l: %x" 2931 , static_cast<int>(p.piece), p.start, p.length); 2932 #endif 2933 m_download_queue.erase(b); 2934 if (m_download_queue.empty()) 2935 m_counters.inc_stats_counter(counters::num_peers_down_requests, -1); 2936 2937 if (t->is_deleted()) return; 2938 2939 auto conn = self(); 2940 bool const exceeded = m_disk_thread.async_write(t->storage(), p, data, self() 2941 , [conn, p, t] (storage_error const& e) 2942 { conn->wrap(&peer_connection::on_disk_write_complete, e, p, t); }); 2943 2944 // every peer is entitled to have two disk blocks allocated at any given 2945 // time, regardless of whether the cache size is exceeded or not. If this 2946 // was not the case, when the cache size setting is very small, most peers 2947 // would be blocked most of the time, because the disk cache would 2948 // continuously be in exceeded state. Only rarely would it actually drop 2949 // down to 0 and unblock all peers. 2950 if (exceeded && m_outstanding_writing_bytes > 0) 2951 { 2952 if (!(m_channel_state[download_channel] & peer_info::bw_disk)) 2953 m_counters.inc_stats_counter(counters::num_peers_down_disk); 2954 m_channel_state[download_channel] |= peer_info::bw_disk; 2955 #ifndef TORRENT_DISABLE_LOGGING 2956 peer_log(peer_log_alert::info, "DISK", "exceeded disk buffer watermark"); 2957 #endif 2958 } 2959 2960 std::int64_t const write_queue_size = m_counters.inc_stats_counter( 2961 counters::queued_write_bytes, p.length); 2962 m_outstanding_writing_bytes += p.length; 2963 2964 std::int64_t const max_queue_size = m_settings.get_int( 2965 settings_pack::max_queued_disk_bytes); 2966 if (write_queue_size > max_queue_size 2967 && write_queue_size - p.length < max_queue_size 2968 && m_settings.get_int(settings_pack::cache_size) > 5 2969 && t->alerts().should_post<performance_alert>()) 2970 { 2971 t->alerts().emplace_alert<performance_alert>(t->get_handle() 2972 , performance_alert::too_high_disk_queue_limit); 2973 } 2974 2975 m_request_time.add_sample(int(total_milliseconds(now - m_requested))); 2976 #ifndef TORRENT_DISABLE_LOGGING 2977 if (should_log(peer_log_alert::info)) 2978 { 2979 peer_log(peer_log_alert::info, "REQUEST_TIME", "%d +- %d ms" 2980 , m_request_time.mean(), m_request_time.avg_deviation()); 2981 } 2982 #endif 2983 2984 // we completed an incoming block, and there are still outstanding 2985 // requests. The next block we expect to receive now has another 2986 // timeout period until we time out. So, reset the timer. 2987 if (!m_download_queue.empty()) 2988 m_requested = now; 2989 2990 bool const was_finished = picker.is_piece_finished(p.piece); 2991 // did we request this block from any other peers? 2992 bool const multi = picker.num_peers(block_finished) > 1; 2993 // std::fprintf(stderr, "peer_connection mark_as_writing peer: %p piece: %d block: %d\n" 2994 // , peer_info_struct(), block_finished.piece_index, block_finished.block_index); 2995 picker.mark_as_writing(block_finished, peer_info_struct()); 2996 2997 TORRENT_ASSERT(picker.num_peers(block_finished) == 0); 2998 // if we requested this block from other peers, cancel it now 2999 if (multi) t->cancel_block(block_finished); 3000 3001 #ifndef TORRENT_DISABLE_PREDICTIVE_PIECES 3002 if (m_settings.get_int(settings_pack::predictive_piece_announce)) 3003 { 3004 piece_index_t const piece = block_finished.piece_index; 3005 piece_picker::downloading_piece st; 3006 t->picker().piece_info(piece, st); 3007 3008 int const num_blocks = t->picker().blocks_in_piece(piece); 3009 if (st.requested > 0 && st.writing + st.finished + st.requested == num_blocks) 3010 { 3011 std::vector<torrent_peer*> d; 3012 t->picker().get_downloaders(d, piece); 3013 if (d.size() == 1) 3014 { 3015 // only make predictions if all remaining 3016 // blocks are requested from the same peer 3017 torrent_peer* peer = d[0]; 3018 if (peer->connection) 3019 { 3020 // we have a connection. now, what is the current 3021 // download rate from this peer, and how many blocks 3022 // do we have left to download? 3023 std::int64_t const rate = peer->connection->statistics().download_payload_rate(); 3024 std::int64_t const bytes_left = std::int64_t(st.requested) * t->block_size(); 3025 // the settings unit is milliseconds, so calculate the 3026 // number of milliseconds worth of bytes left in the piece 3027 if (rate > 1000 3028 && (bytes_left * 1000) / rate < m_settings.get_int(settings_pack::predictive_piece_announce)) 3029 { 3030 // we predict we will complete this piece very soon. 3031 t->predicted_have_piece(piece, int((bytes_left * 1000) / rate)); 3032 } 3033 } 3034 } 3035 } 3036 } 3037 #endif // TORRENT_DISABLE_PREDICTIVE_PIECES 3038 3039 TORRENT_ASSERT(picker.num_peers(block_finished) == 0); 3040 3041 #if TORRENT_USE_INVARIANT_CHECKS \ 3042 && defined TORRENT_EXPENSIVE_INVARIANT_CHECKS 3043 t->check_invariant(); 3044 #endif 3045 3046 #if TORRENT_USE_ASSERTS 3047 piece_picker::downloading_piece pi; 3048 picker.piece_info(p.piece, pi); 3049 int num_blocks = picker.blocks_in_piece(p.piece); 3050 TORRENT_ASSERT(pi.writing + pi.finished + pi.requested <= num_blocks); 3051 TORRENT_ASSERT(picker.is_piece_finished(p.piece) == (pi.writing + pi.finished == num_blocks)); 3052 #endif 3053 3054 // did we just finish the piece? 3055 // this means all blocks are either written 3056 // to disk or are in the disk write cache 3057 if (picker.is_piece_finished(p.piece) && !was_finished) 3058 { 3059 #if TORRENT_USE_INVARIANT_CHECKS 3060 check_postcondition post_checker2_(t, false); 3061 #endif 3062 t->verify_piece(p.piece); 3063 } 3064 3065 check_graceful_pause(); 3066 3067 if (is_disconnecting()) return; 3068 3069 if (request_a_block(*t, *this)) 3070 m_counters.inc_stats_counter(counters::incoming_piece_picks); 3071 send_block_requests(); 3072 } 3073 check_graceful_pause()3074 void peer_connection::check_graceful_pause() 3075 { 3076 // TODO: 3 instead of having to ask the torrent whether it's in graceful 3077 // pause mode or not, the peers should keep that state (and the torrent 3078 // should update them when it enters graceful pause). When a peer enters 3079 // graceful pause mode, it should cancel all outstanding requests and 3080 // clear its request queue. 3081 std::shared_ptr<torrent> t = m_torrent.lock(); 3082 if (!t || !t->graceful_pause()) return; 3083 3084 if (m_outstanding_bytes > 0) return; 3085 3086 #ifndef TORRENT_DISABLE_LOGGING 3087 peer_log(peer_log_alert::info, "GRACEFUL_PAUSE", "NO MORE DOWNLOAD"); 3088 #endif 3089 disconnect(errors::torrent_paused, operation_t::bittorrent); 3090 } 3091 on_disk_write_complete(storage_error const & error,peer_request const & p,std::shared_ptr<torrent> t)3092 void peer_connection::on_disk_write_complete(storage_error const& error 3093 , peer_request const& p, std::shared_ptr<torrent> t) 3094 { 3095 TORRENT_ASSERT(is_single_thread()); 3096 #ifndef TORRENT_DISABLE_LOGGING 3097 if (should_log(peer_log_alert::info)) 3098 { 3099 peer_log(peer_log_alert::info, "FILE_ASYNC_WRITE_COMPLETE", "piece: %d s: %x l: %x e: %s" 3100 , static_cast<int>(p.piece), p.start, p.length, error.ec.message().c_str()); 3101 } 3102 #endif 3103 3104 m_counters.inc_stats_counter(counters::queued_write_bytes, -p.length); 3105 m_outstanding_writing_bytes -= p.length; 3106 3107 TORRENT_ASSERT(m_outstanding_writing_bytes >= 0); 3108 3109 // every peer is entitled to allocate a disk buffer if it has no writes outstanding 3110 // see the comment in incoming_piece 3111 if (m_outstanding_writing_bytes == 0 3112 && m_channel_state[download_channel] & peer_info::bw_disk) 3113 { 3114 m_counters.inc_stats_counter(counters::num_peers_down_disk, -1); 3115 m_channel_state[download_channel] &= ~peer_info::bw_disk; 3116 } 3117 3118 INVARIANT_CHECK; 3119 3120 if (!t) 3121 { 3122 disconnect(error.ec, operation_t::file_write); 3123 return; 3124 } 3125 3126 // in case the outstanding bytes just dropped down 3127 // to allow to receive more data 3128 setup_receive(); 3129 3130 piece_block const block_finished(p.piece, p.start / t->block_size()); 3131 3132 if (error) 3133 { 3134 // we failed to write the piece to disk tell the piece picker 3135 // this will block any other peer from issuing requests 3136 // to this piece, until we've cleared it. 3137 if (error.ec == boost::asio::error::operation_aborted) 3138 { 3139 if (t->has_picker()) 3140 t->picker().mark_as_canceled(block_finished, nullptr); 3141 } 3142 else 3143 { 3144 // if any other peer has a busy request to this block, we need 3145 // to cancel it too 3146 t->cancel_block(block_finished); 3147 if (t->has_picker()) 3148 t->picker().write_failed(block_finished); 3149 3150 if (t->has_storage()) 3151 { 3152 // when this returns, all outstanding jobs to the 3153 // piece are done, and we can restore it, allowing 3154 // new requests to it 3155 m_disk_thread.async_clear_piece(t->storage(), p.piece 3156 , [t, block_finished] (piece_index_t pi) 3157 { t->wrap(&torrent::on_piece_fail_sync, pi, block_finished); }); 3158 } 3159 else 3160 { 3161 // is m_abort true? if so, we should probably just 3162 // exit this function early, no need to keep the picker 3163 // state up-to-date, right? 3164 t->on_piece_fail_sync(p.piece, block_finished); 3165 } 3166 } 3167 t->update_gauge(); 3168 // handle_disk_error may disconnect us 3169 t->handle_disk_error("write", error, this, torrent::disk_class::write); 3170 return; 3171 } 3172 3173 if (!t->has_picker()) return; 3174 3175 piece_picker& picker = t->picker(); 3176 3177 TORRENT_ASSERT(picker.num_peers(block_finished) == 0); 3178 3179 // std::fprintf(stderr, "peer_connection mark_as_finished peer: %p piece: %d block: %d\n" 3180 // , peer_info_struct(), block_finished.piece_index, block_finished.block_index); 3181 picker.mark_as_finished(block_finished, peer_info_struct()); 3182 3183 t->maybe_done_flushing(); 3184 3185 if (t->alerts().should_post<block_finished_alert>()) 3186 { 3187 t->alerts().emplace_alert<block_finished_alert>(t->get_handle(), 3188 remote(), pid(), block_finished.block_index 3189 , block_finished.piece_index); 3190 } 3191 3192 disconnect_if_redundant(); 3193 3194 if (m_disconnecting) return; 3195 3196 #if TORRENT_USE_ASSERTS 3197 if (t->has_picker()) 3198 { 3199 auto const& q = picker.get_download_queue(); 3200 3201 for (auto const& dp : q) 3202 { 3203 if (dp.index != block_finished.piece_index) continue; 3204 auto const info = picker.blocks_for_piece(dp); 3205 TORRENT_ASSERT(info[block_finished.block_index].state 3206 == piece_picker::block_info::state_finished); 3207 } 3208 } 3209 #endif 3210 if (t->is_aborted()) return; 3211 } 3212 3213 // ----------------------------- 3214 // ---------- CANCEL ----------- 3215 // ----------------------------- 3216 incoming_cancel(peer_request const & r)3217 void peer_connection::incoming_cancel(peer_request const& r) 3218 { 3219 TORRENT_ASSERT(is_single_thread()); 3220 INVARIANT_CHECK; 3221 3222 #ifndef TORRENT_DISABLE_EXTENSIONS 3223 for (auto const& e : m_extensions) 3224 { 3225 if (e->on_cancel(r)) return; 3226 } 3227 #endif 3228 if (is_disconnecting()) return; 3229 3230 #ifndef TORRENT_DISABLE_LOGGING 3231 peer_log(peer_log_alert::incoming_message, "CANCEL" 3232 , "piece: %d s: %x l: %x", static_cast<int>(r.piece), r.start, r.length); 3233 #endif 3234 3235 auto const i = std::find(m_requests.begin(), m_requests.end(), r); 3236 3237 if (i != m_requests.end()) 3238 { 3239 m_counters.inc_stats_counter(counters::cancelled_piece_requests); 3240 m_requests.erase(i); 3241 3242 if (m_requests.empty()) 3243 m_counters.inc_stats_counter(counters::num_peers_up_requests, -1); 3244 3245 #ifndef TORRENT_DISABLE_LOGGING 3246 peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE", "piece: %d s: %x l: %x cancelled" 3247 , static_cast<int>(r.piece), r.start , r.length); 3248 #endif 3249 write_reject_request(r); 3250 } 3251 else 3252 { 3253 // TODO: 2 since we throw away the queue entry once we issue 3254 // the disk job, this may happen. Instead, we should keep the 3255 // queue entry around, mark it as having been requested from 3256 // disk and once the disk job comes back, discard it if it has 3257 // been cancelled. Maybe even be able to cancel disk jobs? 3258 #ifndef TORRENT_DISABLE_LOGGING 3259 peer_log(peer_log_alert::info, "INVALID_CANCEL", "got cancel not in the queue"); 3260 #endif 3261 } 3262 } 3263 3264 // ----------------------------- 3265 // --------- DHT PORT ---------- 3266 // ----------------------------- 3267 incoming_dht_port(int const listen_port)3268 void peer_connection::incoming_dht_port(int const listen_port) 3269 { 3270 TORRENT_ASSERT(is_single_thread()); 3271 INVARIANT_CHECK; 3272 3273 #ifndef TORRENT_DISABLE_LOGGING 3274 peer_log(peer_log_alert::incoming_message, "DHT_PORT", "p: %d", listen_port); 3275 #endif 3276 #ifndef TORRENT_DISABLE_DHT 3277 m_ses.add_dht_node({m_remote.address(), std::uint16_t(listen_port)}); 3278 #else 3279 TORRENT_UNUSED(listen_port); 3280 #endif 3281 } 3282 3283 // ----------------------------- 3284 // --------- HAVE ALL ---------- 3285 // ----------------------------- 3286 incoming_have_all()3287 void peer_connection::incoming_have_all() 3288 { 3289 TORRENT_ASSERT(is_single_thread()); 3290 INVARIANT_CHECK; 3291 3292 std::shared_ptr<torrent> t = m_torrent.lock(); 3293 TORRENT_ASSERT(t); 3294 3295 // we cannot disconnect in a constructor, and 3296 // this function may end up doing that 3297 TORRENT_ASSERT(m_in_constructor == false); 3298 3299 #ifndef TORRENT_DISABLE_LOGGING 3300 peer_log(peer_log_alert::incoming_message, "HAVE_ALL"); 3301 #endif 3302 3303 #ifndef TORRENT_DISABLE_EXTENSIONS 3304 for (auto const& e : m_extensions) 3305 { 3306 if (e->on_have_all()) return; 3307 } 3308 #endif 3309 if (is_disconnecting()) return; 3310 3311 if (m_bitfield_received) 3312 t->peer_lost(m_have_piece, this); 3313 3314 m_have_all = true; 3315 3316 #ifndef TORRENT_DISABLE_LOGGING 3317 peer_log(peer_log_alert::info, "SEED", "this is a seed p: %p" 3318 , static_cast<void*>(m_peer_info)); 3319 #endif 3320 3321 t->set_seed(m_peer_info, true); 3322 m_upload_only = true; 3323 m_bitfield_received = true; 3324 3325 // if we don't have metadata yet 3326 // just remember the bitmask 3327 // don't update the piecepicker 3328 // (since it doesn't exist yet) 3329 if (!t->ready_for_connections()) 3330 { 3331 // assume seeds are interesting when we 3332 // don't even have the metadata 3333 t->peer_is_interesting(*this); 3334 3335 disconnect_if_redundant(); 3336 return; 3337 } 3338 3339 TORRENT_ASSERT(!m_have_piece.empty()); 3340 m_have_piece.set_all(); 3341 m_num_pieces = m_have_piece.size(); 3342 3343 t->peer_has_all(this); 3344 3345 #if TORRENT_USE_INVARIANT_CHECKS 3346 if (t && t->has_picker()) 3347 t->picker().check_peer_invariant(m_have_piece, peer_info_struct()); 3348 #endif 3349 3350 TORRENT_ASSERT(m_have_piece.all_set()); 3351 TORRENT_ASSERT(m_have_piece.count() == m_have_piece.size()); 3352 TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces()); 3353 3354 // if we're finished, we're not interested 3355 if (t->is_upload_only()) send_not_interested(); 3356 else t->peer_is_interesting(*this); 3357 3358 disconnect_if_redundant(); 3359 } 3360 3361 // ----------------------------- 3362 // --------- HAVE NONE --------- 3363 // ----------------------------- 3364 incoming_have_none()3365 void peer_connection::incoming_have_none() 3366 { 3367 TORRENT_ASSERT(is_single_thread()); 3368 INVARIANT_CHECK; 3369 3370 #ifndef TORRENT_DISABLE_LOGGING 3371 peer_log(peer_log_alert::incoming_message, "HAVE_NONE"); 3372 #endif 3373 3374 std::shared_ptr<torrent> t = m_torrent.lock(); 3375 TORRENT_ASSERT(t); 3376 3377 #ifndef TORRENT_DISABLE_EXTENSIONS 3378 for (auto const& e : m_extensions) 3379 { 3380 if (e->on_have_none()) return; 3381 } 3382 #endif 3383 if (is_disconnecting()) return; 3384 3385 if (m_bitfield_received) 3386 t->peer_lost(m_have_piece, this); 3387 3388 t->set_seed(m_peer_info, false); 3389 m_bitfield_received = true; 3390 m_have_all = false; 3391 3392 m_have_piece.clear_all(); 3393 m_num_pieces = 0; 3394 3395 TORRENT_ASSERT(!is_seed()); 3396 3397 // if the peer is ready to download stuff, it must have metadata 3398 m_has_metadata = true; 3399 3400 // we're never interested in a peer that doesn't have anything 3401 send_not_interested(); 3402 3403 TORRENT_ASSERT(!m_have_piece.empty() || !t->ready_for_connections()); 3404 disconnect_if_redundant(); 3405 } 3406 3407 // ----------------------------- 3408 // ------- ALLOWED FAST -------- 3409 // ----------------------------- 3410 incoming_allowed_fast(piece_index_t const index)3411 void peer_connection::incoming_allowed_fast(piece_index_t const index) 3412 { 3413 TORRENT_ASSERT(is_single_thread()); 3414 INVARIANT_CHECK; 3415 3416 std::shared_ptr<torrent> t = m_torrent.lock(); 3417 TORRENT_ASSERT(t); 3418 3419 #ifndef TORRENT_DISABLE_LOGGING 3420 peer_log(peer_log_alert::incoming_message, "ALLOWED_FAST", "%d" 3421 , static_cast<int>(index)); 3422 #endif 3423 3424 #ifndef TORRENT_DISABLE_EXTENSIONS 3425 for (auto const& e : m_extensions) 3426 { 3427 if (e->on_allowed_fast(index)) return; 3428 } 3429 #endif 3430 if (is_disconnecting()) return; 3431 3432 if (index < piece_index_t(0)) 3433 { 3434 #ifndef TORRENT_DISABLE_LOGGING 3435 peer_log(peer_log_alert::incoming_message, "INVALID_ALLOWED_FAST" 3436 , "%d", static_cast<int>(index)); 3437 #endif 3438 return; 3439 } 3440 3441 if (t->valid_metadata()) 3442 { 3443 if (index >= m_have_piece.end_index()) 3444 { 3445 #ifndef TORRENT_DISABLE_LOGGING 3446 peer_log(peer_log_alert::incoming_message, "INVALID_ALLOWED_FAST" 3447 , "%d s: %d", static_cast<int>(index), m_have_piece.size()); 3448 #endif 3449 return; 3450 } 3451 3452 // if we already have the piece, we can 3453 // ignore this message 3454 if (t->have_piece(index)) 3455 return; 3456 } 3457 3458 // if we don't have the metadata, we'll verify 3459 // this piece index later 3460 m_allowed_fast.push_back(index); 3461 3462 // if the peer has the piece and we want 3463 // to download it, request it 3464 if (index < m_have_piece.end_index() 3465 && m_have_piece[index] 3466 && !t->has_piece_passed(index) 3467 && t->valid_metadata() 3468 && t->has_picker() 3469 && t->picker().piece_priority(index) > dont_download) 3470 { 3471 t->peer_is_interesting(*this); 3472 } 3473 } 3474 allowed_fast()3475 std::vector<piece_index_t> const& peer_connection::allowed_fast() 3476 { 3477 TORRENT_ASSERT(is_single_thread()); 3478 std::shared_ptr<torrent> t = m_torrent.lock(); 3479 TORRENT_ASSERT(t); 3480 3481 // TODO: sort the allowed fast set in priority order 3482 return m_allowed_fast; 3483 } 3484 can_request_time_critical() const3485 bool peer_connection::can_request_time_critical() const 3486 { 3487 TORRENT_ASSERT(is_single_thread()); 3488 if (has_peer_choked() || !is_interesting()) return false; 3489 if (int(m_download_queue.size()) + int(m_request_queue.size()) 3490 > m_desired_queue_size * 2) return false; 3491 if (on_parole()) return false; 3492 if (m_disconnecting) return false; 3493 std::shared_ptr<torrent> t = m_torrent.lock(); 3494 TORRENT_ASSERT(t); 3495 if (t->upload_mode()) return false; 3496 3497 // ignore snubbed peers, since they're not likely to return pieces in a 3498 // timely manner anyway 3499 if (m_snubbed) return false; 3500 return true; 3501 } 3502 make_time_critical(piece_block const & block)3503 bool peer_connection::make_time_critical(piece_block const& block) 3504 { 3505 TORRENT_ASSERT(is_single_thread()); 3506 auto const rit = std::find_if(m_request_queue.begin() 3507 , m_request_queue.end(), aux::has_block(block)); 3508 if (rit == m_request_queue.end()) return false; 3509 #if TORRENT_USE_ASSERTS 3510 std::shared_ptr<torrent> t = m_torrent.lock(); 3511 TORRENT_ASSERT(t); 3512 TORRENT_ASSERT(t->has_picker()); 3513 TORRENT_ASSERT(t->picker().is_requested(block)); 3514 #endif 3515 // ignore it if it's already time critical 3516 if (rit - m_request_queue.begin() < m_queued_time_critical) return false; 3517 pending_block b = *rit; 3518 m_request_queue.erase(rit); 3519 m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b); 3520 ++m_queued_time_critical; 3521 return true; 3522 } 3523 add_request(piece_block const & block,request_flags_t const flags)3524 bool peer_connection::add_request(piece_block const& block 3525 , request_flags_t const flags) 3526 { 3527 TORRENT_ASSERT(is_single_thread()); 3528 INVARIANT_CHECK; 3529 3530 std::shared_ptr<torrent> t = m_torrent.lock(); 3531 TORRENT_ASSERT(t); 3532 3533 TORRENT_ASSERT(!m_disconnecting); 3534 TORRENT_ASSERT(t->valid_metadata()); 3535 3536 TORRENT_ASSERT(block.block_index != piece_block::invalid.block_index); 3537 TORRENT_ASSERT(block.piece_index != piece_block::invalid.piece_index); 3538 TORRENT_ASSERT(block.piece_index < t->torrent_file().end_piece()); 3539 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index)); 3540 TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0)); 3541 TORRENT_ASSERT(!t->have_piece(block.piece_index)); 3542 TORRENT_ASSERT(std::find_if(m_download_queue.begin(), m_download_queue.end() 3543 , aux::has_block(block)) == m_download_queue.end()); 3544 TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end() 3545 , block) == m_request_queue.end()); 3546 3547 if (t->upload_mode()) 3548 { 3549 #ifndef TORRENT_DISABLE_LOGGING 3550 peer_log(peer_log_alert::info, "PIECE_PICKER" 3551 , "not_picking: %d,%d upload_mode" 3552 , static_cast<int>(block.piece_index), block.block_index); 3553 #endif 3554 return false; 3555 } 3556 if (m_disconnecting) 3557 { 3558 #ifndef TORRENT_DISABLE_LOGGING 3559 peer_log(peer_log_alert::info, "PIECE_PICKER" 3560 , "not_picking: %d,%d disconnecting" 3561 , static_cast<int>(block.piece_index), block.block_index); 3562 #endif 3563 return false; 3564 } 3565 3566 if ((flags & busy) && !(flags & time_critical)) 3567 { 3568 // this block is busy (i.e. it has been requested 3569 // from another peer already). Only allow one busy 3570 // request in the pipeline at the time 3571 // this rule does not apply to time critical pieces, 3572 // in which case we are allowed to pick more than one 3573 // busy blocks 3574 if (std::any_of(m_download_queue.begin(), m_download_queue.end() 3575 , [](pending_block const& i) { return i.busy; })) 3576 { 3577 #ifndef TORRENT_DISABLE_LOGGING 3578 peer_log(peer_log_alert::info, "PIECE_PICKER" 3579 , "not_picking: %d,%d already in download queue & busy" 3580 , static_cast<int>(block.piece_index), block.block_index); 3581 #endif 3582 return false; 3583 } 3584 3585 if (std::any_of(m_request_queue.begin(), m_request_queue.end() 3586 , [](pending_block const& i) { return i.busy; })) 3587 { 3588 #ifndef TORRENT_DISABLE_LOGGING 3589 peer_log(peer_log_alert::info, "PIECE_PICKER" 3590 , "not_picking: %d,%d already in request queue & busy" 3591 , static_cast<int>(block.piece_index), block.block_index); 3592 #endif 3593 return false; 3594 } 3595 } 3596 3597 if (!t->picker().mark_as_downloading(block, peer_info_struct() 3598 , picker_options())) 3599 { 3600 #ifndef TORRENT_DISABLE_LOGGING 3601 peer_log(peer_log_alert::info, "PIECE_PICKER" 3602 , "not_picking: %d,%d failed to mark_as_downloading" 3603 , static_cast<int>(block.piece_index), block.block_index); 3604 #endif 3605 return false; 3606 } 3607 3608 if (t->alerts().should_post<block_downloading_alert>()) 3609 { 3610 t->alerts().emplace_alert<block_downloading_alert>(t->get_handle() 3611 , remote(), pid(), block.block_index, block.piece_index); 3612 } 3613 3614 pending_block pb(block); 3615 pb.busy = (flags & busy) ? true : false; 3616 if (flags & time_critical) 3617 { 3618 m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical 3619 , pb); 3620 ++m_queued_time_critical; 3621 } 3622 else 3623 { 3624 m_request_queue.push_back(pb); 3625 } 3626 return true; 3627 } 3628 cancel_all_requests()3629 void peer_connection::cancel_all_requests() 3630 { 3631 TORRENT_ASSERT(is_single_thread()); 3632 INVARIANT_CHECK; 3633 3634 std::shared_ptr<torrent> t = m_torrent.lock(); 3635 // this peer might be disconnecting 3636 if (!t) return; 3637 3638 TORRENT_ASSERT(t->valid_metadata()); 3639 3640 #ifndef TORRENT_DISABLE_LOGGING 3641 peer_log(peer_log_alert::info, "CANCEL_ALL_REQUESTS"); 3642 #endif 3643 3644 while (!m_request_queue.empty()) 3645 { 3646 t->picker().abort_download(m_request_queue.back().block, peer_info_struct()); 3647 m_request_queue.pop_back(); 3648 } 3649 m_queued_time_critical = 0; 3650 3651 // make a local temporary copy of the download queue, since it 3652 // may be modified when we call write_cancel (for peers that don't 3653 // support the FAST extensions). 3654 std::vector<pending_block> temp_copy = m_download_queue; 3655 3656 for (auto const& pb : temp_copy) 3657 { 3658 piece_block const b = pb.block; 3659 3660 int const block_offset = b.block_index * t->block_size(); 3661 int const block_size 3662 = std::min(t->torrent_file().piece_size(b.piece_index)-block_offset, 3663 t->block_size()); 3664 TORRENT_ASSERT(block_size > 0); 3665 TORRENT_ASSERT(block_size <= t->block_size()); 3666 3667 // we can't cancel the piece if we've started receiving it 3668 if (m_receiving_block == b) continue; 3669 3670 peer_request r; 3671 r.piece = b.piece_index; 3672 r.start = block_offset; 3673 r.length = block_size; 3674 3675 #ifndef TORRENT_DISABLE_LOGGING 3676 peer_log(peer_log_alert::outgoing_message, "CANCEL" 3677 , "piece: %d s: %d l: %d b: %d" 3678 , static_cast<int>(b.piece_index), block_offset, block_size, b.block_index); 3679 #endif 3680 write_cancel(r); 3681 } 3682 } 3683 cancel_request(piece_block const & block,bool const force)3684 void peer_connection::cancel_request(piece_block const& block, bool const force) 3685 { 3686 TORRENT_ASSERT(is_single_thread()); 3687 INVARIANT_CHECK; 3688 3689 std::shared_ptr<torrent> t = m_torrent.lock(); 3690 // this peer might be disconnecting 3691 if (!t) return; 3692 3693 TORRENT_ASSERT(t->valid_metadata()); 3694 3695 TORRENT_ASSERT(block.block_index != piece_block::invalid.block_index); 3696 TORRENT_ASSERT(block.piece_index != piece_block::invalid.piece_index); 3697 TORRENT_ASSERT(block.piece_index < t->torrent_file().end_piece()); 3698 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index)); 3699 3700 // if all the peers that requested this block has been 3701 // cancelled, then just ignore the cancel. 3702 if (!t->picker().is_requested(block)) return; 3703 3704 auto const it = std::find_if(m_download_queue.begin(), m_download_queue.end() 3705 , aux::has_block(block)); 3706 if (it == m_download_queue.end()) 3707 { 3708 auto const rit = std::find_if(m_request_queue.begin() 3709 , m_request_queue.end(), aux::has_block(block)); 3710 3711 // when a multi block is received, it is cancelled 3712 // from all peers, so if this one hasn't requested 3713 // the block, just ignore to cancel it. 3714 if (rit == m_request_queue.end()) return; 3715 3716 if (rit - m_request_queue.begin() < m_queued_time_critical) 3717 --m_queued_time_critical; 3718 3719 t->picker().abort_download(block, peer_info_struct()); 3720 m_request_queue.erase(rit); 3721 // since we found it in the request queue, it means it hasn't been 3722 // sent yet, so we don't have to send a cancel. 3723 return; 3724 } 3725 3726 int const block_offset = block.block_index * t->block_size(); 3727 int const block_size 3728 = std::min(t->torrent_file().piece_size(block.piece_index) - block_offset, 3729 t->block_size()); 3730 TORRENT_ASSERT(block_size > 0); 3731 TORRENT_ASSERT(block_size <= t->block_size()); 3732 3733 it->not_wanted = true; 3734 3735 if (force) t->picker().abort_download(block, peer_info_struct()); 3736 3737 if (m_outstanding_bytes < block_size) return; 3738 3739 peer_request r; 3740 r.piece = block.piece_index; 3741 r.start = block_offset; 3742 r.length = block_size; 3743 3744 #ifndef TORRENT_DISABLE_LOGGING 3745 peer_log(peer_log_alert::outgoing_message, "CANCEL" 3746 , "piece: %d s: %d l: %d b: %d" 3747 , static_cast<int>(block.piece_index), block_offset, block_size, block.block_index); 3748 #endif 3749 write_cancel(r); 3750 } 3751 send_choke()3752 bool peer_connection::send_choke() 3753 { 3754 TORRENT_ASSERT(is_single_thread()); 3755 INVARIANT_CHECK; 3756 3757 TORRENT_ASSERT(!is_connecting()); 3758 3759 if (m_choked) 3760 { 3761 TORRENT_ASSERT(m_peer_info == nullptr 3762 || m_peer_info->optimistically_unchoked == false); 3763 return false; 3764 } 3765 3766 if (m_peer_info && m_peer_info->optimistically_unchoked) 3767 { 3768 m_peer_info->optimistically_unchoked = false; 3769 m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1); 3770 } 3771 3772 m_suggest_pieces.clear(); 3773 m_suggest_pieces.shrink_to_fit(); 3774 3775 #ifndef TORRENT_DISABLE_LOGGING 3776 peer_log(peer_log_alert::outgoing_message, "CHOKE"); 3777 #endif 3778 write_choke(); 3779 m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1); 3780 if (!ignore_unchoke_slots()) 3781 m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1); 3782 m_choked = true; 3783 3784 m_last_choke = aux::time_now(); 3785 m_num_invalid_requests = 0; 3786 3787 // reject the requests we have in the queue 3788 // except the allowed fast pieces 3789 for (auto i = m_requests.begin(); i != m_requests.end();) 3790 { 3791 if (std::find(m_accept_fast.begin(), m_accept_fast.end(), i->piece) 3792 != m_accept_fast.end()) 3793 { 3794 ++i; 3795 continue; 3796 } 3797 peer_request const& r = *i; 3798 m_counters.inc_stats_counter(counters::choked_piece_requests); 3799 #ifndef TORRENT_DISABLE_LOGGING 3800 peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE" 3801 , "piece: %d s: %d l: %d choking" 3802 , static_cast<int>(r.piece), r.start , r.length); 3803 #endif 3804 write_reject_request(r); 3805 i = m_requests.erase(i); 3806 3807 if (m_requests.empty()) 3808 m_counters.inc_stats_counter(counters::num_peers_up_requests, -1); 3809 } 3810 return true; 3811 } 3812 send_unchoke()3813 bool peer_connection::send_unchoke() 3814 { 3815 TORRENT_ASSERT(is_single_thread()); 3816 INVARIANT_CHECK; 3817 3818 if (!m_choked) return false; 3819 std::shared_ptr<torrent> t = m_torrent.lock(); 3820 if (!t->ready_for_connections()) return false; 3821 3822 if (m_settings.get_int(settings_pack::suggest_mode) 3823 == settings_pack::suggest_read_cache) 3824 { 3825 // immediately before unchoking this peer, we should send some 3826 // suggested pieces for it to request 3827 send_piece_suggestions(2); 3828 } 3829 3830 m_last_unchoke = aux::time_now(); 3831 write_unchoke(); 3832 m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all); 3833 if (!ignore_unchoke_slots()) 3834 m_counters.inc_stats_counter(counters::num_peers_up_unchoked); 3835 m_choked = false; 3836 3837 m_uploaded_at_last_unchoke = m_statistics.total_payload_upload(); 3838 3839 #ifndef TORRENT_DISABLE_LOGGING 3840 peer_log(peer_log_alert::outgoing_message, "UNCHOKE"); 3841 #endif 3842 return true; 3843 } 3844 send_interested()3845 void peer_connection::send_interested() 3846 { 3847 TORRENT_ASSERT(is_single_thread()); 3848 if (m_interesting) return; 3849 std::shared_ptr<torrent> t = m_torrent.lock(); 3850 if (!t->ready_for_connections()) return; 3851 if (!m_interesting) 3852 { 3853 m_interesting = true; 3854 m_counters.inc_stats_counter(counters::num_peers_down_interested); 3855 } 3856 write_interested(); 3857 3858 #ifndef TORRENT_DISABLE_LOGGING 3859 peer_log(peer_log_alert::outgoing_message, "INTERESTED"); 3860 #endif 3861 } 3862 send_not_interested()3863 void peer_connection::send_not_interested() 3864 { 3865 TORRENT_ASSERT(is_single_thread()); 3866 // we cannot disconnect in a constructor, and 3867 // this function may end up doing that 3868 TORRENT_ASSERT(m_in_constructor == false); 3869 3870 if (!m_interesting) 3871 { 3872 disconnect_if_redundant(); 3873 return; 3874 } 3875 3876 std::shared_ptr<torrent> t = m_torrent.lock(); 3877 if (!t->ready_for_connections()) return; 3878 if (m_interesting) 3879 { 3880 m_interesting = false; 3881 m_became_uninteresting = aux::time_now(); 3882 m_counters.inc_stats_counter(counters::num_peers_down_interested, -1); 3883 } 3884 3885 m_slow_start = false; 3886 3887 disconnect_if_redundant(); 3888 if (m_disconnecting) return; 3889 3890 write_not_interested(); 3891 3892 #ifndef TORRENT_DISABLE_LOGGING 3893 if (should_log(peer_log_alert::outgoing_message)) 3894 { 3895 peer_log(peer_log_alert::outgoing_message, "NOT_INTERESTED"); 3896 } 3897 #endif 3898 } 3899 send_upload_only(bool const enabled)3900 void peer_connection::send_upload_only(bool const enabled) 3901 { 3902 TORRENT_ASSERT(is_single_thread()); 3903 if (m_connecting || in_handshake()) return; 3904 3905 #ifndef TORRENT_DISABLE_LOGGING 3906 if (should_log(peer_log_alert::outgoing_message)) 3907 { 3908 peer_log(peer_log_alert::outgoing_message, "UPLOAD_ONLY", "%d" 3909 , int(enabled)); 3910 } 3911 #endif 3912 3913 write_upload_only(enabled); 3914 } 3915 send_piece_suggestions(int const num)3916 void peer_connection::send_piece_suggestions(int const num) 3917 { 3918 std::shared_ptr<torrent> t = m_torrent.lock(); 3919 TORRENT_ASSERT(t); 3920 3921 int const new_suggestions = t->get_suggest_pieces(m_suggest_pieces 3922 , m_have_piece, num); 3923 3924 // higher priority pieces are farther back in the vector, the last 3925 // suggested piece to be received is the highest priority, so send the 3926 // highest priority piece last. 3927 for (auto i = m_suggest_pieces.end() - new_suggestions; 3928 i != m_suggest_pieces.end(); ++i) 3929 { 3930 send_suggest(*i); 3931 } 3932 int const max = m_settings.get_int(settings_pack::max_suggest_pieces); 3933 if (m_suggest_pieces.end_index() > max) 3934 { 3935 int const to_erase = m_suggest_pieces.end_index() - max; 3936 m_suggest_pieces.erase(m_suggest_pieces.begin() 3937 , m_suggest_pieces.begin() + to_erase); 3938 } 3939 } 3940 send_suggest(piece_index_t const piece)3941 void peer_connection::send_suggest(piece_index_t const piece) 3942 { 3943 TORRENT_ASSERT(is_single_thread()); 3944 if (m_connecting || in_handshake()) return; 3945 3946 // don't suggest a piece that the peer already has 3947 if (has_piece(piece)) return; 3948 3949 // we cannot suggest a piece we don't have! 3950 #if TORRENT_USE_ASSERTS 3951 { 3952 std::shared_ptr<torrent> t = m_torrent.lock(); 3953 TORRENT_ASSERT(t); 3954 TORRENT_ASSERT(t->has_piece_passed(piece)); 3955 TORRENT_ASSERT(piece < t->torrent_file().end_piece()); 3956 } 3957 #endif 3958 3959 write_suggest(piece); 3960 } 3961 send_block_requests()3962 void peer_connection::send_block_requests() 3963 { 3964 TORRENT_ASSERT(is_single_thread()); 3965 INVARIANT_CHECK; 3966 3967 std::shared_ptr<torrent> t = m_torrent.lock(); 3968 TORRENT_ASSERT(t); 3969 3970 if (m_disconnecting) return; 3971 3972 // TODO: 3 once peers are properly put in graceful pause mode, they can 3973 // cancel all outstanding requests and this test can be removed. 3974 if (t->graceful_pause()) return; 3975 3976 // we can't download pieces in these states 3977 if (t->state() == torrent_status::checking_files 3978 || t->state() == torrent_status::checking_resume_data 3979 || t->state() == torrent_status::downloading_metadata) 3980 return; 3981 3982 if (int(m_download_queue.size()) >= m_desired_queue_size 3983 || t->upload_mode()) return; 3984 3985 bool const empty_download_queue = m_download_queue.empty(); 3986 3987 while (!m_request_queue.empty() 3988 && (int(m_download_queue.size()) < m_desired_queue_size 3989 || m_queued_time_critical > 0)) 3990 { 3991 pending_block block = m_request_queue.front(); 3992 3993 m_request_queue.erase(m_request_queue.begin()); 3994 if (m_queued_time_critical) --m_queued_time_critical; 3995 3996 // if we're a seed, we don't have a piece picker 3997 // so we don't have to worry about invariants getting 3998 // out of sync with it 3999 if (!t->has_picker()) continue; 4000 4001 // this can happen if a block times out, is re-requested and 4002 // then arrives "unexpectedly" 4003 if (t->picker().is_downloaded(block.block)) 4004 { 4005 t->picker().abort_download(block.block, peer_info_struct()); 4006 continue; 4007 } 4008 4009 int block_offset = block.block.block_index * t->block_size(); 4010 int bs = std::min(t->torrent_file().piece_size( 4011 block.block.piece_index) - block_offset, t->block_size()); 4012 TORRENT_ASSERT(bs > 0); 4013 TORRENT_ASSERT(bs <= t->block_size()); 4014 4015 peer_request r; 4016 r.piece = block.block.piece_index; 4017 r.start = block_offset; 4018 r.length = bs; 4019 4020 if (m_download_queue.empty()) 4021 m_counters.inc_stats_counter(counters::num_peers_down_requests); 4022 4023 TORRENT_ASSERT(verify_piece(t->to_req(block.block))); 4024 block.send_buffer_offset = aux::numeric_cast<std::uint32_t>(m_send_buffer.size()); 4025 m_download_queue.push_back(block); 4026 m_outstanding_bytes += bs; 4027 #if TORRENT_USE_INVARIANT_CHECKS 4028 check_invariant(); 4029 #endif 4030 4031 // if we are requesting large blocks, merge the smaller 4032 // blocks that are in the same piece into larger requests 4033 if (m_request_large_blocks) 4034 { 4035 int const blocks_per_piece = t->torrent_file().piece_length() / t->block_size(); 4036 4037 while (!m_request_queue.empty()) 4038 { 4039 // check to see if this block is connected to the previous one 4040 // if it is, merge them, otherwise, break this merge loop 4041 pending_block const& front = m_request_queue.front(); 4042 if (static_cast<int>(front.block.piece_index) * blocks_per_piece + front.block.block_index 4043 != static_cast<int>(block.block.piece_index) * blocks_per_piece + block.block.block_index + 1) 4044 break; 4045 block = m_request_queue.front(); 4046 m_request_queue.erase(m_request_queue.begin()); 4047 TORRENT_ASSERT(verify_piece(t->to_req(block.block))); 4048 4049 if (m_download_queue.empty()) 4050 m_counters.inc_stats_counter(counters::num_peers_down_requests); 4051 4052 block.send_buffer_offset = aux::numeric_cast<std::uint32_t>(m_send_buffer.size()); 4053 m_download_queue.push_back(block); 4054 if (m_queued_time_critical) --m_queued_time_critical; 4055 4056 block_offset = block.block.block_index * t->block_size(); 4057 bs = std::min(t->torrent_file().piece_size( 4058 block.block.piece_index) - block_offset, t->block_size()); 4059 TORRENT_ASSERT(bs > 0); 4060 TORRENT_ASSERT(bs <= t->block_size()); 4061 4062 r.length += bs; 4063 m_outstanding_bytes += bs; 4064 #if TORRENT_USE_INVARIANT_CHECKS 4065 check_invariant(); 4066 #endif 4067 } 4068 4069 #ifndef TORRENT_DISABLE_LOGGING 4070 peer_log(peer_log_alert::info, "MERGING_REQUESTS" 4071 , "piece: %d start: %d length: %d", static_cast<int>(r.piece) 4072 , r.start, r.length); 4073 #endif 4074 4075 } 4076 4077 // the verification will fail for coalesced blocks 4078 TORRENT_ASSERT(verify_piece(r) || m_request_large_blocks); 4079 4080 #ifndef TORRENT_DISABLE_EXTENSIONS 4081 bool handled = false; 4082 for (auto const& e : m_extensions) 4083 { 4084 handled = e->write_request(r); 4085 if (handled) break; 4086 } 4087 if (is_disconnecting()) return; 4088 if (!handled) 4089 #endif 4090 { 4091 write_request(r); 4092 m_last_request = aux::time_now(); 4093 } 4094 4095 #ifndef TORRENT_DISABLE_LOGGING 4096 if (should_log(peer_log_alert::outgoing_message)) 4097 { 4098 peer_log(peer_log_alert::outgoing_message, "REQUEST" 4099 , "piece: %d s: %x l: %x ds: %dB/s dqs: %d rqs: %d blk: %s" 4100 , static_cast<int>(r.piece), r.start, r.length, statistics().download_rate() 4101 , int(m_desired_queue_size), int(m_download_queue.size()) 4102 , m_request_large_blocks?"large":"single"); 4103 } 4104 #endif 4105 } 4106 m_last_piece = aux::time_now(); 4107 4108 if (!m_download_queue.empty() 4109 && empty_download_queue) 4110 { 4111 // This means we just added a request to this connection that 4112 // previously did not have a request. That's when we start the 4113 // request timeout. 4114 m_requested = aux::time_now(); 4115 } 4116 } 4117 connect_failed(error_code const & e)4118 void peer_connection::connect_failed(error_code const& e) 4119 { 4120 TORRENT_ASSERT(is_single_thread()); 4121 TORRENT_ASSERT(e); 4122 4123 #ifndef TORRENT_DISABLE_LOGGING 4124 if (should_log(peer_log_alert::info)) 4125 { 4126 peer_log(peer_log_alert::info, "CONNECTION FAILED" 4127 , "%s %s", print_endpoint(m_remote).c_str(), print_error(e).c_str()); 4128 } 4129 #endif 4130 #ifndef TORRENT_DISABLE_LOGGING 4131 if (m_ses.should_log()) 4132 m_ses.session_log("CONNECTION FAILED: %s", print_endpoint(m_remote).c_str()); 4133 #endif 4134 4135 m_counters.inc_stats_counter(counters::connect_timeouts); 4136 4137 std::shared_ptr<torrent> t = m_torrent.lock(); 4138 TORRENT_ASSERT(!m_connecting || t); 4139 if (m_connecting) 4140 { 4141 m_counters.inc_stats_counter(counters::num_peers_half_open, -1); 4142 if (t && m_peer_info) t->dec_num_connecting(m_peer_info); 4143 m_connecting = false; 4144 } 4145 4146 // a connection attempt using uTP just failed 4147 // mark this peer as not supporting uTP 4148 // we'll never try it again (unless we're trying holepunch) 4149 if (is_utp(*m_socket) 4150 && m_peer_info 4151 && m_peer_info->supports_utp 4152 && !m_holepunch_mode) 4153 { 4154 m_peer_info->supports_utp = false; 4155 // reconnect immediately using TCP 4156 fast_reconnect(true); 4157 disconnect(e, operation_t::connect, normal); 4158 if (t && m_peer_info) 4159 { 4160 std::weak_ptr<torrent> weak_t = t; 4161 std::weak_ptr<peer_connection> weak_self = shared_from_this(); 4162 4163 // we can't touch m_connections here, since we're likely looping 4164 // over it. So defer the actual reconnection to after we've handled 4165 // the existing message queue 4166 m_ses.get_io_service().post([weak_t, weak_self]() 4167 { 4168 std::shared_ptr<torrent> tor = weak_t.lock(); 4169 std::shared_ptr<peer_connection> p = weak_self.lock(); 4170 if (tor && p) 4171 { 4172 torrent_peer* pi = p->peer_info_struct(); 4173 tor->connect_to_peer(pi, true); 4174 } 4175 }); 4176 } 4177 return; 4178 } 4179 4180 if (m_holepunch_mode) 4181 fast_reconnect(true); 4182 4183 #ifndef TORRENT_DISABLE_EXTENSIONS 4184 if ((!is_utp(*m_socket) 4185 || !m_settings.get_bool(settings_pack::enable_outgoing_tcp)) 4186 && m_peer_info 4187 && m_peer_info->supports_holepunch 4188 && !m_holepunch_mode) 4189 { 4190 // see if we can try a holepunch 4191 bt_peer_connection* p = t->find_introducer(remote()); 4192 if (p) 4193 p->write_holepunch_msg(bt_peer_connection::hp_message::rendezvous, remote()); 4194 } 4195 #endif 4196 4197 disconnect(e, operation_t::connect, failure); 4198 } 4199 4200 // the error argument defaults to 0, which means deliberate disconnect 4201 // 1 means unexpected disconnect/error 4202 // 2 protocol error (client sent something invalid) disconnect(error_code const & ec,operation_t const op,disconnect_severity_t const error)4203 void peer_connection::disconnect(error_code const& ec 4204 , operation_t const op, disconnect_severity_t const error) 4205 { 4206 TORRENT_ASSERT(is_single_thread()); 4207 #if TORRENT_USE_ASSERTS 4208 m_disconnect_started = true; 4209 #endif 4210 4211 if (m_disconnecting) return; 4212 4213 m_socket->set_close_reason(error_to_close_reason(ec)); 4214 close_reason_t const close_reason = m_socket->get_close_reason(); 4215 #ifndef TORRENT_DISABLE_LOGGING 4216 if (close_reason != close_reason_t::none) 4217 { 4218 peer_log(peer_log_alert::info, "CLOSE_REASON", "%d", int(close_reason)); 4219 } 4220 #endif 4221 4222 // while being disconnected, it's possible that our torrent_peer 4223 // pointer gets cleared. Make sure we save it to be able to keep 4224 // proper books in the piece_picker (when debugging is enabled) 4225 torrent_peer* self_peer = peer_info_struct(); 4226 4227 #ifndef TORRENT_DISABLE_LOGGING 4228 if (should_log(peer_log_alert::info)) try 4229 { 4230 static aux::array<char const*, 3, disconnect_severity_t> const str{{{ 4231 "CONNECTION_CLOSED", "CONNECTION_FAILED", "PEER_ERROR"}}}; 4232 peer_log(peer_log_alert::info, str[error], "op: %d %s" 4233 , static_cast<int>(op), print_error(ec).c_str()); 4234 4235 if (ec == boost::asio::error::eof 4236 && !in_handshake() 4237 && !is_connecting() 4238 && aux::time_now() - connected_time() < seconds(15)) 4239 { 4240 peer_log(peer_log_alert::info, "SHORT_LIVED_DISCONNECT", ""); 4241 } 4242 } 4243 catch (std::exception const& err) 4244 { 4245 peer_log(peer_log_alert::info, "PEER_ERROR" ,"op: %d ERROR: unknown error (failed with exception) %s" 4246 , static_cast<int>(op), err.what()); 4247 } 4248 #endif 4249 4250 if (!(m_channel_state[upload_channel] & peer_info::bw_network)) 4251 { 4252 // make sure we free up all send buffers that are owned 4253 // by the disk thread 4254 m_send_buffer.clear(); 4255 } 4256 4257 // we cannot do this in a constructor 4258 TORRENT_ASSERT(m_in_constructor == false); 4259 if (error > normal) 4260 { 4261 m_failed = true; 4262 } 4263 4264 if (m_connected) 4265 m_counters.inc_stats_counter(counters::num_peers_connected, -1); 4266 m_connected = false; 4267 4268 // for incoming connections, we get invalid argument errors 4269 // when asking for the remote endpoint and the socket already 4270 // closed, which is an edge case, but possible to happen when 4271 // a peer makes a TCP and uTP connection in parallel. 4272 // for outgoing connections however, why would we get this? 4273 // TORRENT_ASSERT(ec != error::invalid_argument || !m_outgoing); 4274 4275 m_counters.inc_stats_counter(counters::disconnected_peers); 4276 if (error == peer_error) m_counters.inc_stats_counter(counters::error_peers); 4277 4278 if (ec == error::connection_reset) 4279 m_counters.inc_stats_counter(counters::connreset_peers); 4280 else if (ec == error::eof) 4281 m_counters.inc_stats_counter(counters::eof_peers); 4282 else if (ec == error::connection_refused) 4283 m_counters.inc_stats_counter(counters::connrefused_peers); 4284 else if (ec == error::connection_aborted) 4285 m_counters.inc_stats_counter(counters::connaborted_peers); 4286 else if (ec == error::not_connected) 4287 m_counters.inc_stats_counter(counters::notconnected_peers); 4288 else if (ec == error::no_permission) 4289 m_counters.inc_stats_counter(counters::perm_peers); 4290 else if (ec == error::no_buffer_space) 4291 m_counters.inc_stats_counter(counters::buffer_peers); 4292 else if (ec == error::host_unreachable) 4293 m_counters.inc_stats_counter(counters::unreachable_peers); 4294 else if (ec == error::broken_pipe) 4295 m_counters.inc_stats_counter(counters::broken_pipe_peers); 4296 else if (ec == error::address_in_use) 4297 m_counters.inc_stats_counter(counters::addrinuse_peers); 4298 else if (ec == error::access_denied) 4299 m_counters.inc_stats_counter(counters::no_access_peers); 4300 else if (ec == error::invalid_argument) 4301 m_counters.inc_stats_counter(counters::invalid_arg_peers); 4302 else if (ec == error::operation_aborted) 4303 m_counters.inc_stats_counter(counters::aborted_peers); 4304 else if (ec == errors::upload_upload_connection 4305 || ec == errors::uninteresting_upload_peer 4306 || ec == errors::torrent_aborted 4307 || ec == errors::self_connection 4308 || ec == errors::torrent_paused) 4309 m_counters.inc_stats_counter(counters::uninteresting_peers); 4310 4311 if (ec == errors::timed_out 4312 || ec == error::timed_out) 4313 m_counters.inc_stats_counter(counters::transport_timeout_peers); 4314 4315 if (ec == errors::timed_out_inactivity 4316 || ec == errors::timed_out_no_request 4317 || ec == errors::timed_out_no_interest) 4318 m_counters.inc_stats_counter(counters::timeout_peers); 4319 4320 if (ec == errors::no_memory) 4321 m_counters.inc_stats_counter(counters::no_memory_peers); 4322 4323 if (ec == errors::too_many_connections) 4324 m_counters.inc_stats_counter(counters::too_many_peers); 4325 4326 if (ec == errors::timed_out_no_handshake) 4327 m_counters.inc_stats_counter(counters::connect_timeouts); 4328 4329 if (error > normal) 4330 { 4331 if (is_utp(*m_socket)) m_counters.inc_stats_counter(counters::error_utp_peers); 4332 else m_counters.inc_stats_counter(counters::error_tcp_peers); 4333 4334 if (m_outgoing) m_counters.inc_stats_counter(counters::error_outgoing_peers); 4335 else m_counters.inc_stats_counter(counters::error_incoming_peers); 4336 4337 #if !defined TORRENT_DISABLE_ENCRYPTION 4338 if (type() == connection_type::bittorrent && op != operation_t::connect) 4339 { 4340 auto* bt = static_cast<bt_peer_connection*>(this); 4341 if (bt->supports_encryption()) m_counters.inc_stats_counter( 4342 counters::error_encrypted_peers); 4343 if (bt->rc4_encrypted() && bt->supports_encryption()) 4344 m_counters.inc_stats_counter(counters::error_rc4_peers); 4345 } 4346 #endif // TORRENT_DISABLE_ENCRYPTION 4347 } 4348 4349 std::shared_ptr<peer_connection> me(self()); 4350 4351 INVARIANT_CHECK; 4352 4353 if (m_channel_state[upload_channel] & peer_info::bw_disk) 4354 { 4355 m_counters.inc_stats_counter(counters::num_peers_up_disk, -1); 4356 m_channel_state[upload_channel] &= ~peer_info::bw_disk; 4357 } 4358 if (m_channel_state[download_channel] & peer_info::bw_disk) 4359 { 4360 m_counters.inc_stats_counter(counters::num_peers_down_disk, -1); 4361 m_channel_state[download_channel] &= ~peer_info::bw_disk; 4362 } 4363 4364 std::shared_ptr<torrent> t = m_torrent.lock(); 4365 4366 // don't try to connect to ourself again 4367 if (ec == errors::self_connection && m_peer_info && t) 4368 t->ban_peer(m_peer_info); 4369 4370 if (m_connecting) 4371 { 4372 m_counters.inc_stats_counter(counters::num_peers_half_open, -1); 4373 if (t) t->dec_num_connecting(m_peer_info); 4374 m_connecting = false; 4375 } 4376 4377 torrent_handle handle; 4378 if (t) handle = t->get_handle(); 4379 4380 #ifndef TORRENT_DISABLE_EXTENSIONS 4381 for (auto const& e : m_extensions) 4382 { 4383 e->on_disconnect(ec); 4384 } 4385 #endif 4386 4387 if (ec == error::address_in_use 4388 && m_settings.get_int(settings_pack::outgoing_port) != 0 4389 && t) 4390 { 4391 if (t->alerts().should_post<performance_alert>()) 4392 t->alerts().emplace_alert<performance_alert>( 4393 handle, performance_alert::too_few_outgoing_ports); 4394 } 4395 4396 m_disconnecting = true; 4397 4398 if (t) 4399 { 4400 if (ec) 4401 { 4402 if ((error > failure || ec.category() == socks_category()) 4403 && t->alerts().should_post<peer_error_alert>()) 4404 { 4405 t->alerts().emplace_alert<peer_error_alert>(handle, remote() 4406 , pid(), op, ec); 4407 } 4408 4409 if (error <= failure && t->alerts().should_post<peer_disconnected_alert>()) 4410 { 4411 t->alerts().emplace_alert<peer_disconnected_alert>(handle 4412 , remote(), pid(), op, m_socket->type(), ec, close_reason); 4413 } 4414 } 4415 4416 // make sure we keep all the stats! 4417 if (!m_ignore_stats) 4418 { 4419 // report any partially received payload as redundant 4420 piece_block_progress pbp = downloading_piece_progress(); 4421 if (pbp.piece_index != piece_block_progress::invalid_index 4422 && pbp.bytes_downloaded > 0 4423 && pbp.bytes_downloaded < pbp.full_block_bytes) 4424 { 4425 t->add_redundant_bytes(pbp.bytes_downloaded, waste_reason::piece_closing); 4426 } 4427 } 4428 4429 if (t->has_picker()) 4430 { 4431 clear_download_queue(); 4432 piece_picker& picker = t->picker(); 4433 while (!m_request_queue.empty()) 4434 { 4435 pending_block const& qe = m_request_queue.back(); 4436 if (!qe.timed_out && !qe.not_wanted) 4437 picker.abort_download(qe.block, self_peer); 4438 m_request_queue.pop_back(); 4439 } 4440 } 4441 else 4442 { 4443 m_download_queue.clear(); 4444 m_request_queue.clear(); 4445 m_outstanding_bytes = 0; 4446 } 4447 m_queued_time_critical = 0; 4448 4449 #if TORRENT_USE_INVARIANT_CHECKS 4450 try { check_invariant(); } catch (std::exception const&) {} 4451 #endif 4452 t->remove_peer(self()); 4453 4454 // we need to do this here to maintain accurate accounting of number of 4455 // unchoke slots. Ideally the updating of choked state and the 4456 // accounting should be tighter 4457 if (!m_choked) 4458 { 4459 m_choked = true; 4460 m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1); 4461 if (!ignore_unchoke_slots()) 4462 m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1); 4463 } 4464 } 4465 else 4466 { 4467 TORRENT_ASSERT(m_download_queue.empty()); 4468 TORRENT_ASSERT(m_request_queue.empty()); 4469 m_ses.close_connection(this); 4470 } 4471 4472 async_shutdown(*m_socket, m_socket); 4473 } 4474 ignore_unchoke_slots() const4475 bool peer_connection::ignore_unchoke_slots() const 4476 { 4477 TORRENT_ASSERT(is_single_thread()); 4478 if (num_classes() == 0) return true; 4479 4480 if (m_ses.ignore_unchoke_slots_set(*this)) return true; 4481 std::shared_ptr<torrent> t = m_torrent.lock(); 4482 if (t && m_ses.ignore_unchoke_slots_set(*t)) return true; 4483 return false; 4484 } 4485 on_local_network() const4486 bool peer_connection::on_local_network() const 4487 { 4488 TORRENT_ASSERT(is_single_thread()); 4489 return is_local(m_remote.address()) 4490 || is_loopback(m_remote.address()); 4491 } 4492 request_timeout() const4493 int peer_connection::request_timeout() const 4494 { 4495 const int deviation = m_request_time.avg_deviation(); 4496 const int avg = m_request_time.mean(); 4497 4498 int ret; 4499 if (m_request_time.num_samples() < 2) 4500 { 4501 if (m_request_time.num_samples() == 0) 4502 return m_settings.get_int(settings_pack::request_timeout); 4503 4504 ret = avg + avg / 5; 4505 } 4506 else 4507 { 4508 ret = avg + deviation * 4; 4509 } 4510 4511 // ret is milliseconds, the return value is seconds. Convert to 4512 // seconds and round up 4513 ret = std::min((ret + 999) / 1000 4514 , m_settings.get_int(settings_pack::request_timeout)); 4515 4516 // timeouts should never be less than 2 seconds. The granularity is whole 4517 // seconds, and only checked once per second. 2 is the minimum to avoid 4518 // being considered timed out instantly 4519 return std::max(2, ret); 4520 } 4521 get_peer_info(peer_info & p) const4522 void peer_connection::get_peer_info(peer_info& p) const 4523 { 4524 TORRENT_ASSERT(is_single_thread()); 4525 TORRENT_ASSERT(!associated_torrent().expired()); 4526 4527 time_point const now = aux::time_now(); 4528 4529 p.download_rate_peak = m_download_rate_peak; 4530 p.upload_rate_peak = m_upload_rate_peak; 4531 p.rtt = m_request_time.mean(); 4532 p.down_speed = statistics().download_rate(); 4533 p.up_speed = statistics().upload_rate(); 4534 p.payload_down_speed = statistics().download_payload_rate(); 4535 p.payload_up_speed = statistics().upload_payload_rate(); 4536 p.pid = pid(); 4537 p.ip = remote(); 4538 p.pending_disk_bytes = m_outstanding_writing_bytes; 4539 p.pending_disk_read_bytes = m_reading_bytes; 4540 p.send_quota = m_quota[upload_channel]; 4541 p.receive_quota = m_quota[download_channel]; 4542 p.num_pieces = m_num_pieces; 4543 if (m_download_queue.empty()) p.request_timeout = -1; 4544 else p.request_timeout = int(total_seconds(m_requested - now) 4545 + request_timeout()); 4546 4547 p.download_queue_time = download_queue_time(); 4548 p.queue_bytes = m_outstanding_bytes; 4549 4550 p.total_download = statistics().total_payload_download(); 4551 p.total_upload = statistics().total_payload_upload(); 4552 #if TORRENT_ABI_VERSION == 1 4553 p.upload_limit = -1; 4554 p.download_limit = -1; 4555 p.load_balancing = 0; 4556 #endif 4557 4558 p.download_queue_length = int(download_queue().size() + m_request_queue.size()); 4559 p.requests_in_buffer = int(std::count_if(m_download_queue.begin() 4560 , m_download_queue.end() 4561 , &pending_block_in_buffer)); 4562 4563 p.target_dl_queue_length = desired_queue_size(); 4564 p.upload_queue_length = int(upload_queue().size()); 4565 p.timed_out_requests = 0; 4566 p.busy_requests = 0; 4567 for (auto const& pb : m_download_queue) 4568 { 4569 if (pb.timed_out) ++p.timed_out_requests; 4570 if (pb.busy) ++p.busy_requests; 4571 } 4572 4573 piece_block_progress const ret = downloading_piece_progress(); 4574 if (ret.piece_index != piece_block_progress::invalid_index) 4575 { 4576 p.downloading_piece_index = ret.piece_index; 4577 p.downloading_block_index = ret.block_index; 4578 p.downloading_progress = ret.bytes_downloaded; 4579 p.downloading_total = ret.full_block_bytes; 4580 } 4581 else 4582 { 4583 p.downloading_piece_index = piece_index_t(-1); 4584 p.downloading_block_index = -1; 4585 p.downloading_progress = 0; 4586 p.downloading_total = 0; 4587 } 4588 4589 p.pieces = get_bitfield(); 4590 p.last_request = now - m_last_request; 4591 p.last_active = now - std::max(m_last_sent, m_last_receive); 4592 4593 // this will set the flags so that we can update them later 4594 p.flags = {}; 4595 get_specific_peer_info(p); 4596 4597 if (m_snubbed) p.flags |= peer_info::snubbed; 4598 if (m_upload_only) p.flags |= peer_info::upload_only; 4599 if (m_endgame_mode) p.flags |= peer_info::endgame_mode; 4600 if (m_holepunch_mode) p.flags |= peer_info::holepunched; 4601 if (peer_info_struct()) 4602 { 4603 torrent_peer* pi = peer_info_struct(); 4604 TORRENT_ASSERT(pi->in_use); 4605 p.source = peer_source_flags_t(pi->source); 4606 p.failcount = pi->failcount; 4607 p.num_hashfails = pi->hashfails; 4608 if (pi->on_parole) p.flags |= peer_info::on_parole; 4609 if (pi->optimistically_unchoked) p.flags |= peer_info::optimistic_unchoke; 4610 if (pi->seed) p.flags |= peer_info::seed; 4611 } 4612 else 4613 { 4614 if (is_seed()) p.flags |= peer_info::seed; 4615 p.source = {}; 4616 p.failcount = 0; 4617 p.num_hashfails = 0; 4618 } 4619 4620 #if TORRENT_ABI_VERSION == 1 4621 p.remote_dl_rate = 0; 4622 #endif 4623 p.send_buffer_size = m_send_buffer.capacity(); 4624 p.used_send_buffer = m_send_buffer.size(); 4625 p.receive_buffer_size = m_recv_buffer.capacity(); 4626 p.used_receive_buffer = m_recv_buffer.pos(); 4627 p.receive_buffer_watermark = m_recv_buffer.watermark(); 4628 p.write_state = m_channel_state[upload_channel]; 4629 p.read_state = m_channel_state[download_channel]; 4630 4631 // pieces may be empty if we don't have metadata yet 4632 if (p.pieces.empty()) 4633 { 4634 p.progress = 0.f; 4635 p.progress_ppm = 0; 4636 } 4637 else 4638 { 4639 #if TORRENT_NO_FPU 4640 p.progress = 0.f; 4641 #else 4642 p.progress = float(p.pieces.count()) / float(p.pieces.size()); 4643 #endif 4644 p.progress_ppm = int(std::int64_t(p.pieces.count()) * 1000000 / p.pieces.size()); 4645 } 4646 4647 #if TORRENT_ABI_VERSION == 1 4648 p.estimated_reciprocation_rate = m_est_reciprocation_rate; 4649 #endif 4650 4651 error_code ec; 4652 p.local_endpoint = get_socket()->local_endpoint(ec); 4653 } 4654 4655 #ifndef TORRENT_DISABLE_SUPERSEEDING 4656 // TODO: 3 new_piece should be an optional<piece_index_t>. piece index -1 4657 // should not be allowed superseed_piece(piece_index_t const replace_piece,piece_index_t const new_piece)4658 void peer_connection::superseed_piece(piece_index_t const replace_piece 4659 , piece_index_t const new_piece) 4660 { 4661 TORRENT_ASSERT(is_single_thread()); 4662 4663 if (is_connecting()) return; 4664 if (in_handshake()) return; 4665 4666 if (new_piece == piece_index_t(-1)) 4667 { 4668 if (m_superseed_piece[0] == piece_index_t(-1)) return; 4669 m_superseed_piece[0] = piece_index_t(-1); 4670 m_superseed_piece[1] = piece_index_t(-1); 4671 4672 #ifndef TORRENT_DISABLE_LOGGING 4673 peer_log(peer_log_alert::info, "SUPER_SEEDING", "ending"); 4674 #endif 4675 std::shared_ptr<torrent> t = m_torrent.lock(); 4676 TORRENT_ASSERT(t); 4677 4678 // this will either send a full bitfield or 4679 // a have-all message, effectively terminating 4680 // super-seeding, since the peer may pick any piece 4681 write_bitfield(); 4682 4683 return; 4684 } 4685 4686 TORRENT_ASSERT(!has_piece(new_piece)); 4687 4688 #ifndef TORRENT_DISABLE_LOGGING 4689 peer_log(peer_log_alert::outgoing_message, "HAVE", "piece: %d (super seed)" 4690 , static_cast<int>(new_piece)); 4691 #endif 4692 write_have(new_piece); 4693 4694 if (replace_piece >= piece_index_t(0)) 4695 { 4696 // move the piece we're replacing to the tail 4697 if (m_superseed_piece[0] == replace_piece) 4698 std::swap(m_superseed_piece[0], m_superseed_piece[1]); 4699 } 4700 4701 m_superseed_piece[1] = m_superseed_piece[0]; 4702 m_superseed_piece[0] = new_piece; 4703 } 4704 #endif // TORRENT_DISABLE_SUPERSEEDING 4705 max_out_request_queue(int s)4706 void peer_connection::max_out_request_queue(int s) 4707 { 4708 #ifndef TORRENT_DISABLE_LOGGING 4709 peer_log(peer_log_alert::info, "MAX_OUT_QUEUE_SIZE", "%d -> %d" 4710 , m_max_out_request_queue, s); 4711 #endif 4712 m_max_out_request_queue = s; 4713 } 4714 max_out_request_queue() const4715 int peer_connection::max_out_request_queue() const 4716 { 4717 return m_max_out_request_queue; 4718 } 4719 update_desired_queue_size()4720 void peer_connection::update_desired_queue_size() 4721 { 4722 TORRENT_ASSERT(is_single_thread()); 4723 if (m_snubbed) 4724 { 4725 m_desired_queue_size = 1; 4726 return; 4727 } 4728 4729 #ifndef TORRENT_DISABLE_LOGGING 4730 int const previous_queue_size = m_desired_queue_size; 4731 #endif 4732 4733 int const download_rate = statistics().download_payload_rate(); 4734 4735 // the desired download queue size 4736 int const queue_time = m_settings.get_int(settings_pack::request_queue_time); 4737 4738 // when we're in slow-start mode we increase the desired queue size every 4739 // time we receive a piece, no need to adjust it here (other than 4740 // enforcing the upper limit) 4741 if (!m_slow_start) 4742 { 4743 // (if the latency is more than this, the download will stall) 4744 // so, the queue size is queue_time * down_rate / 16 kiB 4745 // (16 kB is the size of each request) 4746 // the minimum number of requests is 2 and the maximum is 48 4747 // the block size doesn't have to be 16. So we first query the 4748 // torrent for it 4749 std::shared_ptr<torrent> t = m_torrent.lock(); 4750 int const bs = t->block_size(); 4751 4752 TORRENT_ASSERT(bs > 0); 4753 4754 m_desired_queue_size = std::uint16_t(queue_time * download_rate / bs); 4755 } 4756 4757 if (m_desired_queue_size > m_max_out_request_queue) 4758 m_desired_queue_size = std::uint16_t(m_max_out_request_queue); 4759 if (m_desired_queue_size < min_request_queue) 4760 m_desired_queue_size = min_request_queue; 4761 4762 #ifndef TORRENT_DISABLE_LOGGING 4763 if (previous_queue_size != m_desired_queue_size) 4764 { 4765 peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE" 4766 , "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d" 4767 , m_desired_queue_size, m_max_out_request_queue 4768 , download_rate, queue_time, int(m_snubbed), int(m_slow_start)); 4769 } 4770 #endif 4771 } 4772 second_tick(int const tick_interval_ms)4773 void peer_connection::second_tick(int const tick_interval_ms) 4774 { 4775 TORRENT_ASSERT(is_single_thread()); 4776 time_point const now = aux::time_now(); 4777 std::shared_ptr<peer_connection> me(self()); 4778 4779 // the invariant check must be run before me is destructed 4780 // in case the peer got disconnected 4781 INVARIANT_CHECK; 4782 4783 std::shared_ptr<torrent> t = m_torrent.lock(); 4784 4785 int warning = 0; 4786 // drain the IP overhead from the bandwidth limiters 4787 if (m_settings.get_bool(settings_pack::rate_limit_ip_overhead) && t) 4788 { 4789 warning |= m_ses.use_quota_overhead(*this, m_statistics.download_ip_overhead() 4790 , m_statistics.upload_ip_overhead()); 4791 warning |= m_ses.use_quota_overhead(*t, m_statistics.download_ip_overhead() 4792 , m_statistics.upload_ip_overhead()); 4793 } 4794 4795 if (warning && t->alerts().should_post<performance_alert>()) 4796 { 4797 for (int channel = 0; channel < 2; ++channel) 4798 { 4799 if ((warning & (1 << channel)) == 0) continue; 4800 t->alerts().emplace_alert<performance_alert>(t->get_handle() 4801 , channel == peer_connection::download_channel 4802 ? performance_alert::download_limit_too_low 4803 : performance_alert::upload_limit_too_low); 4804 } 4805 } 4806 4807 if (!t || m_disconnecting) 4808 { 4809 TORRENT_ASSERT(t || !m_connecting); 4810 if (m_connecting) 4811 { 4812 m_counters.inc_stats_counter(counters::num_peers_half_open, -1); 4813 if (t) t->dec_num_connecting(m_peer_info); 4814 m_connecting = false; 4815 } 4816 disconnect(errors::torrent_aborted, operation_t::bittorrent); 4817 return; 4818 } 4819 4820 if (m_endgame_mode 4821 && m_interesting 4822 && m_download_queue.empty() 4823 && m_request_queue.empty() 4824 && now - seconds(5) >= m_last_request) 4825 { 4826 // this happens when we're in strict end-game 4827 // mode and the peer could not request any blocks 4828 // because they were all taken but there were still 4829 // unrequested blocks. Now, 5 seconds later, there 4830 // might not be any unrequested blocks anymore, so 4831 // we should try to pick another block to see 4832 // if we can pick a busy one 4833 m_last_request = now; 4834 if (request_a_block(*t, *this)) 4835 m_counters.inc_stats_counter(counters::end_game_piece_picks); 4836 if (m_disconnecting) return; 4837 send_block_requests(); 4838 } 4839 4840 #ifndef TORRENT_DISABLE_SUPERSEEDING 4841 if (t->super_seeding() 4842 && t->ready_for_connections() 4843 && !m_peer_interested 4844 && m_became_uninterested + seconds(10) < now) 4845 { 4846 // maybe we need to try another piece, to see if the peer 4847 // become interested in us then 4848 superseed_piece(piece_index_t(-1), t->get_piece_to_super_seed(m_have_piece)); 4849 } 4850 #endif 4851 4852 on_tick(); 4853 if (is_disconnecting()) return; 4854 4855 #ifndef TORRENT_DISABLE_EXTENSIONS 4856 for (auto const& e : m_extensions) 4857 { 4858 e->tick(); 4859 } 4860 if (is_disconnecting()) return; 4861 #endif 4862 4863 // if the peer hasn't said a thing for a certain 4864 // time, it is considered to have timed out 4865 time_duration d = now - m_last_receive; 4866 4867 if (m_connecting) 4868 { 4869 int connect_timeout = m_settings.get_int(settings_pack::peer_connect_timeout); 4870 if (m_peer_info) connect_timeout += 3 * m_peer_info->failcount; 4871 4872 // SSL and i2p handshakes are slow 4873 if (is_ssl(*m_socket)) 4874 connect_timeout += 10; 4875 4876 #if TORRENT_USE_I2P 4877 if (is_i2p(*m_socket)) 4878 connect_timeout += 20; 4879 #endif 4880 4881 if (d > seconds(connect_timeout) 4882 && can_disconnect(errors::timed_out)) 4883 { 4884 #ifndef TORRENT_DISABLE_LOGGING 4885 peer_log(peer_log_alert::info, "CONNECT_FAILED", "waited %d seconds" 4886 , int(total_seconds(d))); 4887 #endif 4888 connect_failed(errors::timed_out); 4889 return; 4890 } 4891 } 4892 4893 // if the bw_network flag isn't set, it means we are not even trying to 4894 // read from this peer's socket. Most likely because we're applying a 4895 // rate limit. If the peer is "slow" because we are rate limiting it, 4896 // don't enforce timeouts. However, as soon as we *do* read from the 4897 // socket, we expect to receive data, and not have timed out. Then we 4898 // can enforce the timeouts. 4899 bool const reading_socket = bool(m_channel_state[download_channel] & peer_info::bw_network); 4900 4901 // TODO: 2 use a deadline_timer for timeouts. Don't rely on second_tick()! 4902 // Hook this up to connect timeout as well. This would improve performance 4903 // because of less work in second_tick(), and might let use remove ticking 4904 // entirely eventually 4905 if (reading_socket && d > seconds(timeout()) && !m_connecting && m_reading_bytes == 0 4906 && can_disconnect(errors::timed_out_inactivity)) 4907 { 4908 #ifndef TORRENT_DISABLE_LOGGING 4909 peer_log(peer_log_alert::info, "LAST_ACTIVITY", "%d seconds ago" 4910 , int(total_seconds(d))); 4911 #endif 4912 disconnect(errors::timed_out_inactivity, operation_t::bittorrent); 4913 return; 4914 } 4915 4916 // do not stall waiting for a handshake 4917 int timeout = m_settings.get_int (settings_pack::handshake_timeout); 4918 #if TORRENT_USE_I2P 4919 timeout *= is_i2p(*m_socket) ? 4 : 1; 4920 #endif 4921 if (reading_socket 4922 && !m_connecting 4923 && in_handshake() 4924 && d > seconds(timeout)) 4925 { 4926 #ifndef TORRENT_DISABLE_LOGGING 4927 peer_log(peer_log_alert::info, "NO_HANDSHAKE", "waited %d seconds" 4928 , int(total_seconds(d))); 4929 #endif 4930 disconnect(errors::timed_out_no_handshake, operation_t::bittorrent); 4931 return; 4932 } 4933 4934 // disconnect peers that we unchoked, but they didn't send a request in 4935 // the last 60 seconds, and we haven't been working on servicing a request 4936 // for more than 60 seconds. 4937 // but only if we're a seed 4938 d = now - std::max(std::max(m_last_unchoke, m_last_incoming_request) 4939 , m_last_sent_payload); 4940 4941 if (reading_socket 4942 && !m_connecting 4943 && m_requests.empty() 4944 && m_reading_bytes == 0 4945 && !m_choked 4946 && m_peer_interested 4947 && t && t->is_upload_only() 4948 && d > seconds(60) 4949 && can_disconnect(errors::timed_out_no_request)) 4950 { 4951 #ifndef TORRENT_DISABLE_LOGGING 4952 peer_log(peer_log_alert::info, "NO_REQUEST", "waited %d seconds" 4953 , int(total_seconds(d))); 4954 #endif 4955 disconnect(errors::timed_out_no_request, operation_t::bittorrent); 4956 return; 4957 } 4958 4959 // if the peer hasn't become interested and we haven't 4960 // become interested in the peer for 10 minutes, it 4961 // has also timed out. 4962 time_duration const d1 = now - m_became_uninterested; 4963 time_duration const d2 = now - m_became_uninteresting; 4964 time_duration const time_limit = seconds( 4965 m_settings.get_int(settings_pack::inactivity_timeout)); 4966 4967 // if we are close enough to the limit, consider the peer connection 4968 // list full. This will enable the inactive timeout 4969 bool const max_session_conns = m_ses.num_connections() 4970 >= m_settings.get_int(settings_pack::connections_limit) - 5; 4971 bool const max_torrent_conns = t && t->num_peers() 4972 >= t->max_connections() - 5; 4973 4974 // don't bother disconnect peers we haven't been interested 4975 // in (and that hasn't been interested in us) for a while 4976 // unless we have used up all our connection slots 4977 if (reading_socket 4978 && !m_interesting 4979 && !m_peer_interested 4980 && d1 > time_limit 4981 && d2 > time_limit 4982 && (max_session_conns || max_torrent_conns) 4983 && can_disconnect(errors::timed_out_no_interest)) 4984 { 4985 #ifndef TORRENT_DISABLE_LOGGING 4986 if (should_log(peer_log_alert::info)) 4987 { 4988 peer_log(peer_log_alert::info, "MUTUAL_NO_INTEREST", "t1: %d t2: %d" 4989 , int(total_seconds(d1)), int(total_seconds(d2))); 4990 } 4991 #endif 4992 disconnect(errors::timed_out_no_interest, operation_t::bittorrent); 4993 return; 4994 } 4995 4996 if (reading_socket 4997 && !m_download_queue.empty() 4998 && m_quota[download_channel] > 0 4999 && now > m_requested + seconds(request_timeout())) 5000 { 5001 snub_peer(); 5002 } 5003 5004 // if we haven't sent something in too long, send a keep-alive 5005 keep_alive(); 5006 5007 // if our download rate isn't increasing significantly anymore, end slow 5008 // start. The 10kB is to have some slack here. 5009 // we can't do this when we're choked, because we aren't sending any 5010 // requests yet, so there hasn't been an opportunity to ramp up the 5011 // connection yet. 5012 if (m_slow_start 5013 && !m_peer_choked 5014 && m_downloaded_last_second > 0 5015 && m_downloaded_last_second + 5000 5016 >= m_statistics.last_payload_downloaded()) 5017 { 5018 m_slow_start = false; 5019 #ifndef TORRENT_DISABLE_LOGGING 5020 if (should_log(peer_log_alert::info)) 5021 { 5022 peer_log(peer_log_alert::info, "SLOW_START", "exit slow start: " 5023 "prev-dl: %d dl: %d" 5024 , int(m_downloaded_last_second) 5025 , m_statistics.last_payload_downloaded()); 5026 } 5027 #endif 5028 } 5029 m_downloaded_last_second = m_statistics.last_payload_downloaded(); 5030 m_uploaded_last_second = m_statistics.last_payload_uploaded(); 5031 5032 m_statistics.second_tick(tick_interval_ms); 5033 5034 if (m_statistics.upload_payload_rate() > m_upload_rate_peak) 5035 { 5036 m_upload_rate_peak = m_statistics.upload_payload_rate(); 5037 } 5038 if (m_statistics.download_payload_rate() > m_download_rate_peak) 5039 { 5040 m_download_rate_peak = m_statistics.download_payload_rate(); 5041 } 5042 if (is_disconnecting()) return; 5043 5044 if (!t->ready_for_connections()) return; 5045 5046 update_desired_queue_size(); 5047 5048 if (m_desired_queue_size == m_max_out_request_queue 5049 && t->alerts().should_post<performance_alert>()) 5050 { 5051 t->alerts().emplace_alert<performance_alert>(t->get_handle() 5052 , performance_alert::outstanding_request_limit_reached); 5053 } 5054 5055 int const piece_timeout = m_settings.get_int(settings_pack::piece_timeout); 5056 5057 if (!m_download_queue.empty() 5058 && m_quota[download_channel] > 0 5059 && now - m_last_piece > seconds(piece_timeout)) 5060 { 5061 // this peer isn't sending the pieces we've 5062 // requested (this has been observed by BitComet) 5063 // in this case we'll clear our download queue and 5064 // re-request the blocks. 5065 #ifndef TORRENT_DISABLE_LOGGING 5066 if (should_log(peer_log_alert::info)) 5067 { 5068 peer_log(peer_log_alert::info, "PIECE_REQUEST_TIMED_OUT" 5069 , "%d time: %d to: %d" 5070 , int(m_download_queue.size()), int(total_seconds(now - m_last_piece)) 5071 , piece_timeout); 5072 } 5073 #endif 5074 5075 snub_peer(); 5076 } 5077 5078 fill_send_buffer(); 5079 } 5080 snub_peer()5081 void peer_connection::snub_peer() 5082 { 5083 TORRENT_ASSERT(is_single_thread()); 5084 INVARIANT_CHECK; 5085 5086 std::shared_ptr<torrent> t = m_torrent.lock(); 5087 TORRENT_ASSERT(t); 5088 5089 if (!m_snubbed) 5090 { 5091 m_snubbed = true; 5092 m_slow_start = false; 5093 if (t->alerts().should_post<peer_snubbed_alert>()) 5094 { 5095 t->alerts().emplace_alert<peer_snubbed_alert>(t->get_handle() 5096 , m_remote, m_peer_id); 5097 } 5098 } 5099 m_desired_queue_size = 1; 5100 5101 if (on_parole()) return; 5102 5103 if (!t->has_picker()) return; 5104 piece_picker& picker = t->picker(); 5105 5106 // first, if we have any unsent requests, just 5107 // wipe those out 5108 while (!m_request_queue.empty()) 5109 { 5110 t->picker().abort_download(m_request_queue.back().block, peer_info_struct()); 5111 m_request_queue.pop_back(); 5112 } 5113 m_queued_time_critical = 0; 5114 5115 TORRENT_ASSERT(!m_download_queue.empty()); 5116 5117 // time out the last request eligible 5118 // block in the queue 5119 int i = int(m_download_queue.size()) - 1; 5120 for (; i >= 0; --i) 5121 { 5122 if (!m_download_queue[i].timed_out 5123 && !m_download_queue[i].not_wanted) 5124 break; 5125 } 5126 5127 if (i >= 0) 5128 { 5129 pending_block& qe = m_download_queue[i]; 5130 piece_block const r = qe.block; 5131 5132 // only cancel a request if it blocks the piece from being completed 5133 // (i.e. no free blocks to request from it) 5134 piece_picker::downloading_piece p; 5135 picker.piece_info(qe.block.piece_index, p); 5136 int const free_blocks = picker.blocks_in_piece(qe.block.piece_index) 5137 - p.finished - p.writing - p.requested; 5138 5139 // if there are still blocks available for other peers to pick, we're 5140 // still not holding up the completion of the piece and there's no 5141 // need to cancel the requests. For more information, see: 5142 // http://blog.libtorrent.org/2011/11/block-request-time-outs/ 5143 if (free_blocks > 0) 5144 { 5145 send_block_requests(); 5146 return; 5147 } 5148 5149 if (t->alerts().should_post<block_timeout_alert>()) 5150 { 5151 t->alerts().emplace_alert<block_timeout_alert>(t->get_handle() 5152 , remote(), pid(), qe.block.block_index 5153 , qe.block.piece_index); 5154 } 5155 5156 // request a new block before removing the previous 5157 // one, in order to prevent it from 5158 // picking the same block again, stalling the 5159 // same piece indefinitely. 5160 m_desired_queue_size = 2; 5161 if (request_a_block(*t, *this)) 5162 m_counters.inc_stats_counter(counters::snubbed_piece_picks); 5163 5164 // the block we just picked (potentially) 5165 // hasn't been put in m_download_queue yet. 5166 // it's in m_request_queue and will be sent 5167 // once send_block_requests() is called. 5168 5169 m_desired_queue_size = 1; 5170 5171 qe.timed_out = true; 5172 picker.abort_download(r, peer_info_struct()); 5173 } 5174 5175 send_block_requests(); 5176 } 5177 fill_send_buffer()5178 void peer_connection::fill_send_buffer() 5179 { 5180 TORRENT_ASSERT(is_single_thread()); 5181 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS 5182 INVARIANT_CHECK; 5183 #endif 5184 5185 #ifndef TORRENT_DISABLE_SHARE_MODE 5186 bool sent_a_piece = false; 5187 #endif 5188 std::shared_ptr<torrent> t = m_torrent.lock(); 5189 if (!t || t->is_aborted() || m_requests.empty()) return; 5190 5191 // only add new piece-chunks if the send buffer is small enough 5192 // otherwise there will be no end to how large it will be! 5193 5194 int buffer_size_watermark = int(std::int64_t(m_uploaded_last_second) 5195 * m_settings.get_int(settings_pack::send_buffer_watermark_factor) / 100); 5196 5197 if (buffer_size_watermark < m_settings.get_int(settings_pack::send_buffer_low_watermark)) 5198 { 5199 buffer_size_watermark = m_settings.get_int(settings_pack::send_buffer_low_watermark); 5200 } 5201 else if (buffer_size_watermark > m_settings.get_int(settings_pack::send_buffer_watermark)) 5202 { 5203 buffer_size_watermark = m_settings.get_int(settings_pack::send_buffer_watermark); 5204 } 5205 5206 #ifndef TORRENT_DISABLE_LOGGING 5207 if (should_log(peer_log_alert::outgoing)) 5208 { 5209 peer_log(peer_log_alert::outgoing, "SEND_BUFFER_WATERMARK" 5210 , "current watermark: %d max: %d min: %d factor: %d uploaded: %d B/s" 5211 , buffer_size_watermark 5212 , m_ses.settings().get_int(settings_pack::send_buffer_watermark) 5213 , m_ses.settings().get_int(settings_pack::send_buffer_low_watermark) 5214 , m_ses.settings().get_int(settings_pack::send_buffer_watermark_factor) 5215 , int(m_uploaded_last_second)); 5216 } 5217 #endif 5218 5219 // don't just pop the front element here, since in seed mode one request may 5220 // be blocked because we have to verify the hash first, so keep going with the 5221 // next request. However, only let each peer have one hash verification outstanding 5222 // at any given time 5223 for (int i = 0; i < int(m_requests.size()) 5224 && (send_buffer_size() + m_reading_bytes < buffer_size_watermark); ++i) 5225 { 5226 TORRENT_ASSERT(t->ready_for_connections()); 5227 peer_request& r = m_requests[i]; 5228 5229 TORRENT_ASSERT(r.piece >= piece_index_t(0)); 5230 TORRENT_ASSERT(r.piece < piece_index_t(m_have_piece.size())); 5231 TORRENT_ASSERT(r.start + r.length <= t->torrent_file().piece_size(r.piece)); 5232 TORRENT_ASSERT(r.length > 0 && r.start >= 0); 5233 5234 if (t->is_deleted()) 5235 { 5236 #ifndef TORRENT_DISABLE_LOGGING 5237 peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE" 5238 , "piece: %d s: %x l: %x torrent deleted" 5239 , static_cast<int>(r.piece), r.start , r.length); 5240 #endif 5241 write_reject_request(r); 5242 continue; 5243 } 5244 5245 bool const seed_mode = t->seed_mode(); 5246 5247 if (seed_mode 5248 && !t->verified_piece(r.piece) 5249 && !m_settings.get_bool(settings_pack::disable_hash_checks)) 5250 { 5251 // we're still verifying the hash of this piece 5252 // so we can't return it yet. 5253 if (t->verifying_piece(r.piece)) continue; 5254 5255 // only have three outstanding hash check per peer 5256 if (m_outstanding_piece_verification >= 3) continue; 5257 5258 ++m_outstanding_piece_verification; 5259 5260 #ifndef TORRENT_DISABLE_LOGGING 5261 peer_log(peer_log_alert::info, "SEED_MODE_FILE_ASYNC_HASH" 5262 , "piece: %d", static_cast<int>(r.piece)); 5263 #endif 5264 // this means we're in seed mode and we haven't yet 5265 // verified this piece (r.piece) 5266 auto conn = self(); 5267 m_disk_thread.async_hash(t->storage(), r.piece, {} 5268 , [conn](piece_index_t p, sha1_hash const& ph, storage_error const& e) { 5269 conn->wrap(&peer_connection::on_seed_mode_hashed, p, ph, e); }); 5270 t->verifying(r.piece); 5271 continue; 5272 } 5273 5274 if (!t->has_piece_passed(r.piece) && !seed_mode) 5275 { 5276 #ifndef TORRENT_DISABLE_PREDICTIVE_PIECES 5277 // we don't have this piece yet, but we anticipate to have 5278 // it very soon, so we have told our peers we have it. 5279 // hold off on sending it. If the piece fails later 5280 // we will reject this request 5281 if (t->is_predictive_piece(r.piece)) continue; 5282 #endif 5283 #ifndef TORRENT_DISABLE_LOGGING 5284 peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE" 5285 , "piece: %d s: %x l: %x piece not passed hash check" 5286 , static_cast<int>(r.piece), r.start , r.length); 5287 #endif 5288 write_reject_request(r); 5289 } 5290 else 5291 { 5292 #ifndef TORRENT_DISABLE_LOGGING 5293 peer_log(peer_log_alert::info, "FILE_ASYNC_READ" 5294 , "piece: %d s: %x l: %x", static_cast<int>(r.piece), r.start, r.length); 5295 #endif 5296 m_reading_bytes += r.length; 5297 #ifndef TORRENT_DISABLE_SHARE_MODE 5298 sent_a_piece = true; 5299 #endif 5300 5301 // the callback function may be called immediately, instead of being posted 5302 5303 TORRENT_ASSERT(t->valid_metadata()); 5304 TORRENT_ASSERT(r.piece >= piece_index_t(0)); 5305 TORRENT_ASSERT(r.piece < t->torrent_file().end_piece()); 5306 5307 auto conn = self(); 5308 m_disk_thread.async_read(t->storage(), r 5309 , [conn, r](disk_buffer_holder buf, disk_job_flags_t f, storage_error const& ec) 5310 { conn->wrap(&peer_connection::on_disk_read_complete, std::move(buf), f, ec, r, clock_type::now()); }); 5311 } 5312 m_last_sent_payload = clock_type::now(); 5313 m_requests.erase(m_requests.begin() + i); 5314 5315 if (m_requests.empty()) 5316 m_counters.inc_stats_counter(counters::num_peers_up_requests, -1); 5317 5318 --i; 5319 } 5320 5321 #ifndef TORRENT_DISABLE_SHARE_MODE 5322 if (t->share_mode() && sent_a_piece) 5323 t->recalc_share_mode(); 5324 #endif 5325 } 5326 5327 // this is called when a previously unchecked piece has been 5328 // checked, while in seed-mode on_seed_mode_hashed(piece_index_t const piece,sha1_hash const & piece_hash,storage_error const & error)5329 void peer_connection::on_seed_mode_hashed(piece_index_t const piece 5330 , sha1_hash const& piece_hash, storage_error const& error) 5331 { 5332 TORRENT_ASSERT(is_single_thread()); 5333 INVARIANT_CHECK; 5334 5335 std::shared_ptr<torrent> t = m_torrent.lock(); 5336 5337 TORRENT_ASSERT(m_outstanding_piece_verification > 0); 5338 --m_outstanding_piece_verification; 5339 5340 if (!t || t->is_aborted()) return; 5341 5342 if (error) 5343 { 5344 t->handle_disk_error("hash", error, this); 5345 t->leave_seed_mode(torrent::seed_mode_t::check_files); 5346 return; 5347 } 5348 5349 // we're using the piece hashes here, we need the torrent to be loaded 5350 if (!m_settings.get_bool(settings_pack::disable_hash_checks) 5351 && piece_hash != t->torrent_file().hash_for_piece(piece)) 5352 { 5353 #ifndef TORRENT_DISABLE_LOGGING 5354 peer_log(peer_log_alert::info, "SEED_MODE_FILE_HASH" 5355 , "piece: %d failed", static_cast<int>(piece)); 5356 #endif 5357 5358 t->leave_seed_mode(torrent::seed_mode_t::check_files); 5359 } 5360 else 5361 { 5362 if (t->seed_mode()) 5363 { 5364 TORRENT_ASSERT(t->verifying_piece(piece)); 5365 t->verified(piece); 5366 } 5367 5368 #ifndef TORRENT_DISABLE_LOGGING 5369 peer_log(peer_log_alert::info, "SEED_MODE_FILE_HASH" 5370 , "piece: %d passed", static_cast<int>(piece)); 5371 #endif 5372 if (t->seed_mode() && t->all_verified()) 5373 t->leave_seed_mode(torrent::seed_mode_t::skip_checking); 5374 } 5375 5376 // try to service the requests again, now that the piece 5377 // has been verified 5378 fill_send_buffer(); 5379 } 5380 on_disk_read_complete(disk_buffer_holder buffer,disk_job_flags_t const flags,storage_error const & error,peer_request const & r,time_point const issue_time)5381 void peer_connection::on_disk_read_complete(disk_buffer_holder buffer 5382 , disk_job_flags_t const flags, storage_error const& error 5383 , peer_request const& r, time_point const issue_time) 5384 { 5385 TORRENT_ASSERT(is_single_thread()); 5386 // return value: 5387 // 0: success, piece passed hash check 5388 // -1: disk failure 5389 5390 int const disk_rtt = int(total_microseconds(clock_type::now() - issue_time)); 5391 5392 #ifndef TORRENT_DISABLE_LOGGING 5393 if (should_log(peer_log_alert::info)) 5394 { 5395 peer_log(peer_log_alert::info, "FILE_ASYNC_READ_COMPLETE" 5396 , "piece: %d s: %x l: %x b: %p c: %s e: %s rtt: %d us" 5397 , static_cast<int>(r.piece), r.start, r.length 5398 , static_cast<void*>(buffer.get()) 5399 , ((flags & disk_interface::cache_hit) ? "cache hit" : "cache miss") 5400 , error.ec.message().c_str(), disk_rtt); 5401 } 5402 #endif 5403 5404 m_reading_bytes -= r.length; 5405 5406 std::shared_ptr<torrent> t = m_torrent.lock(); 5407 if (error) 5408 { 5409 if (!t) 5410 { 5411 disconnect(error.ec, operation_t::file_read); 5412 return; 5413 } 5414 5415 write_dont_have(r.piece); 5416 write_reject_request(r); 5417 if (t->alerts().should_post<file_error_alert>()) 5418 t->alerts().emplace_alert<file_error_alert>(error.ec 5419 , t->resolve_filename(error.file()) 5420 , error.operation, t->get_handle()); 5421 5422 ++m_disk_read_failures; 5423 if (m_disk_read_failures > 100) disconnect(error.ec, operation_t::file_read); 5424 return; 5425 } 5426 5427 // we're only interested in failures in a row. 5428 // if we every now and then successfully send a 5429 // block, the peer is still useful 5430 m_disk_read_failures = 0; 5431 5432 if (t && m_settings.get_int(settings_pack::suggest_mode) 5433 == settings_pack::suggest_read_cache) 5434 { 5435 // tell the torrent that we just read a block from this piece. 5436 // if this piece is low-availability, it's now a candidate for being 5437 // suggested to other peers 5438 t->add_suggest_piece(r.piece); 5439 } 5440 5441 if (m_disconnecting) return; 5442 5443 if (!t) 5444 { 5445 disconnect(error.ec, operation_t::file_read); 5446 return; 5447 } 5448 5449 #ifndef TORRENT_DISABLE_LOGGING 5450 peer_log(peer_log_alert::outgoing_message 5451 , "PIECE", "piece: %d s: %x l: %x" 5452 , static_cast<int>(r.piece), r.start, r.length); 5453 #endif 5454 5455 m_counters.blend_stats_counter(counters::request_latency, disk_rtt, 5); 5456 5457 // we probably just pulled this piece into the cache. 5458 // if it's rare enough to make it into the suggested piece 5459 // push another piece out 5460 if (m_settings.get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache 5461 && !(flags & disk_interface::cache_hit)) 5462 { 5463 t->add_suggest_piece(r.piece); 5464 } 5465 write_piece(r, std::move(buffer)); 5466 } 5467 assign_bandwidth(int const channel,int const amount)5468 void peer_connection::assign_bandwidth(int const channel, int const amount) 5469 { 5470 TORRENT_ASSERT(is_single_thread()); 5471 #ifndef TORRENT_DISABLE_LOGGING 5472 peer_log(channel == upload_channel 5473 ? peer_log_alert::outgoing : peer_log_alert::incoming 5474 , "ASSIGN_BANDWIDTH", "bytes: %d", amount); 5475 #endif 5476 5477 TORRENT_ASSERT(amount > 0 || is_disconnecting()); 5478 m_quota[channel] += amount; 5479 TORRENT_ASSERT(m_channel_state[channel] & peer_info::bw_limit); 5480 m_channel_state[channel] &= ~peer_info::bw_limit; 5481 5482 #if TORRENT_USE_INVARIANT_CHECKS 5483 check_invariant(); 5484 #endif 5485 5486 if (is_disconnecting()) return; 5487 if (channel == upload_channel) 5488 { 5489 setup_send(); 5490 } 5491 else if (channel == download_channel) 5492 { 5493 setup_receive(); 5494 } 5495 } 5496 5497 // the number of bytes we expect to receive, or want to send 5498 // channel either refer to upload or download. This is used 5499 // by the rate limiter to allocate quota for this peer wanted_transfer(int const channel)5500 int peer_connection::wanted_transfer(int const channel) 5501 { 5502 TORRENT_ASSERT(is_single_thread()); 5503 5504 const int tick_interval = std::max(1, m_settings.get_int(settings_pack::tick_interval)); 5505 5506 if (channel == download_channel) 5507 { 5508 std::int64_t const download_rate = std::int64_t(m_statistics.download_rate()) * 3 / 2; 5509 return std::max({m_outstanding_bytes + 30 5510 , m_recv_buffer.packet_bytes_remaining() + 30 5511 , int(download_rate * tick_interval / 1000)}); 5512 } 5513 else 5514 { 5515 std::int64_t const upload_rate = std::int64_t(m_statistics.upload_rate()) * 2; 5516 return std::max({m_reading_bytes 5517 , m_send_buffer.size() 5518 , int(upload_rate * tick_interval / 1000)}); 5519 } 5520 } 5521 request_bandwidth(int const channel,int bytes)5522 int peer_connection::request_bandwidth(int const channel, int bytes) 5523 { 5524 TORRENT_ASSERT(is_single_thread()); 5525 INVARIANT_CHECK; 5526 5527 // we can only have one outstanding bandwidth request at a time 5528 if (m_channel_state[channel] & peer_info::bw_limit) return 0; 5529 5530 std::shared_ptr<torrent> t = m_torrent.lock(); 5531 5532 bytes = std::max(wanted_transfer(channel), bytes); 5533 5534 // we already have enough quota 5535 if (m_quota[channel] >= bytes) return 0; 5536 5537 // deduct the bytes we already have quota for 5538 bytes -= m_quota[channel]; 5539 5540 int const priority = get_priority(channel); 5541 5542 int const max_channels = num_classes() + (t ? t->num_classes() : 0) + 2; 5543 TORRENT_ALLOCA(channels, bandwidth_channel*, max_channels); 5544 5545 // collect the pointers to all bandwidth channels 5546 // that apply to this torrent 5547 int c = 0; 5548 5549 c += m_ses.copy_pertinent_channels(*this, channel 5550 , channels.subspan(c).data(), max_channels - c); 5551 if (t) 5552 { 5553 c += m_ses.copy_pertinent_channels(*t, channel 5554 , channels.subspan(c).data(), max_channels - c); 5555 } 5556 5557 #if TORRENT_USE_ASSERTS 5558 // make sure we don't have duplicates 5559 std::set<bandwidth_channel*> unique_classes; 5560 for (int i = 0; i < c; ++i) 5561 { 5562 TORRENT_ASSERT(unique_classes.count(channels[i]) == 0); 5563 unique_classes.insert(channels[i]); 5564 } 5565 #endif 5566 5567 TORRENT_ASSERT(!(m_channel_state[channel] & peer_info::bw_limit)); 5568 5569 bandwidth_manager* manager = m_ses.get_bandwidth_manager(channel); 5570 5571 int const ret = manager->request_bandwidth(self() 5572 , bytes, priority, channels.data(), c); 5573 5574 if (ret == 0) 5575 { 5576 #ifndef TORRENT_DISABLE_LOGGING 5577 auto const dir = channel == download_channel ? peer_log_alert::incoming 5578 : peer_log_alert::outgoing; 5579 if (should_log(dir)) 5580 { 5581 peer_log(dir, 5582 "REQUEST_BANDWIDTH", "bytes: %d quota: %d wanted_transfer: %d " 5583 "prio: %d num_channels: %d", bytes, m_quota[channel] 5584 , wanted_transfer(channel), priority, c); 5585 } 5586 #endif 5587 m_channel_state[channel] |= peer_info::bw_limit; 5588 } 5589 else 5590 { 5591 m_quota[channel] += ret; 5592 } 5593 5594 return ret; 5595 } 5596 setup_send()5597 void peer_connection::setup_send() 5598 { 5599 TORRENT_ASSERT(is_single_thread()); 5600 5601 if (m_disconnecting || m_send_buffer.empty()) return; 5602 5603 // we may want to request more quota at this point 5604 request_bandwidth(upload_channel); 5605 5606 // if we already have an outstanding send operation, don't issue another 5607 // one, instead accrue more send buffer to coalesce for the next write 5608 if (m_channel_state[upload_channel] & peer_info::bw_network) 5609 { 5610 #ifndef TORRENT_DISABLE_LOGGING 5611 peer_log(peer_log_alert::outgoing, "CORKED_WRITE", "bytes: %d" 5612 , m_send_buffer.size()); 5613 #endif 5614 return; 5615 } 5616 5617 if (m_send_barrier == 0) 5618 { 5619 std::vector<span<char>> vec; 5620 // limit outgoing crypto messages to 1MB 5621 int const send_bytes = std::min(m_send_buffer.size(), 1024 * 1024); 5622 m_send_buffer.build_mutable_iovec(send_bytes, vec); 5623 int next_barrier; 5624 span<span<char const>> inject_vec; 5625 std::tie(next_barrier, inject_vec) = hit_send_barrier(vec); 5626 for (auto i = inject_vec.rbegin(); i != inject_vec.rend(); ++i) 5627 { 5628 // this const_cast is a here because chained_buffer need to be 5629 // fixed. 5630 auto* ptr = const_cast<char*>(i->data()); 5631 m_send_buffer.prepend_buffer(span<char>(ptr, i->size()) 5632 , static_cast<int>(i->size())); 5633 } 5634 set_send_barrier(next_barrier); 5635 } 5636 5637 if ((m_quota[upload_channel] == 0 || m_send_barrier == 0) 5638 && !m_send_buffer.empty() 5639 && !m_connecting) 5640 { 5641 return; 5642 } 5643 5644 int const quota_left = m_quota[upload_channel]; 5645 if (m_send_buffer.empty() 5646 && m_reading_bytes > 0 5647 && quota_left > 0) 5648 { 5649 if (!(m_channel_state[upload_channel] & peer_info::bw_disk)) 5650 m_counters.inc_stats_counter(counters::num_peers_up_disk); 5651 m_channel_state[upload_channel] |= peer_info::bw_disk; 5652 #ifndef TORRENT_DISABLE_LOGGING 5653 peer_log(peer_log_alert::outgoing, "WAITING_FOR_DISK", "outstanding: %d" 5654 , m_reading_bytes); 5655 #endif 5656 5657 if (!m_connecting 5658 && !m_requests.empty() 5659 && m_reading_bytes > m_settings.get_int(settings_pack::send_buffer_watermark) - 0x4000) 5660 { 5661 std::shared_ptr<torrent> t = m_torrent.lock(); 5662 5663 // we're stalled on the disk. We want to write and we can write 5664 // but our send buffer is empty, waiting to be refilled from the disk 5665 // this either means the disk is slower than the network connection 5666 // or that our send buffer watermark is too small, because we can 5667 // send it all before the disk gets back to us. That's why we only 5668 // trigger this if we've also filled the allowed send buffer. The 5669 // first request would not fill it all the way up because of the 5670 // upload rate being virtually 0. If m_requests is empty, it doesn't 5671 // matter anyway, because we don't have any more requests from the 5672 // peer to hang on to the disk 5673 if (t && t->alerts().should_post<performance_alert>()) 5674 { 5675 t->alerts().emplace_alert<performance_alert>(t->get_handle() 5676 , performance_alert::send_buffer_watermark_too_low); 5677 } 5678 } 5679 } 5680 else 5681 { 5682 if (m_channel_state[upload_channel] & peer_info::bw_disk) 5683 m_counters.inc_stats_counter(counters::num_peers_up_disk, -1); 5684 m_channel_state[upload_channel] &= ~peer_info::bw_disk; 5685 } 5686 5687 if (!can_write()) 5688 { 5689 #ifndef TORRENT_DISABLE_LOGGING 5690 if (should_log(peer_log_alert::outgoing)) 5691 { 5692 if (m_send_buffer.empty()) 5693 { 5694 peer_log(peer_log_alert::outgoing, "SEND_BUFFER_DEPLETED" 5695 , "quota: %d buf: %d connecting: %s disconnecting: %s " 5696 "pending_disk: %d piece-requests: %d" 5697 , m_quota[upload_channel] 5698 , m_send_buffer.size(), m_connecting?"yes":"no" 5699 , m_disconnecting?"yes":"no", m_reading_bytes 5700 , int(m_requests.size())); 5701 } 5702 else 5703 { 5704 peer_log(peer_log_alert::outgoing, "CANNOT_WRITE" 5705 , "quota: %d buf: %d connecting: %s disconnecting: %s " 5706 "pending_disk: %d" 5707 , m_quota[upload_channel] 5708 , m_send_buffer.size(), m_connecting?"yes":"no" 5709 , m_disconnecting?"yes":"no", m_reading_bytes); 5710 } 5711 } 5712 #endif 5713 return; 5714 } 5715 5716 int const amount_to_send = std::min({ 5717 m_send_buffer.size() 5718 , quota_left 5719 , m_send_barrier}); 5720 5721 TORRENT_ASSERT(amount_to_send > 0); 5722 5723 TORRENT_ASSERT(!(m_channel_state[upload_channel] & peer_info::bw_network)); 5724 #ifndef TORRENT_DISABLE_LOGGING 5725 peer_log(peer_log_alert::outgoing, "ASYNC_WRITE", "bytes: %d", amount_to_send); 5726 #endif 5727 auto const vec = m_send_buffer.build_iovec(amount_to_send); 5728 ADD_OUTSTANDING_ASYNC("peer_connection::on_send_data"); 5729 5730 #if TORRENT_USE_ASSERTS 5731 TORRENT_ASSERT(!m_socket_is_writing); 5732 m_socket_is_writing = true; 5733 #endif 5734 5735 auto conn = self(); 5736 m_socket->async_write_some(vec, make_handler( 5737 std::bind(&peer_connection::on_send_data, conn, _1, _2) 5738 , m_write_handler_storage, *this)); 5739 5740 m_channel_state[upload_channel] |= peer_info::bw_network; 5741 m_last_sent = aux::time_now(); 5742 } 5743 on_disk()5744 void peer_connection::on_disk() 5745 { 5746 TORRENT_ASSERT(is_single_thread()); 5747 if (!(m_channel_state[download_channel] & peer_info::bw_disk)) return; 5748 std::shared_ptr<peer_connection> me(self()); 5749 5750 #ifndef TORRENT_DISABLE_LOGGING 5751 peer_log(peer_log_alert::info, "DISK", "dropped below disk buffer watermark"); 5752 #endif 5753 m_counters.inc_stats_counter(counters::num_peers_down_disk, -1); 5754 m_channel_state[download_channel] &= ~peer_info::bw_disk; 5755 setup_receive(); 5756 } 5757 setup_receive()5758 void peer_connection::setup_receive() 5759 { 5760 TORRENT_ASSERT(is_single_thread()); 5761 INVARIANT_CHECK; 5762 5763 if (m_disconnecting) return; 5764 5765 if (m_recv_buffer.capacity() < 100 5766 && m_recv_buffer.max_receive() == 0) 5767 { 5768 m_recv_buffer.reserve(100); 5769 } 5770 5771 // we may want to request more quota at this point 5772 int const buffer_size = m_recv_buffer.max_receive(); 5773 request_bandwidth(download_channel, buffer_size); 5774 5775 if (m_channel_state[download_channel] & peer_info::bw_network) return; 5776 5777 if (m_quota[download_channel] == 0 5778 && !m_connecting) 5779 { 5780 return; 5781 } 5782 5783 if (!can_read()) 5784 { 5785 #ifndef TORRENT_DISABLE_LOGGING 5786 if (should_log(peer_log_alert::incoming)) 5787 { 5788 peer_log(peer_log_alert::incoming, "CANNOT_READ", "quota: %d " 5789 "can-write-to-disk: %s queue-limit: %d disconnecting: %s " 5790 " connecting: %s" 5791 , m_quota[download_channel] 5792 , ((m_channel_state[download_channel] & peer_info::bw_disk)?"no":"yes") 5793 , m_settings.get_int(settings_pack::max_queued_disk_bytes) 5794 , (m_disconnecting?"yes":"no") 5795 , (m_connecting?"yes":"no")); 5796 } 5797 #endif 5798 // if we block reading, waiting for the disk, we will wake up 5799 // by the disk_io_thread posting a message every time it drops 5800 // from being at or exceeding the limit down to below the limit 5801 return; 5802 } 5803 TORRENT_ASSERT(m_connected); 5804 if (m_quota[download_channel] == 0) return; 5805 5806 int const quota_left = m_quota[download_channel]; 5807 int const max_receive = std::min(buffer_size, quota_left); 5808 5809 if (max_receive == 0) return; 5810 5811 span<char> const vec = m_recv_buffer.reserve(max_receive); 5812 TORRENT_ASSERT(!(m_channel_state[download_channel] & peer_info::bw_network)); 5813 m_channel_state[download_channel] |= peer_info::bw_network; 5814 #ifndef TORRENT_DISABLE_LOGGING 5815 peer_log(peer_log_alert::incoming, "ASYNC_READ" 5816 , "max: %d bytes", max_receive); 5817 #endif 5818 5819 ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data"); 5820 auto conn = self(); 5821 m_socket->async_read_some( 5822 boost::asio::mutable_buffers_1(vec.data(), std::size_t(vec.size())), make_handler( 5823 std::bind(&peer_connection::on_receive_data, conn, _1, _2) 5824 , m_read_handler_storage, *this)); 5825 } 5826 downloading_piece_progress() const5827 piece_block_progress peer_connection::downloading_piece_progress() const 5828 { 5829 #ifndef TORRENT_DISABLE_LOGGING 5830 peer_log(peer_log_alert::info, "ERROR" 5831 , "downloading_piece_progress() dispatched to the base class!"); 5832 #endif 5833 return {}; 5834 } 5835 send_buffer(span<char const> buf)5836 void peer_connection::send_buffer(span<char const> buf) 5837 { 5838 TORRENT_ASSERT(is_single_thread()); 5839 5840 int const free_space = std::min( 5841 m_send_buffer.space_in_last_buffer(), int(buf.size())); 5842 if (free_space > 0) 5843 { 5844 char* dst = m_send_buffer.append(buf.first(free_space)); 5845 5846 // this should always succeed, because we checked how much space 5847 // there was up-front 5848 TORRENT_UNUSED(dst); 5849 TORRENT_ASSERT(dst != nullptr); 5850 buf = buf.subspan(free_space); 5851 } 5852 if (buf.empty()) return; 5853 5854 // allocate a buffer and initialize the beginning of it with 'buf' 5855 buffer snd_buf(std::max(int(buf.size()), 128), buf); 5856 m_send_buffer.append_buffer(std::move(snd_buf), int(buf.size())); 5857 5858 setup_send(); 5859 } 5860 5861 // -------------------------- 5862 // RECEIVE DATA 5863 // -------------------------- 5864 account_received_bytes(int const bytes_transferred)5865 void peer_connection::account_received_bytes(int const bytes_transferred) 5866 { 5867 // tell the receive buffer we just fed it this many bytes of incoming data 5868 TORRENT_ASSERT(bytes_transferred > 0); 5869 m_recv_buffer.received(bytes_transferred); 5870 5871 // update the dl quota 5872 TORRENT_ASSERT(bytes_transferred <= m_quota[download_channel]); 5873 m_quota[download_channel] -= bytes_transferred; 5874 5875 // account receiver buffer size stats to the session 5876 m_ses.received_buffer(bytes_transferred); 5877 5878 // estimate transport protocol overhead 5879 trancieve_ip_packet(bytes_transferred, is_v6(m_remote)); 5880 5881 #ifndef TORRENT_DISABLE_LOGGING 5882 peer_log(peer_log_alert::incoming, "READ" 5883 , "%d bytes", bytes_transferred); 5884 #endif 5885 } 5886 on_receive_data(const error_code & error,std::size_t bytes_transferred)5887 void peer_connection::on_receive_data(const error_code& error 5888 , std::size_t bytes_transferred) 5889 { 5890 TORRENT_ASSERT(is_single_thread()); 5891 COMPLETE_ASYNC("peer_connection::on_receive_data"); 5892 5893 #ifndef TORRENT_DISABLE_LOGGING 5894 if (should_log(peer_log_alert::incoming)) 5895 { 5896 peer_log(peer_log_alert::incoming, "ON_RECEIVE_DATA" 5897 , "bytes: %d %s" 5898 , int(bytes_transferred), print_error(error).c_str()); 5899 } 5900 #endif 5901 5902 // leave this bit set until we're done looping, reading from the socket. 5903 // that way we don't trigger any async read calls until the end of this 5904 // function. 5905 TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network); 5906 5907 TORRENT_ASSERT(bytes_transferred > 0 || error); 5908 5909 m_counters.inc_stats_counter(counters::on_read_counter); 5910 5911 INVARIANT_CHECK; 5912 5913 if (error) 5914 { 5915 #ifndef TORRENT_DISABLE_LOGGING 5916 if (should_log(peer_log_alert::info)) 5917 { 5918 peer_log(peer_log_alert::info, "ERROR" 5919 , "in peer_connection::on_receive_data_impl %s" 5920 , print_error(error).c_str()); 5921 } 5922 #endif 5923 on_receive(error, bytes_transferred); 5924 disconnect(error, operation_t::sock_read); 5925 return; 5926 } 5927 5928 m_last_receive = aux::time_now(); 5929 5930 // submit all disk jobs later 5931 m_ses.deferred_submit_jobs(); 5932 5933 // keep ourselves alive in until this function exits in 5934 // case we disconnect 5935 // this needs to be created before the invariant check, 5936 // to keep the object alive through the exit check 5937 std::shared_ptr<peer_connection> me(self()); 5938 5939 TORRENT_ASSERT(bytes_transferred > 0); 5940 5941 // flush the send buffer at the end of this function 5942 cork _c(*this); 5943 5944 // if we received exactly as many bytes as we provided a receive buffer 5945 // for. There most likely are more bytes to read, and we should grow our 5946 // receive buffer. 5947 TORRENT_ASSERT(int(bytes_transferred) <= m_recv_buffer.max_receive()); 5948 bool const grow_buffer = (int(bytes_transferred) == m_recv_buffer.max_receive()); 5949 account_received_bytes(int(bytes_transferred)); 5950 5951 if (m_extension_outstanding_bytes > 0) 5952 m_extension_outstanding_bytes -= std::min(m_extension_outstanding_bytes, int(bytes_transferred)); 5953 5954 check_graceful_pause(); 5955 if (m_disconnecting) return; 5956 5957 // this is the case where we try to grow the receive buffer and try to 5958 // drain the socket 5959 if (grow_buffer) 5960 { 5961 error_code ec; 5962 int buffer_size = int(m_socket->available(ec)); 5963 if (ec) 5964 { 5965 disconnect(ec, operation_t::available); 5966 return; 5967 } 5968 5969 #ifndef TORRENT_DISABLE_LOGGING 5970 peer_log(peer_log_alert::incoming, "AVAILABLE" 5971 , "%d bytes", buffer_size); 5972 #endif 5973 5974 request_bandwidth(download_channel, buffer_size); 5975 5976 int const quota_left = m_quota[download_channel]; 5977 if (buffer_size > quota_left) buffer_size = quota_left; 5978 if (buffer_size > 0) 5979 { 5980 span<char> const vec = m_recv_buffer.reserve(buffer_size); 5981 std::size_t const bytes = m_socket->read_some( 5982 boost::asio::mutable_buffers_1(vec.data(), std::size_t(vec.size())), ec); 5983 5984 // this is weird. You would imagine read_some() would do this 5985 if (bytes == 0 && !ec) ec = boost::asio::error::eof; 5986 5987 #ifndef TORRENT_DISABLE_LOGGING 5988 if (should_log(peer_log_alert::incoming)) 5989 { 5990 peer_log(peer_log_alert::incoming, "SYNC_READ", "max: %d ret: %d e: %s" 5991 , buffer_size, int(bytes), ec ? ec.message().c_str() : ""); 5992 } 5993 #endif 5994 5995 TORRENT_ASSERT(bytes > 0 || ec); 5996 if (ec) 5997 { 5998 if (ec != boost::asio::error::would_block 5999 && ec != boost::asio::error::try_again) 6000 { 6001 disconnect(ec, operation_t::sock_read); 6002 return; 6003 } 6004 } 6005 else 6006 { 6007 account_received_bytes(int(bytes)); 6008 bytes_transferred += bytes; 6009 } 6010 } 6011 } 6012 6013 // feed bytes in receive buffer to upper layer by calling on_receive() 6014 6015 bool const prev_choked = m_peer_choked; 6016 int bytes = int(bytes_transferred); 6017 int sub_transferred = 0; 6018 do { 6019 sub_transferred = m_recv_buffer.advance_pos(bytes); 6020 TORRENT_ASSERT(sub_transferred > 0); 6021 on_receive(error, std::size_t(sub_transferred)); 6022 bytes -= sub_transferred; 6023 if (m_disconnecting) return; 6024 } while (bytes > 0 && sub_transferred > 0); 6025 6026 // if the peer went from unchoked to choked, suggest to the receive 6027 // buffer that it shrinks to 100 bytes 6028 int const force_shrink = (m_peer_choked && !prev_choked) 6029 ? 100 : 0; 6030 m_recv_buffer.normalize(force_shrink); 6031 6032 if (m_recv_buffer.max_receive() == 0) 6033 { 6034 // the message we're receiving is larger than our receive 6035 // buffer, we must grow. 6036 int const buffer_size_limit 6037 = m_settings.get_int(settings_pack::max_peer_recv_buffer_size); 6038 m_recv_buffer.grow(buffer_size_limit); 6039 #ifndef TORRENT_DISABLE_LOGGING 6040 peer_log(peer_log_alert::incoming, "GROW_BUFFER", "%d bytes" 6041 , m_recv_buffer.capacity()); 6042 #endif 6043 } 6044 6045 TORRENT_ASSERT(m_recv_buffer.pos_at_end()); 6046 TORRENT_ASSERT(m_recv_buffer.packet_size() > 0); 6047 6048 if (is_seed()) 6049 { 6050 std::shared_ptr<torrent> t = m_torrent.lock(); 6051 if (t) t->seen_complete(); 6052 } 6053 6054 // allow reading from the socket again 6055 TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network); 6056 m_channel_state[download_channel] &= ~peer_info::bw_network; 6057 6058 setup_receive(); 6059 } 6060 can_write() const6061 bool peer_connection::can_write() const 6062 { 6063 TORRENT_ASSERT(is_single_thread()); 6064 // if we have requests or pending data to be sent or announcements to be made 6065 // we want to send data 6066 return !m_send_buffer.empty() 6067 && m_quota[upload_channel] > 0 6068 && (m_send_barrier > 0) 6069 && !m_connecting; 6070 } 6071 can_read()6072 bool peer_connection::can_read() 6073 { 6074 TORRENT_ASSERT(is_single_thread()); 6075 INVARIANT_CHECK; 6076 6077 std::shared_ptr<torrent> t = m_torrent.lock(); 6078 6079 bool bw_limit = m_quota[download_channel] > 0; 6080 6081 if (!bw_limit) return false; 6082 6083 if (m_outstanding_bytes > 0) 6084 { 6085 // if we're expecting to download piece data, we might not 6086 // want to read from the socket in case we're out of disk 6087 // cache space right now 6088 6089 if (m_channel_state[download_channel] & peer_info::bw_disk) return false; 6090 } 6091 6092 return !m_connecting && !m_disconnecting; 6093 } 6094 on_connection_complete(error_code const & e)6095 void peer_connection::on_connection_complete(error_code const& e) 6096 { 6097 TORRENT_ASSERT(is_single_thread()); 6098 COMPLETE_ASYNC("peer_connection::on_connection_complete"); 6099 6100 INVARIANT_CHECK; 6101 6102 // if t is nullptr, we better not be connecting, since 6103 // we can't decrement the connecting counter 6104 std::shared_ptr<torrent> t = m_torrent.lock(); 6105 TORRENT_ASSERT(t || !m_connecting); 6106 if (m_connecting) 6107 { 6108 m_counters.inc_stats_counter(counters::num_peers_half_open, -1); 6109 if (t) t->dec_num_connecting(m_peer_info); 6110 m_connecting = false; 6111 } 6112 6113 if (m_disconnecting) return; 6114 6115 if (e) 6116 { 6117 connect_failed(e); 6118 return; 6119 } 6120 6121 TORRENT_ASSERT(!m_connected); 6122 m_connected = true; 6123 m_counters.inc_stats_counter(counters::num_peers_connected); 6124 6125 if (m_disconnecting) return; 6126 m_last_receive = aux::time_now(); 6127 6128 error_code ec; 6129 m_local = m_socket->local_endpoint(ec); 6130 if (ec) 6131 { 6132 disconnect(ec, operation_t::getname); 6133 return; 6134 } 6135 6136 // if there are outgoing interfaces specified, verify this 6137 // peer is correctly bound to one of them 6138 if (!m_settings.get_str(settings_pack::outgoing_interfaces).empty()) 6139 { 6140 if (!m_ses.verify_bound_address(m_local.address() 6141 , is_utp(*m_socket), ec)) 6142 { 6143 if (ec) 6144 { 6145 disconnect(ec, operation_t::get_interface); 6146 return; 6147 } 6148 disconnect(error_code( 6149 boost::system::errc::no_such_device, generic_category()) 6150 , operation_t::connect); 6151 return; 6152 } 6153 } 6154 6155 if (is_utp(*m_socket) && m_peer_info) 6156 { 6157 m_peer_info->confirmed_supports_utp = true; 6158 m_peer_info->supports_utp = false; 6159 } 6160 6161 // this means the connection just succeeded 6162 6163 received_synack(is_v6(m_remote)); 6164 6165 TORRENT_ASSERT(m_socket); 6166 #ifndef TORRENT_DISABLE_LOGGING 6167 if (should_log(peer_log_alert::outgoing)) 6168 { 6169 peer_log(peer_log_alert::outgoing, "COMPLETED" 6170 , "ep: %s", print_endpoint(m_remote).c_str()); 6171 } 6172 #endif 6173 6174 // set the socket to non-blocking, so that we can 6175 // read the entire buffer on each read event we get 6176 #ifndef TORRENT_DISABLE_LOGGING 6177 peer_log(peer_log_alert::info, "SET_NON_BLOCKING"); 6178 #endif 6179 m_socket->non_blocking(true, ec); 6180 if (ec) 6181 { 6182 disconnect(ec, operation_t::iocontrol); 6183 return; 6184 } 6185 6186 if (m_remote == m_socket->local_endpoint(ec)) 6187 { 6188 disconnect(errors::self_connection, operation_t::bittorrent, failure); 6189 return; 6190 } 6191 6192 if (is_v4(m_remote) && m_settings.get_int(settings_pack::peer_tos) != 0) 6193 { 6194 error_code err; 6195 m_socket->set_option(type_of_service(char(m_settings.get_int(settings_pack::peer_tos))), err); 6196 #ifndef TORRENT_DISABLE_LOGGING 6197 if (should_log(peer_log_alert::outgoing)) 6198 { 6199 peer_log(peer_log_alert::outgoing, "SET_TOS", "tos: %d e: %s" 6200 , m_settings.get_int(settings_pack::peer_tos), err.message().c_str()); 6201 } 6202 #endif 6203 } 6204 #if defined IPV6_TCLASS 6205 else if (is_v6(m_remote) && m_settings.get_int(settings_pack::peer_tos) != 0) 6206 { 6207 error_code err; 6208 m_socket->set_option(traffic_class(char(m_settings.get_int(settings_pack::peer_tos))), err); 6209 #ifndef TORRENT_DISABLE_LOGGING 6210 if (should_log(peer_log_alert::outgoing)) 6211 { 6212 peer_log(peer_log_alert::outgoing, "SET_TOS", "tos: %d e: %s" 6213 , m_settings.get_int(settings_pack::peer_tos), err.message().c_str()); 6214 } 6215 #endif 6216 } 6217 #endif 6218 6219 #ifndef TORRENT_DISABLE_EXTENSIONS 6220 for (auto const& ext : m_extensions) 6221 { 6222 ext->on_connected(); 6223 } 6224 #endif 6225 6226 on_connected(); 6227 setup_send(); 6228 setup_receive(); 6229 } 6230 6231 // -------------------------- 6232 // SEND DATA 6233 // -------------------------- 6234 on_send_data(error_code const & error,std::size_t const bytes_transferred)6235 void peer_connection::on_send_data(error_code const& error 6236 , std::size_t const bytes_transferred) 6237 { 6238 TORRENT_ASSERT(is_single_thread()); 6239 m_counters.inc_stats_counter(counters::on_write_counter); 6240 m_ses.sent_buffer(int(bytes_transferred)); 6241 6242 #if TORRENT_USE_ASSERTS 6243 TORRENT_ASSERT(m_socket_is_writing); 6244 m_socket_is_writing = false; 6245 #endif 6246 6247 // submit all disk jobs when we've processed all messages 6248 // in the current message queue 6249 m_ses.deferred_submit_jobs(); 6250 6251 #ifndef TORRENT_DISABLE_LOGGING 6252 if (should_log(peer_log_alert::info)) 6253 { 6254 peer_log(peer_log_alert::info, "ON_SEND_DATA", "bytes: %d %s" 6255 , int(bytes_transferred), print_error(error).c_str()); 6256 } 6257 #endif 6258 6259 INVARIANT_CHECK; 6260 6261 COMPLETE_ASYNC("peer_connection::on_send_data"); 6262 // keep ourselves alive in until this function exits in 6263 // case we disconnect 6264 std::shared_ptr<peer_connection> me(self()); 6265 6266 TORRENT_ASSERT(m_channel_state[upload_channel] & peer_info::bw_network); 6267 6268 m_send_buffer.pop_front(int(bytes_transferred)); 6269 6270 time_point const now = clock_type::now(); 6271 6272 for (auto& block : m_download_queue) 6273 { 6274 if (block.send_buffer_offset == pending_block::not_in_buffer) 6275 continue; 6276 if (block.send_buffer_offset < int(bytes_transferred)) 6277 block.send_buffer_offset = pending_block::not_in_buffer; 6278 else 6279 block.send_buffer_offset -= int(bytes_transferred); 6280 } 6281 6282 m_channel_state[upload_channel] &= ~peer_info::bw_network; 6283 6284 TORRENT_ASSERT(int(bytes_transferred) <= m_quota[upload_channel]); 6285 m_quota[upload_channel] -= int(bytes_transferred); 6286 6287 trancieve_ip_packet(int(bytes_transferred), is_v6(m_remote)); 6288 6289 if (m_send_barrier != INT_MAX) 6290 m_send_barrier -= int(bytes_transferred); 6291 6292 #ifndef TORRENT_DISABLE_LOGGING 6293 peer_log(peer_log_alert::outgoing, "WROTE" 6294 , "%d bytes", int(bytes_transferred)); 6295 #endif 6296 6297 if (error) 6298 { 6299 #ifndef TORRENT_DISABLE_LOGGING 6300 if (should_log(peer_log_alert::info)) 6301 { 6302 peer_log(peer_log_alert::info, "ERROR" 6303 , "%s in peer_connection::on_send_data", error.message().c_str()); 6304 } 6305 #endif 6306 disconnect(error, operation_t::sock_write); 6307 return; 6308 } 6309 if (m_disconnecting) 6310 { 6311 // make sure we free up all send buffers that are owned 6312 // by the disk thread 6313 m_send_buffer.clear(); 6314 return; 6315 } 6316 6317 TORRENT_ASSERT(!m_connecting); 6318 TORRENT_ASSERT(bytes_transferred > 0); 6319 6320 m_last_sent = now; 6321 6322 #if TORRENT_USE_ASSERTS 6323 std::int64_t const cur_payload_ul = m_statistics.last_payload_uploaded(); 6324 std::int64_t const cur_protocol_ul = m_statistics.last_protocol_uploaded(); 6325 #endif 6326 on_sent(error, bytes_transferred); 6327 #if TORRENT_USE_ASSERTS 6328 TORRENT_ASSERT(m_statistics.last_payload_uploaded() - cur_payload_ul >= 0); 6329 TORRENT_ASSERT(m_statistics.last_protocol_uploaded() - cur_protocol_ul >= 0); 6330 std::int64_t stats_diff = m_statistics.last_payload_uploaded() - cur_payload_ul 6331 + m_statistics.last_protocol_uploaded() - cur_protocol_ul; 6332 TORRENT_ASSERT(stats_diff == int(bytes_transferred)); 6333 #endif 6334 6335 fill_send_buffer(); 6336 6337 setup_send(); 6338 } 6339 6340 #if TORRENT_USE_INVARIANT_CHECKS 6341 struct peer_count_t 6342 { peer_count_tlibtorrent::peer_count_t6343 peer_count_t(): num_peers(0), num_peers_with_timeouts(0), num_peers_with_nowant(0), num_not_requested(0) {} 6344 int num_peers; 6345 int num_peers_with_timeouts; 6346 int num_peers_with_nowant; 6347 int num_not_requested; 6348 // std::vector<peer_connection const*> peers; 6349 }; 6350 check_invariant() const6351 void peer_connection::check_invariant() const 6352 { 6353 TORRENT_ASSERT(is_single_thread()); 6354 TORRENT_ASSERT(m_in_use == 1337); 6355 TORRENT_ASSERT(m_queued_time_critical <= int(m_request_queue.size())); 6356 TORRENT_ASSERT(m_accept_fast.size() == m_accept_fast_piece_cnt.size()); 6357 6358 m_recv_buffer.check_invariant(); 6359 6360 for (int i = 0; i < 2; ++i) 6361 { 6362 if (m_channel_state[i] & peer_info::bw_limit) 6363 { 6364 // if we're waiting for bandwidth, we should be in the 6365 // bandwidth manager's queue 6366 TORRENT_ASSERT(m_ses.get_bandwidth_manager(i)->is_queued(this)); 6367 } 6368 } 6369 6370 std::shared_ptr<torrent> t = m_torrent.lock(); 6371 6372 #if TORRENT_USE_INVARIANT_CHECKS \ 6373 && !defined TORRENT_NO_EXPENSIVE_INVARIANT_CHECK 6374 if (t && t->has_picker() && !m_disconnecting) 6375 t->picker().check_peer_invariant(m_have_piece, peer_info_struct()); 6376 #endif 6377 6378 if (!m_disconnect_started && m_initialized) 6379 { 6380 // none of this matters if we're disconnecting anyway 6381 if (t->is_finished()) 6382 TORRENT_ASSERT(!is_interesting() || m_need_interest_update); 6383 if (is_seed()) 6384 TORRENT_ASSERT(upload_only()); 6385 } 6386 6387 if (m_disconnecting) 6388 { 6389 TORRENT_ASSERT(m_download_queue.empty()); 6390 TORRENT_ASSERT(m_request_queue.empty()); 6391 TORRENT_ASSERT(m_disconnect_started); 6392 } 6393 6394 TORRENT_ASSERT(m_outstanding_bytes >= 0); 6395 if (t && t->valid_metadata() && !m_disconnecting) 6396 { 6397 torrent_info const& ti = t->torrent_file(); 6398 // if the piece is fully downloaded, we might have popped it from the 6399 // download queue already 6400 int outstanding_bytes = 0; 6401 // bool in_download_queue = false; 6402 int const bs = t->block_size(); 6403 piece_block last_block(ti.last_piece() 6404 , (ti.piece_size(ti.last_piece()) + bs - 1) / bs); 6405 for (std::vector<pending_block>::const_iterator i = m_download_queue.begin() 6406 , end(m_download_queue.end()); i != end; ++i) 6407 { 6408 TORRENT_ASSERT(i->block.piece_index <= last_block.piece_index); 6409 TORRENT_ASSERT(i->block.piece_index < last_block.piece_index 6410 || i->block.block_index <= last_block.block_index); 6411 if (m_received_in_piece && i == m_download_queue.begin()) 6412 { 6413 // in_download_queue = true; 6414 // this assert is not correct since block may have different sizes 6415 // and may not be returned in the order they were requested 6416 // TORRENT_ASSERT(t->to_req(i->block).length >= m_received_in_piece); 6417 outstanding_bytes += t->to_req(i->block).length - m_received_in_piece; 6418 } 6419 else 6420 { 6421 outstanding_bytes += t->to_req(i->block).length; 6422 } 6423 } 6424 //if (p && p->bytes_downloaded < p->full_block_bytes) TORRENT_ASSERT(in_download_queue); 6425 6426 if (m_outstanding_bytes != outstanding_bytes) 6427 { 6428 std::fprintf(stderr, "m_outstanding_bytes = %d\noutstanding_bytes = %d\n" 6429 , m_outstanding_bytes, outstanding_bytes); 6430 } 6431 6432 TORRENT_ASSERT(m_outstanding_bytes == outstanding_bytes); 6433 } 6434 6435 std::set<piece_block> unique; 6436 std::transform(m_download_queue.begin(), m_download_queue.end() 6437 , std::inserter(unique, unique.begin()), std::bind(&pending_block::block, _1)); 6438 std::transform(m_request_queue.begin(), m_request_queue.end() 6439 , std::inserter(unique, unique.begin()), std::bind(&pending_block::block, _1)); 6440 TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size()); 6441 if (m_peer_info) 6442 { 6443 TORRENT_ASSERT(m_peer_info->prev_amount_upload == 0); 6444 TORRENT_ASSERT(m_peer_info->prev_amount_download == 0); 6445 TORRENT_ASSERT(m_peer_info->connection == this 6446 || m_peer_info->connection == nullptr); 6447 6448 if (m_peer_info->optimistically_unchoked) 6449 TORRENT_ASSERT(!is_choked()); 6450 } 6451 6452 TORRENT_ASSERT(m_have_piece.count() == m_num_pieces); 6453 6454 if (!t) 6455 { 6456 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS 6457 // since this connection doesn't have a torrent reference 6458 // no torrent should have a reference to this connection either 6459 TORRENT_ASSERT(!m_ses.any_torrent_has_peer(this)); 6460 #endif 6461 return; 6462 } 6463 6464 if (t->ready_for_connections() && m_initialized) 6465 TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size())); 6466 6467 // in share mode we don't close redundant connections 6468 if (m_settings.get_bool(settings_pack::close_redundant_connections) 6469 #ifndef TORRENT_DISABLE_SHARE_MODE 6470 && !t->share_mode() 6471 #endif 6472 ) 6473 { 6474 bool const ok_to_disconnect = 6475 can_disconnect(errors::upload_upload_connection) 6476 || can_disconnect(errors::uninteresting_upload_peer) 6477 || can_disconnect(errors::too_many_requests_when_choked) 6478 || can_disconnect(errors::timed_out_no_interest) 6479 || can_disconnect(errors::timed_out_no_request) 6480 || can_disconnect(errors::timed_out_inactivity); 6481 6482 // make sure upload only peers are disconnected 6483 if (t->is_upload_only() 6484 && m_upload_only 6485 && !m_need_interest_update 6486 && t->valid_metadata() 6487 && has_metadata() 6488 && ok_to_disconnect) 6489 TORRENT_ASSERT(m_disconnect_started || t->graceful_pause() || t->has_error()); 6490 6491 if (m_upload_only 6492 && !m_interesting 6493 && !m_need_interest_update 6494 && m_bitfield_received 6495 && t->are_files_checked() 6496 && t->valid_metadata() 6497 && has_metadata() 6498 && ok_to_disconnect) 6499 TORRENT_ASSERT(m_disconnect_started); 6500 } 6501 6502 if (!m_disconnect_started && m_initialized 6503 && m_settings.get_bool(settings_pack::close_redundant_connections)) 6504 { 6505 // none of this matters if we're disconnecting anyway 6506 if (t->is_upload_only() && !m_need_interest_update) 6507 TORRENT_ASSERT(!m_interesting || t->graceful_pause() || t->has_error()); 6508 if (is_seed()) 6509 TORRENT_ASSERT(m_upload_only); 6510 } 6511 6512 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS 6513 if (t->has_picker()) 6514 { 6515 std::map<piece_block, peer_count_t> num_requests; 6516 for (torrent::const_peer_iterator i = t->begin(); i != t->end(); ++i) 6517 { 6518 // make sure this peer is not a dangling pointer 6519 TORRENT_ASSERT(m_ses.has_peer(*i)); 6520 peer_connection const& p = *(*i); 6521 for (std::vector<pending_block>::const_iterator j = p.request_queue().begin() 6522 , end(p.request_queue().end()); j != end; ++j) 6523 { 6524 ++num_requests[j->block].num_peers; 6525 ++num_requests[j->block].num_peers_with_timeouts; 6526 ++num_requests[j->block].num_peers_with_nowant; 6527 ++num_requests[j->block].num_not_requested; 6528 // num_requests[j->block].peers.push_back(&p); 6529 } 6530 for (std::vector<pending_block>::const_iterator j = p.download_queue().begin() 6531 , end(p.download_queue().end()); j != end; ++j) 6532 { 6533 if (!j->not_wanted && !j->timed_out) ++num_requests[j->block].num_peers; 6534 if (j->timed_out) ++num_requests[j->block].num_peers_with_timeouts; 6535 if (j->not_wanted) ++num_requests[j->block].num_peers_with_nowant; 6536 // num_requests[j->block].peers.push_back(&p); 6537 } 6538 } 6539 for (std::map<piece_block, peer_count_t>::iterator j = num_requests.begin() 6540 , end(num_requests.end()); j != end; ++j) 6541 { 6542 piece_block b = j->first; 6543 peer_count_t const& pc = j->second; 6544 int count = pc.num_peers; 6545 int count_with_timeouts = pc.num_peers_with_timeouts; 6546 int count_with_nowant = pc.num_peers_with_nowant; 6547 (void)count_with_timeouts; 6548 (void)count_with_nowant; 6549 int picker_count = t->picker().num_peers(b); 6550 if (!t->picker().is_downloaded(b)) 6551 TORRENT_ASSERT(picker_count == count); 6552 } 6553 } 6554 #endif 6555 /* 6556 if (t->has_picker() && !t->is_aborted()) 6557 { 6558 for (std::vector<pending_block>::const_iterator i = m_download_queue.begin() 6559 , end(m_download_queue.end()); i != end; ++i) 6560 { 6561 pending_block const& pb = *i; 6562 if (pb.timed_out || pb.not_wanted) continue; 6563 TORRENT_ASSERT(t->picker().get_block_state(pb.block) != piece_picker::block_info::state_none); 6564 TORRENT_ASSERT(complete); 6565 } 6566 } 6567 */ 6568 // extremely expensive invariant check 6569 /* 6570 if (!t->is_seed()) 6571 { 6572 piece_picker& p = t->picker(); 6573 const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue(); 6574 const int blocks_per_piece = static_cast<int>( 6575 t->torrent_file().piece_length() / t->block_size()); 6576 6577 for (std::vector<piece_picker::downloading_piece>::const_iterator i = 6578 dlq.begin(); i != dlq.end(); ++i) 6579 { 6580 for (int j = 0; j < blocks_per_piece; ++j) 6581 { 6582 if (std::find(m_request_queue.begin(), m_request_queue.end() 6583 , piece_block(i->index, j)) != m_request_queue.end() 6584 || 6585 std::find(m_download_queue.begin(), m_download_queue.end() 6586 , piece_block(i->index, j)) != m_download_queue.end()) 6587 { 6588 TORRENT_ASSERT(i->info[j].peer == m_remote); 6589 } 6590 else 6591 { 6592 TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished); 6593 } 6594 } 6595 } 6596 } 6597 */ 6598 } 6599 #endif 6600 set_holepunch_mode()6601 void peer_connection::set_holepunch_mode() 6602 { 6603 m_holepunch_mode = true; 6604 #ifndef TORRENT_DISABLE_LOGGING 6605 peer_log(peer_log_alert::info, "HOLEPUNCH_MODE", "[ on ]"); 6606 #endif 6607 } 6608 keep_alive()6609 void peer_connection::keep_alive() 6610 { 6611 TORRENT_ASSERT(is_single_thread()); 6612 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS 6613 INVARIANT_CHECK; 6614 #endif 6615 6616 time_duration const d = aux::time_now() - m_last_sent; 6617 if (total_seconds(d) < timeout() / 2) return; 6618 6619 if (m_connecting) return; 6620 if (in_handshake()) return; 6621 6622 // if the last send has not completed yet, do not send a keep 6623 // alive 6624 if (m_channel_state[upload_channel] & peer_info::bw_network) return; 6625 6626 #ifndef TORRENT_DISABLE_LOGGING 6627 peer_log(peer_log_alert::outgoing_message, "KEEPALIVE"); 6628 #endif 6629 6630 write_keepalive(); 6631 } 6632 is_seed() const6633 bool peer_connection::is_seed() const 6634 { 6635 TORRENT_ASSERT(is_single_thread()); 6636 6637 // if m_num_pieces == 0, we probably don't have the 6638 // metadata yet. 6639 std::shared_ptr<torrent> t = m_torrent.lock(); 6640 return m_num_pieces == m_have_piece.size() 6641 && m_num_pieces > 0 && t && t->valid_metadata(); 6642 } 6643 6644 #ifndef TORRENT_DISABLE_SHARE_MODE set_share_mode(bool u)6645 void peer_connection::set_share_mode(bool u) 6646 { 6647 TORRENT_ASSERT(is_single_thread()); 6648 // if the peer is a seed, ignore share mode messages 6649 if (is_seed()) return; 6650 6651 m_share_mode = u; 6652 } 6653 #endif 6654 set_upload_only(bool u)6655 void peer_connection::set_upload_only(bool u) 6656 { 6657 TORRENT_ASSERT(is_single_thread()); 6658 // if the peer is a seed, don't allow setting 6659 // upload_only to false 6660 if (m_upload_only && is_seed()) return; 6661 6662 m_upload_only = u; 6663 disconnect_if_redundant(); 6664 } 6665 6666 } 6667