1 /*
2 
3 Copyright (c) 2003-2018, Arvid Norberg
4 All rights reserved.
5 
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9 
10     * Redistributions of source code must retain the above copyright
11       notice, this list of conditions and the following disclaimer.
12     * Redistributions in binary form must reproduce the above copyright
13       notice, this list of conditions and the following disclaimer in
14       the documentation and/or other materials provided with the distribution.
15     * Neither the name of the author nor the names of its
16       contributors may be used to endorse or promote products derived
17       from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30 
31 */
32 
33 #include <vector>
34 #include <functional>
35 #include <cstdint>
36 
37 #include "libtorrent/config.hpp"
38 #include "libtorrent/peer_connection.hpp"
39 #include "libtorrent/entry.hpp"
40 #include "libtorrent/bencode.hpp"
41 #include "libtorrent/alert_types.hpp"
42 #include "libtorrent/invariant_check.hpp"
43 #include "libtorrent/io.hpp"
44 #include "libtorrent/extensions.hpp"
45 #include "libtorrent/aux_/session_interface.hpp"
46 #include "libtorrent/peer_list.hpp"
47 #include "libtorrent/aux_/socket_type.hpp"
48 #include "libtorrent/hasher.hpp"
49 #include "libtorrent/assert.hpp"
50 #include "libtorrent/broadcast_socket.hpp"
51 #include "libtorrent/torrent.hpp"
52 #include "libtorrent/peer_info.hpp"
53 #include "libtorrent/bt_peer_connection.hpp"
54 #include "libtorrent/error.hpp"
55 #include "libtorrent/aux_/alloca.hpp"
56 #include "libtorrent/disk_interface.hpp"
57 #include "libtorrent/bandwidth_manager.hpp"
58 #include "libtorrent/request_blocks.hpp" // for request_a_block
59 #include "libtorrent/performance_counters.hpp" // for counters
60 #include "libtorrent/alert_manager.hpp" // for alert_manager
61 #include "libtorrent/ip_filter.hpp"
62 #include "libtorrent/ip_voter.hpp"
63 #include "libtorrent/kademlia/node_id.hpp"
64 #include "libtorrent/close_reason.hpp"
65 #include "libtorrent/aux_/has_block.hpp"
66 #include "libtorrent/aux_/time.hpp"
67 #include "libtorrent/buffer.hpp"
68 #include "libtorrent/aux_/array.hpp"
69 #include "libtorrent/aux_/set_socket_buffer.hpp"
70 
71 #if TORRENT_USE_ASSERTS
72 #include <set>
73 #endif
74 
75 #ifdef TORRENT_USE_OPENSSL
76 #include <openssl/rand.h>
77 #endif
78 
79 #ifndef TORRENT_DISABLE_LOGGING
80 #include <cstdarg> // for va_start, va_end
81 #include <cstdio> // for vsnprintf
82 #include "libtorrent/socket_io.hpp"
83 #include "libtorrent/hex.hpp" // to_hex
84 #endif
85 
86 #include "libtorrent/aux_/torrent_impl.hpp"
87 
88 //#define TORRENT_CORRUPT_DATA
89 
90 using namespace std::placeholders;
91 
92 namespace libtorrent {
93 
94 	constexpr request_flags_t peer_connection::time_critical;
95 	constexpr request_flags_t peer_connection::busy;
96 
97 	namespace {
98 
99 	// the limits of the download queue size
100 	constexpr int min_request_queue = 2;
101 
102 	bool pending_block_in_buffer(pending_block const& pb)
103 	{
104 		return pb.send_buffer_offset != pending_block::not_in_buffer;
105 	}
106 
107 	}
108 
109 	constexpr piece_index_t piece_block_progress::invalid_index;
110 
111 	constexpr disconnect_severity_t peer_connection_interface::normal;
112 	constexpr disconnect_severity_t peer_connection_interface::failure;
113 	constexpr disconnect_severity_t peer_connection_interface::peer_error;
114 
115 #if TORRENT_USE_ASSERTS
116 	bool peer_connection::is_single_thread() const
117 	{
118 #ifdef TORRENT_USE_INVARIANT_CHECKS
119 		std::shared_ptr<torrent> t = m_torrent.lock();
120 		if (!t) return true;
121 		return t->is_single_thread();
122 #else
123 		return true;
124 #endif
125 	}
126 #endif
127 
128 	peer_connection::peer_connection(peer_connection_args const& pack)
129 		: peer_connection_hot_members(pack.tor, *pack.ses, *pack.sett)
130 		, m_socket(pack.s)
131 		, m_peer_info(pack.peerinfo)
132 		, m_counters(*pack.stats_counters)
133 		, m_num_pieces(0)
134 		, m_max_out_request_queue(m_settings.get_int(settings_pack::max_out_request_queue))
135 		, m_remote(pack.endp)
136 		, m_disk_thread(*pack.disk_thread)
137 		, m_ios(*pack.ios)
138 		, m_work(m_ios)
139 		, m_outstanding_piece_verification(0)
140 		, m_outgoing(!pack.tor.expired())
141 		, m_received_listen_port(false)
142 		, m_fast_reconnect(false)
143 		, m_failed(false)
144 		, m_connected(pack.tor.expired())
145 		, m_request_large_blocks(false)
146 #ifndef TORRENT_DISABLE_SHARE_MODE
147 		, m_share_mode(false)
148 #endif
149 		, m_upload_only(false)
150 		, m_bitfield_received(false)
151 		, m_no_download(false)
152 		, m_holepunch_mode(false)
153 		, m_peer_choked(true)
154 		, m_have_all(false)
155 		, m_peer_interested(false)
156 		, m_need_interest_update(false)
157 		, m_has_metadata(true)
158 		, m_exceeded_limit(false)
159 		, m_slow_start(true)
160 	{
161 		m_counters.inc_stats_counter(counters::num_tcp_peers + m_socket->type() - 1);
162 		std::shared_ptr<torrent> t = m_torrent.lock();
163 
164 		if (m_connected)
165 			m_counters.inc_stats_counter(counters::num_peers_connected);
166 		else if (m_connecting)
167 			m_counters.inc_stats_counter(counters::num_peers_half_open);
168 
169 		// if t is nullptr, we better not be connecting, since
170 		// we can't decrement the connecting counter
171 		TORRENT_ASSERT(t || !m_connecting);
172 #if TORRENT_ABI_VERSION == 1
173 		m_est_reciprocation_rate = m_settings.get_int(settings_pack::default_est_reciprocation_rate);
174 #endif
175 
176 		m_channel_state[upload_channel] = peer_info::bw_idle;
177 		m_channel_state[download_channel] = peer_info::bw_idle;
178 
179 		m_quota[0] = 0;
180 		m_quota[1] = 0;
181 
182 		TORRENT_ASSERT(pack.peerinfo == nullptr || pack.peerinfo->banned == false);
183 #ifndef TORRENT_DISABLE_LOGGING
184 		if (should_log(m_outgoing ? peer_log_alert::outgoing : peer_log_alert::incoming))
185 		{
186 			error_code ec;
187 			TORRENT_ASSERT(m_socket->remote_endpoint(ec) == m_remote || ec);
188 			tcp::endpoint local_ep = m_socket->local_endpoint(ec);
189 
190 			peer_log(m_outgoing ? peer_log_alert::outgoing : peer_log_alert::incoming
191 				, m_outgoing ? "OUTGOING_CONNECTION" : "INCOMING_CONNECTION"
192 				, "ep: %s type: %s seed: %d p: %p local: %s"
193 				, print_endpoint(m_remote).c_str()
194 				, m_socket->type_name()
195 				, m_peer_info ? m_peer_info->seed : 0
196 				, static_cast<void*>(m_peer_info)
197 				, print_endpoint(local_ep).c_str());
198 		}
199 #endif
200 
201 		// this counter should not be incremented until we know constructing this
202 		// peer object can't fail anymore
203 		if (m_connecting && t) t->inc_num_connecting(m_peer_info);
204 
205 #if TORRENT_USE_ASSERTS
206 		piece_failed = false;
207 		m_in_constructor = false;
208 #endif
209 	}
210 
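	// invoke the given member function, translating any exception it throws
	// into a disconnect with a matching error code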
211 	template <typename Fun, typename... Args>
212 	void peer_connection::wrap(Fun f, Args&&... a)
213 #ifndef BOOST_NO_EXCEPTIONS
214 		try
215 #endif
216 	{
217 		(this->*f)(std::forward<Args>(a)...);
218 	}
219 #ifndef BOOST_NO_EXCEPTIONS
220 	catch (std::bad_alloc const&) {
221 #ifndef TORRENT_DISABLE_LOGGING
222 		peer_log(peer_log_alert::info, "EXCEPTION", "bad_alloc");
223 #endif
224 		disconnect(make_error_code(boost::system::errc::not_enough_memory)
225 			, operation_t::unknown);
226 	}
227 	catch (system_error const& e) {
228 #ifndef TORRENT_DISABLE_LOGGING
229 		peer_log(peer_log_alert::info, "EXCEPTION", "(%d %s) %s"
230 			, e.code().value()
231 			, e.code().message().c_str()
232 			, e.what());
233 #endif
234 		disconnect(e.code(), operation_t::unknown);
235 	}
236 	catch (std::exception const& e) {
237 		TORRENT_UNUSED(e);
238 #ifndef TORRENT_DISABLE_LOGGING
239 		peer_log(peer_log_alert::info, "EXCEPTION", "%s", e.what());
240 #endif
241 		disconnect(make_error_code(boost::system::errc::not_enough_memory)
242 			, operation_t::sock_write);
243 	}
244 #endif // BOOST_NO_EXCEPTIONS
245 
246 	int peer_connection::timeout() const
247 	{
248 		TORRENT_ASSERT(is_single_thread());
249 		int ret = m_settings.get_int(settings_pack::peer_timeout);
250 #if TORRENT_USE_I2P
251 		if (m_peer_info && m_peer_info->is_i2p_addr)
252 		{
253 			// quadruple the timeout for i2p peers
254 			ret *= 4;
255 		}
256 #endif
257 		return ret;
258 	}
259 
260 	void peer_connection::on_exception(std::exception const& e)
261 	{
262 		TORRENT_UNUSED(e);
263 #ifndef TORRENT_DISABLE_LOGGING
264 		peer_log(peer_log_alert::info, "PEER_ERROR", "ERROR: %s"
265 			, e.what());
266 #endif
267 		disconnect(error_code(), operation_t::unknown, peer_error);
268 	}
269 
270 	void peer_connection::on_error(error_code const& ec)
271 	{
272 		disconnect(ec, operation_t::unknown, peer_error);
273 	}
274 
275 #if TORRENT_ABI_VERSION == 1
276 	void peer_connection::increase_est_reciprocation_rate()
277 	{
278 		TORRENT_ASSERT(is_single_thread());
279 		m_est_reciprocation_rate += m_est_reciprocation_rate
280 			* m_settings.get_int(settings_pack::increase_est_reciprocation_rate) / 100;
281 	}
282 
283 	void peer_connection::decrease_est_reciprocation_rate()
284 	{
285 		TORRENT_ASSERT(is_single_thread());
286 		m_est_reciprocation_rate -= m_est_reciprocation_rate
287 			* m_settings.get_int(settings_pack::decrease_est_reciprocation_rate) / 100;
288 	}
289 #endif
290 
291 	int peer_connection::get_priority(int const channel) const
292 	{
293 		TORRENT_ASSERT(is_single_thread());
294 		TORRENT_ASSERT(channel >= 0 && channel < 2);
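		// the effective priority for a channel is the highest priority of any
		// peer class this connection, or its torrent, belongs to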
295 		int prio = 1;
296 		for (int i = 0; i < num_classes(); ++i)
297 		{
298 			int class_prio = m_ses.peer_classes().at(class_at(i))->priority[channel];
299 			if (prio < class_prio) prio = class_prio;
300 		}
301 
302 		std::shared_ptr<torrent> t = associated_torrent().lock();
303 
304 		if (t)
305 		{
306 			for (int i = 0; i < t->num_classes(); ++i)
307 			{
308 				int class_prio = m_ses.peer_classes().at(t->class_at(i))->priority[channel];
309 				if (prio < class_prio) prio = class_prio;
310 			}
311 		}
312 		return prio;
313 	}
314 
315 	void peer_connection::reset_choke_counters()
316 	{
317 		TORRENT_ASSERT(is_single_thread());
318 		m_downloaded_at_last_round = m_statistics.total_payload_download();
319 		m_uploaded_at_last_round = m_statistics.total_payload_upload();
320 	}
321 
322 	void peer_connection::start()
323 	{
324 		TORRENT_ASSERT(is_single_thread());
325 		TORRENT_ASSERT(m_peer_info == nullptr || m_peer_info->connection == this);
326 		std::shared_ptr<torrent> t = m_torrent.lock();
327 
328 		if (!m_outgoing)
329 		{
330 			error_code ec;
331 			m_socket->non_blocking(true, ec);
332 			if (ec)
333 			{
334 				disconnect(ec, operation_t::iocontrol);
335 				return;
336 			}
337 			m_remote = m_socket->remote_endpoint(ec);
338 			if (ec)
339 			{
340 				disconnect(ec, operation_t::getpeername);
341 				return;
342 			}
343 			m_local = m_socket->local_endpoint(ec);
344 			if (ec)
345 			{
346 				disconnect(ec, operation_t::getname);
347 				return;
348 			}
349 			if (is_v4(m_remote) && m_settings.get_int(settings_pack::peer_tos) != 0)
350 			{
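				// apply the configured peer_tos value as the IP type-of-service
				// (DSCP) field on this socket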
351 				m_socket->set_option(type_of_service(char(m_settings.get_int(settings_pack::peer_tos))), ec);
352 #ifndef TORRENT_DISABLE_LOGGING
353 				if (should_log(peer_log_alert::outgoing))
354 				{
355 					peer_log(peer_log_alert::outgoing, "SET_TOS", "tos: %d e: %s"
356 						, m_settings.get_int(settings_pack::peer_tos), ec.message().c_str());
357 				}
358 #endif
359 			}
360 #if defined IPV6_TCLASS
361 			else if (is_v6(m_remote) && m_settings.get_int(settings_pack::peer_tos) != 0)
362 			{
363 				m_socket->set_option(traffic_class(char(m_settings.get_int(settings_pack::peer_tos))), ec);
364 			}
365 #endif
366 		}
367 
368 #ifndef TORRENT_DISABLE_LOGGING
369 		if (should_log(peer_log_alert::info))
370 		{
371 			peer_log(peer_log_alert::info, "SET_PEER_CLASS", "a: %s"
372 				, print_address(m_remote.address()).c_str());
373 		}
374 #endif
375 
376 		m_ses.set_peer_classes(this, m_remote.address(), m_socket->type());
377 
378 #ifndef TORRENT_DISABLE_LOGGING
379 		if (should_log(peer_log_alert::info))
380 		{
381 			std::string classes;
382 			for (int i = 0; i < num_classes(); ++i)
383 			{
384 				classes += m_ses.peer_classes().at(class_at(i))->label;
385 				classes += ' ';
386 			}
387 			peer_log(peer_log_alert::info, "CLASS", "%s"
388 				, classes.c_str());
389 		}
390 #endif
391 
392 		if (t && t->ready_for_connections())
393 		{
394 			init();
395 		}
396 
397 		// if this is an incoming connection, we're done here
398 		if (!m_connecting)
399 		{
400 			error_code err;
401 			aux::set_socket_buffer_size(*m_socket, m_settings, err);
402 #ifndef TORRENT_DISABLE_LOGGING
403 			if (err && should_log(peer_log_alert::incoming))
404 			{
405 				peer_log(peer_log_alert::incoming, "SOCKET_BUFFER", "%s %s"
406 					, print_endpoint(m_remote).c_str()
407 					, print_error(err).c_str());
408 			}
409 #endif
410 
411 			return;
412 		}
413 
414 #ifndef TORRENT_DISABLE_LOGGING
415 		if (should_log(peer_log_alert::outgoing))
416 		{
417 			peer_log(peer_log_alert::outgoing, "OPEN", "protocol: %s"
418 				, (is_v4(m_remote) ? "IPv4" : "IPv6"));
419 		}
420 #endif
421 		error_code ec;
422 		m_socket->open(m_remote.protocol(), ec);
423 		if (ec)
424 		{
425 			disconnect(ec, operation_t::sock_open);
426 			return;
427 		}
428 
429 		tcp::endpoint const bound_ip = m_ses.bind_outgoing_socket(*m_socket
430 			, m_remote.address(), ec);
431 #ifndef TORRENT_DISABLE_LOGGING
432 		if (should_log(peer_log_alert::outgoing))
433 		{
434 			peer_log(peer_log_alert::outgoing, "BIND", "dst: %s ec: %s"
435 				, print_endpoint(bound_ip).c_str()
436 				, ec.message().c_str());
437 		}
438 #else
439 		TORRENT_UNUSED(bound_ip);
440 #endif
441 		if (ec)
442 		{
443 			disconnect(ec, operation_t::sock_bind);
444 			return;
445 		}
446 
447 		{
448 			error_code err;
449 			aux::set_socket_buffer_size(*m_socket, m_settings, err);
450 #ifndef TORRENT_DISABLE_LOGGING
451 			if (err && should_log(peer_log_alert::outgoing))
452 			{
453 				peer_log(peer_log_alert::outgoing, "SOCKET_BUFFER", "%s %s"
454 					, print_endpoint(m_remote).c_str()
455 					, print_error(err).c_str());
456 			}
457 #endif
458 		}
459 
460 #ifndef TORRENT_DISABLE_LOGGING
461 		if (should_log(peer_log_alert::outgoing))
462 		{
463 			peer_log(peer_log_alert::outgoing, "ASYNC_CONNECT", "dst: %s"
464 				, print_endpoint(m_remote).c_str());
465 		}
466 #endif
467 		ADD_OUTSTANDING_ASYNC("peer_connection::on_connection_complete");
468 
469 		auto conn = self();
470 		m_socket->async_connect(m_remote
471 			, [conn](error_code const& e) { conn->wrap(&peer_connection::on_connection_complete, e); });
472 		m_connect = aux::time_now();
473 
474 		sent_syn(is_v6(m_remote));
475 
476 		if (t && t->alerts().should_post<peer_connect_alert>())
477 		{
478 			t->alerts().emplace_alert<peer_connect_alert>(
479 				t->get_handle(), remote(), pid(), m_socket->type());
480 		}
481 #ifndef TORRENT_DISABLE_LOGGING
482 		if (should_log(peer_log_alert::info))
483 		{
484 			peer_log(peer_log_alert::info, "LOCAL ENDPOINT", "e: %s"
485 				, print_endpoint(m_socket->local_endpoint(ec)).c_str());
486 		}
487 #endif
488 	}
489 
490 	void peer_connection::update_interest()
491 	{
492 		TORRENT_ASSERT(is_single_thread());
493 		if (!m_need_interest_update)
494 		{
495 			// we're the first to request an interest update
496 			// post a message in order to delay it enough for
497 			// any potential other messages already in the queue
498 		// to not trigger another one. This effectively defers
499 			// the update until the current message queue is
500 			// flushed
501 			auto conn = self();
502 			m_ios.post([conn] { conn->wrap(&peer_connection::do_update_interest); });
503 		}
504 		m_need_interest_update = true;
505 	}
506 
507 	void peer_connection::do_update_interest()
508 	{
509 		TORRENT_ASSERT(is_single_thread());
510 		TORRENT_ASSERT(m_need_interest_update);
511 		m_need_interest_update = false;
512 
513 		std::shared_ptr<torrent> t = m_torrent.lock();
514 		if (!t) return;
515 
516 		// if m_have_piece is empty, it means the connection
517 		// has not been initialized yet. The interested
518 		// flag will be updated once it is.
519 		if (m_have_piece.empty())
520 		{
521 #ifndef TORRENT_DISABLE_LOGGING
522 			peer_log(peer_log_alert::info, "UPDATE_INTEREST", "connections not initialized");
523 #endif
524 			return;
525 		}
526 		if (!t->ready_for_connections())
527 		{
528 #ifndef TORRENT_DISABLE_LOGGING
529 			peer_log(peer_log_alert::info, "UPDATE_INTEREST", "not ready for connections");
530 #endif
531 			return;
532 		}
533 
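		// we're interested in this peer as long as it has at least one piece we
		// still want (priority above dont_download) and haven't already passed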
534 		bool interested = false;
535 		if (!t->is_upload_only())
536 		{
537 			t->need_picker();
538 			piece_picker const& p = t->picker();
539 			piece_index_t const end_piece(p.num_pieces());
540 			for (piece_index_t j(0); j != end_piece; ++j)
541 			{
542 				if (m_have_piece[j]
543 					&& t->piece_priority(j) > dont_download
544 					&& !p.has_piece_passed(j))
545 				{
546 					interested = true;
547 #ifndef TORRENT_DISABLE_LOGGING
548 					peer_log(peer_log_alert::info, "UPDATE_INTEREST", "interesting, piece: %d"
549 						, static_cast<int>(j));
550 #endif
551 					break;
552 				}
553 			}
554 		}
555 
556 #ifndef TORRENT_DISABLE_LOGGING
557 		if (!interested)
558 			peer_log(peer_log_alert::info, "UPDATE_INTEREST", "not interesting");
559 #endif
560 
561 		if (!interested) send_not_interested();
562 		else t->peer_is_interesting(*this);
563 
564 		TORRENT_ASSERT(in_handshake() || is_interesting() == interested);
565 
566 		disconnect_if_redundant();
567 	}
568 
569 #ifndef TORRENT_DISABLE_LOGGING
570 	bool peer_connection::should_log(peer_log_alert::direction_t) const
571 	{
572 		return m_ses.alerts().should_post<peer_log_alert>();
573 	}
574 
575 	void peer_connection::peer_log(peer_log_alert::direction_t direction
576 		, char const* event) const noexcept
577 	{
578 		peer_log(direction, event, "");
579 	}
580 
581 	TORRENT_FORMAT(4,5)
582 	void peer_connection::peer_log(peer_log_alert::direction_t direction
583 		, char const* event, char const* fmt, ...) const noexcept try
584 	{
585 		TORRENT_ASSERT(is_single_thread());
586 
587 		if (!m_ses.alerts().should_post<peer_log_alert>()) return;
588 
589 		va_list v;
590 		va_start(v, fmt);
591 
592 		torrent_handle h;
593 		std::shared_ptr<torrent> t = m_torrent.lock();
594 		if (t) h = t->get_handle();
595 
596 		m_ses.alerts().emplace_alert<peer_log_alert>(
597 			h, m_remote, m_peer_id, direction, event, fmt, v);
598 
599 		va_end(v);
600 
601 	}
602 	catch (std::exception const&) {}
603 #endif
604 
605 #ifndef TORRENT_DISABLE_EXTENSIONS
606 	void peer_connection::add_extension(std::shared_ptr<peer_plugin> ext)
607 	{
608 		TORRENT_ASSERT(is_single_thread());
609 		m_extensions.push_back(ext);
610 	}
611 
612 	peer_plugin const* peer_connection::find_plugin(string_view type)
613 	{
614 		TORRENT_ASSERT(is_single_thread());
615 		auto p = std::find_if(m_extensions.begin(), m_extensions.end()
616 			, [&](std::shared_ptr<peer_plugin> const& e) { return e->type() == type; });
617 		return p != m_extensions.end() ? p->get() : nullptr;
618 	}
619 #endif
620 
621 	void peer_connection::send_allowed_set()
622 	{
623 		TORRENT_ASSERT(is_single_thread());
624 		INVARIANT_CHECK;
625 
626 		std::shared_ptr<torrent> t = m_torrent.lock();
627 		TORRENT_ASSERT(t);
628 
629 		if (!t->valid_metadata())
630 		{
631 #ifndef TORRENT_DISABLE_LOGGING
632 			peer_log(peer_log_alert::info, "ALLOWED", "skipping allowed set because we don't have metadata");
633 #endif
634 			return;
635 		}
636 
637 #ifndef TORRENT_DISABLE_SUPERSEEDING
638 		if (t->super_seeding())
639 		{
640 #ifndef TORRENT_DISABLE_LOGGING
641 			peer_log(peer_log_alert::info, "ALLOWED", "skipping allowed set because of super seeding");
642 #endif
643 			return;
644 		}
645 #endif
646 
647 		if (upload_only())
648 		{
649 #ifndef TORRENT_DISABLE_LOGGING
650 			peer_log(peer_log_alert::info, "ALLOWED", "skipping allowed set because peer is upload only");
651 #endif
652 			return;
653 		}
654 
655 		int const num_allowed_pieces = m_settings.get_int(settings_pack::allowed_fast_set_size);
656 		if (num_allowed_pieces <= 0) return;
657 
658 		if (!t->valid_metadata()) return;
659 
660 		int const num_pieces = t->torrent_file().num_pieces();
661 
662 		if (num_allowed_pieces >= num_pieces)
663 		{
664 			// this is a special case where we have more allowed
665 			// fast pieces than pieces in the torrent. Just send
666 			// an allowed fast message for every single piece
667 			for (auto const i : t->torrent_file().piece_range())
668 			{
669 				// there's no point in offering fast pieces
670 				// that the peer already has
671 				if (has_piece(i)) continue;
672 
673 				write_allow_fast(i);
674 				TORRENT_ASSERT(std::find(m_accept_fast.begin()
675 					, m_accept_fast.end(), i)
676 					== m_accept_fast.end());
677 				if (m_accept_fast.empty())
678 				{
679 					m_accept_fast.reserve(10);
680 					m_accept_fast_piece_cnt.reserve(10);
681 				}
682 				m_accept_fast.push_back(i);
683 				m_accept_fast_piece_cnt.push_back(0);
684 			}
685 			return;
686 		}
687 
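		// derive the canonical allowed-fast set (BEP 6): hash the peer's IP
		// address bytes followed by the info-hash, then keep re-hashing the
		// digest, interpreting each 32-bit word modulo the number of pieces
		// as a candidate piece index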
688 		std::string x;
689 		address const& addr = m_remote.address();
690 		if (addr.is_v4())
691 		{
692 			address_v4::bytes_type bytes = addr.to_v4().to_bytes();
693 			x.assign(reinterpret_cast<char*>(bytes.data()), bytes.size());
694 		}
695 		else
696 		{
697 			address_v6::bytes_type bytes = addr.to_v6().to_bytes();
698 			x.assign(reinterpret_cast<char*>(bytes.data()), bytes.size());
699 		}
700 		x.append(t->torrent_file().info_hash().data(), 20);
701 
702 		sha1_hash hash = hasher(x).final();
703 		int attempts = 0;
704 		int loops = 0;
705 		for (;;)
706 		{
707 			char const* p = hash.data();
708 			for (int i = 0; i < int(hash.size() / sizeof(std::uint32_t)); ++i)
709 			{
710 				++loops;
711 				TORRENT_ASSERT(num_pieces > 0);
712 				piece_index_t const piece(int(detail::read_uint32(p) % std::uint32_t(num_pieces)));
713 				if (std::find(m_accept_fast.begin(), m_accept_fast.end(), piece)
714 					!= m_accept_fast.end())
715 				{
716 					// this is our safety-net to make sure this loop terminates, even
717 					// under the worst conditions
718 					if (++loops > 500) return;
719 					continue;
720 				}
721 
722 				if (!has_piece(piece))
723 				{
724 					write_allow_fast(piece);
725 					if (m_accept_fast.empty())
726 					{
727 						m_accept_fast.reserve(10);
728 						m_accept_fast_piece_cnt.reserve(10);
729 					}
730 					m_accept_fast.push_back(piece);
731 					m_accept_fast_piece_cnt.push_back(0);
732 				}
733 				if (++attempts >= num_allowed_pieces) return;
734 			}
735 			hash = hasher(hash).final();
736 		}
737 	}
738 
739 	void peer_connection::on_metadata_impl()
740 	{
741 		TORRENT_ASSERT(is_single_thread());
742 		std::shared_ptr<torrent> t = associated_torrent().lock();
743 		m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
744 		m_num_pieces = m_have_piece.count();
745 
746 		piece_index_t const limit(m_num_pieces);
747 
748 		// now that we know how many pieces there are,
749 		// remove any allowed_fast and suggested pieces
750 		// whose indices are no longer valid
751 		m_allowed_fast.erase(std::remove_if(m_allowed_fast.begin(), m_allowed_fast.end()
752 			, [=](piece_index_t const p) { return p >= limit; })
753 			, m_allowed_fast.end());
754 
755 		// remove any piece suggested to us whose index is invalid
756 		// now that we know how many pieces there are
757 		m_suggested_pieces.erase(
758 			std::remove_if(m_suggested_pieces.begin(), m_suggested_pieces.end()
759 				, [=](piece_index_t const p) { return p >= limit; })
760 			, m_suggested_pieces.end());
761 
762 		on_metadata();
763 		if (m_disconnecting) return;
764 	}
765 
766 	void peer_connection::init()
767 	{
768 		TORRENT_ASSERT(is_single_thread());
769 		INVARIANT_CHECK;
770 
771 		std::shared_ptr<torrent> t = m_torrent.lock();
772 		TORRENT_ASSERT(t);
773 		TORRENT_ASSERT(t->valid_metadata());
774 		TORRENT_ASSERT(t->ready_for_connections());
775 
776 		m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
777 
778 		if (m_have_all)
779 		{
780 			m_num_pieces = t->torrent_file().num_pieces();
781 			m_have_piece.set_all();
782 		}
783 
784 #if TORRENT_USE_ASSERTS
785 		TORRENT_ASSERT(!m_initialized);
786 		m_initialized = true;
787 #endif
788 		// now that we have a piece_picker,
789 		// update it with this peer's pieces
790 
791 		TORRENT_ASSERT(m_num_pieces == m_have_piece.count());
792 
793 		if (m_num_pieces == m_have_piece.size())
794 		{
795 #ifndef TORRENT_DISABLE_LOGGING
796 			peer_log(peer_log_alert::info, "INIT", "this is a seed p: %p"
797 				, static_cast<void*>(m_peer_info));
798 #endif
799 
800 			TORRENT_ASSERT(m_have_piece.all_set());
801 			TORRENT_ASSERT(m_have_piece.count() == m_have_piece.size());
802 			TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces());
803 
804 			// if this is a web seed, we don't have a peer_info struct
805 			t->set_seed(m_peer_info, true);
806 			TORRENT_ASSERT(is_seed());
807 			m_upload_only = true;
808 
809 			t->peer_has_all(this);
810 
811 #if TORRENT_USE_INVARIANT_CHECKS
812 			if (t && t->has_picker())
813 				t->picker().check_peer_invariant(m_have_piece, peer_info_struct());
814 #endif
815 			if (t->is_upload_only()) send_not_interested();
816 			else t->peer_is_interesting(*this);
817 			disconnect_if_redundant();
818 			return;
819 		}
820 
821 		TORRENT_ASSERT(!is_seed());
822 
823 		// if we're a seed, we don't keep track of piece availability
824 		if (t->has_picker())
825 		{
826 			TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces());
827 			t->peer_has(m_have_piece, this);
828 			bool interesting = false;
829 			for (auto const i : m_have_piece.range())
830 			{
831 				if (!m_have_piece[i]) continue;
832 				// if the peer has a piece and we don't, the peer is interesting
833 				if (!t->have_piece(i)
834 					&& t->picker().piece_priority(i) != dont_download)
835 					interesting = true;
836 			}
837 			if (interesting) t->peer_is_interesting(*this);
838 			else send_not_interested();
839 		}
840 		else
841 		{
842 			update_interest();
843 		}
844 	}
845 
846 	peer_connection::~peer_connection()
847 	{
848 		m_counters.inc_stats_counter(counters::num_tcp_peers + m_socket->type() - 1, -1);
849 
850 //		INVARIANT_CHECK;
851 		TORRENT_ASSERT(!m_in_constructor);
852 		TORRENT_ASSERT(!m_destructed);
853 #if TORRENT_USE_ASSERTS
854 		m_destructed = true;
855 #endif
856 
857 #if TORRENT_USE_ASSERTS
858 		m_in_use = 0;
859 #endif
860 
861 		// decrement the stats counter
862 		set_endgame(false);
863 
864 		if (m_interesting)
865 			m_counters.inc_stats_counter(counters::num_peers_down_interested, -1);
866 		if (m_peer_interested)
867 			m_counters.inc_stats_counter(counters::num_peers_up_interested, -1);
868 		if (!m_choked)
869 		{
870 			m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1);
871 			if (!ignore_unchoke_slots())
872 				m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1);
873 		}
874 		if (!m_peer_choked)
875 			m_counters.inc_stats_counter(counters::num_peers_down_unchoked, -1);
876 		if (m_connected)
877 			m_counters.inc_stats_counter(counters::num_peers_connected, -1);
878 		m_connected = false;
879 		if (!m_download_queue.empty())
880 			m_counters.inc_stats_counter(counters::num_peers_down_requests, -1);
881 
882 		// defensive
883 		std::shared_ptr<torrent> t = m_torrent.lock();
884 		// if t is nullptr, we better not be connecting, since
885 		// we can't decrement the connecting counter
886 		TORRENT_ASSERT(t || !m_connecting);
887 
888 		// we should really have dealt with this already
889 		if (m_connecting)
890 		{
891 			m_counters.inc_stats_counter(counters::num_peers_half_open, -1);
892 			if (t) t->dec_num_connecting(m_peer_info);
893 			m_connecting = false;
894 		}
895 
896 #ifndef TORRENT_DISABLE_EXTENSIONS
897 		m_extensions.clear();
898 #endif
899 
900 #ifndef TORRENT_DISABLE_LOGGING
901 		peer_log(peer_log_alert::info, "CONNECTION CLOSED");
902 #endif
903 		TORRENT_ASSERT(m_request_queue.empty());
904 		TORRENT_ASSERT(m_download_queue.empty());
905 	}
906 
907 	bool peer_connection::on_parole() const
908 	{ return peer_info_struct() && peer_info_struct()->on_parole; }
909 
910 	picker_options_t peer_connection::picker_options() const
911 	{
912 		TORRENT_ASSERT(is_single_thread());
913 		picker_options_t ret = m_picker_options;
914 
915 		std::shared_ptr<torrent> t = m_torrent.lock();
916 		TORRENT_ASSERT(t);
917 		if (!t) return {};
918 
919 		if (t->num_time_critical_pieces() > 0)
920 		{
921 			ret |= piece_picker::time_critical_mode;
922 		}
923 
924 		if (t->is_sequential_download())
925 		{
926 			ret |= piece_picker::sequential;
927 		}
928 		else if (t->num_have() < m_settings.get_int(settings_pack::initial_picker_threshold))
929 		{
930 			// if we have fewer pieces than a certain threshold
931 			// don't pick rare pieces, just pick random ones,
932 			// and prioritize finishing them
933 			ret |= piece_picker::prioritize_partials;
934 		}
935 		else
936 		{
937 			ret |= piece_picker::rarest_first;
938 
939 			if (m_snubbed)
940 			{
941 				// snubbed peers should request
942 				// the common pieces first, just to make
943 				// it more likely for all snubbed peers to
944 				// request blocks from the same piece
945 				ret |= piece_picker::reverse;
946 			}
947 			else
948 			{
949 				if (m_settings.get_bool(settings_pack::piece_extent_affinity)
950 					&& t->num_time_critical_pieces() == 0)
951 					ret |= piece_picker::piece_extent_affinity;
952 			}
953 		}
954 
955 		if (m_settings.get_bool(settings_pack::prioritize_partial_pieces))
956 			ret |= piece_picker::prioritize_partials;
957 
958 		if (on_parole()) ret |= piece_picker::on_parole
959 			| piece_picker::prioritize_partials;
960 
961 		// only one of rarest_first and sequential can be set. i.e. the sum of
962 		// whether the bit is set or not may only be 0 or 1 (never 2)
963 		TORRENT_ASSERT(((ret & piece_picker::rarest_first) ? 1 : 0)
964 			+ ((ret & piece_picker::sequential) ? 1 : 0) <= 1);
965 		return ret;
966 	}
967 
968 	void peer_connection::fast_reconnect(bool r)
969 	{
970 		TORRENT_ASSERT(is_single_thread());
971 		if (!peer_info_struct() || peer_info_struct()->fast_reconnects > 1)
972 			return;
973 		m_fast_reconnect = r;
974 		peer_info_struct()->last_connected = std::uint16_t(m_ses.session_time());
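		// rewind last_connected far enough that the normal reconnect-delay
		// check (min_reconnect_time scaled by the fail count) will allow an
		// immediate reconnect attempt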
975 		int const rewind = m_settings.get_int(settings_pack::min_reconnect_time)
976 			* m_settings.get_int(settings_pack::max_failcount);
977 		if (int(peer_info_struct()->last_connected) < rewind) peer_info_struct()->last_connected = 0;
978 		else peer_info_struct()->last_connected -= std::uint16_t(rewind);
979 
980 		if (peer_info_struct()->fast_reconnects < 15)
981 			++peer_info_struct()->fast_reconnects;
982 	}
983 
984 	void peer_connection::received_piece(piece_index_t const index)
985 	{
986 		TORRENT_ASSERT(is_single_thread());
987 		// don't announce during handshake
988 		if (in_handshake()) return;
989 
990 #ifndef TORRENT_DISABLE_LOGGING
991 		peer_log(peer_log_alert::incoming, "RECEIVED", "piece: %d"
992 			, static_cast<int>(index));
993 #endif
994 
995 		// remove suggested pieces once we have them
996 		auto i = std::find(m_suggested_pieces.begin(), m_suggested_pieces.end(), index);
997 		if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
998 
999 		// remove allowed fast pieces
1000 		i = std::find(m_allowed_fast.begin(), m_allowed_fast.end(), index);
1001 		if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
1002 
1003 		if (has_piece(index))
1004 		{
1005 			// if we got a piece that this peer has
1006 			// it might have been the last interesting
1007 			// piece this peer had. We might not be
1008 			// interested anymore
1009 			update_interest();
1010 			if (is_disconnecting()) return;
1011 		}
1012 
1013 		if (disconnect_if_redundant()) return;
1014 
1015 #if TORRENT_USE_ASSERTS
1016 		std::shared_ptr<torrent> t = m_torrent.lock();
1017 		TORRENT_ASSERT(t);
1018 #endif
1019 	}
1020 
1021 	void peer_connection::announce_piece(piece_index_t const index)
1022 	{
1023 		TORRENT_ASSERT(is_single_thread());
1024 		// don't announce during handshake
1025 		if (in_handshake()) return;
1026 
1027 		// optimization, don't send have messages
1028 		// to peers that already have the piece
1029 		if (!m_settings.get_bool(settings_pack::send_redundant_have)
1030 			&& has_piece(index))
1031 		{
1032 #ifndef TORRENT_DISABLE_LOGGING
1033 			peer_log(peer_log_alert::outgoing_message, "HAVE", "piece: %d SUPPRESSED"
1034 				, static_cast<int>(index));
1035 #endif
1036 			return;
1037 		}
1038 
1039 		if (disconnect_if_redundant()) return;
1040 
1041 #ifndef TORRENT_DISABLE_LOGGING
1042 		peer_log(peer_log_alert::outgoing_message, "HAVE", "piece: %d"
1043 			, static_cast<int>(index));
1044 #endif
1045 		write_have(index);
1046 #if TORRENT_USE_ASSERTS
1047 		std::shared_ptr<torrent> t = m_torrent.lock();
1048 		TORRENT_ASSERT(t);
1049 #endif
1050 	}
1051 
1052 	bool peer_connection::has_piece(piece_index_t const i) const
1053 	{
1054 		TORRENT_ASSERT(is_single_thread());
1055 #if TORRENT_USE_ASSERTS
1056 		std::shared_ptr<torrent> t = m_torrent.lock();
1057 		TORRENT_ASSERT(t);
1058 		TORRENT_ASSERT(t->valid_metadata());
1059 		TORRENT_ASSERT(i >= piece_index_t(0));
1060 		TORRENT_ASSERT(i < t->torrent_file().end_piece());
1061 #endif
1062 		if (m_have_piece.empty()) return false;
1063 		return m_have_piece[i];
1064 	}
1065 
1066 	std::vector<pending_block> const& peer_connection::request_queue() const
1067 	{
1068 		TORRENT_ASSERT(is_single_thread());
1069 		return m_request_queue;
1070 	}
1071 
1072 	std::vector<pending_block> const& peer_connection::download_queue() const
1073 	{
1074 		TORRENT_ASSERT(is_single_thread());
1075 		return m_download_queue;
1076 	}
1077 
1078 	std::vector<peer_request> const& peer_connection::upload_queue() const
1079 	{
1080 		TORRENT_ASSERT(is_single_thread());
1081 		return m_requests;
1082 	}
1083 
1084 	time_duration peer_connection::download_queue_time(int const extra_bytes) const
1085 	{
1086 		TORRENT_ASSERT(is_single_thread());
1087 		std::shared_ptr<torrent> t = m_torrent.lock();
1088 		TORRENT_ASSERT(t);
1089 
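		// estimate of this peer's effective download rate, in bytes per second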
1090 		int rate = 0;
1091 
1092 		// if we haven't received any data recently, the current download rate
1093 		// is not representative
1094 		if (aux::time_now() - m_last_piece > seconds(30) && m_download_rate_peak > 0)
1095 		{
1096 			rate = m_download_rate_peak;
1097 		}
1098 		else if (aux::time_now() - m_last_unchoked < seconds(5)
1099 			&& m_statistics.total_payload_upload() < 2 * 0x4000)
1100 		{
1101 			// if we have only been unchoked for a short period of time,
1102 			// we don't know what rate we can get from this peer. Instead of assuming
1103 			// the lowest possible rate, assume the average.
1104 
1105 			int peers_with_requests = int(stats_counters()[counters::num_peers_down_requests]);
1106 			// avoid division by 0
1107 			if (peers_with_requests == 0) peers_with_requests = 1;
1108 
1109 			// TODO: this should be the global download rate
1110 			rate = t->statistics().transfer_rate(stat::download_payload) / peers_with_requests;
1111 		}
1112 		else
1113 		{
1114 			// current download rate in bytes per second
1115 			rate = m_statistics.transfer_rate(stat::download_payload);
1116 		}
1117 
1118 		// avoid division by zero
1119 		if (rate < 50) rate = 50;
1120 
1121 		// average of current rate and peak
1122 //		rate = (rate + m_download_rate_peak) / 2;
1123 
1124 		return milliseconds((m_outstanding_bytes + extra_bytes
1125 			+ m_queued_time_critical * t->block_size() * 1000) / rate);
1126 	}
1127 
1128 	void peer_connection::add_stat(std::int64_t const downloaded, std::int64_t const uploaded)
1129 	{
1130 		TORRENT_ASSERT(is_single_thread());
1131 		m_statistics.add_stat(downloaded, uploaded);
1132 	}
1133 
1134 	void peer_connection::received_bytes(int const bytes_payload, int const bytes_protocol)
1135 	{
1136 		TORRENT_ASSERT(is_single_thread());
1137 		m_statistics.received_bytes(bytes_payload, bytes_protocol);
1138 		if (m_ignore_stats) return;
1139 		std::shared_ptr<torrent> t = m_torrent.lock();
1140 		if (!t) return;
1141 		t->received_bytes(bytes_payload, bytes_protocol);
1142 	}
1143 
1144 	void peer_connection::sent_bytes(int const bytes_payload, int const bytes_protocol)
1145 	{
1146 		TORRENT_ASSERT(is_single_thread());
1147 		m_statistics.sent_bytes(bytes_payload, bytes_protocol);
1148 #ifndef TORRENT_DISABLE_EXTENSIONS
1149 		if (bytes_payload)
1150 		{
1151 			for (auto const& e : m_extensions)
1152 			{
1153 				e->sent_payload(bytes_payload);
1154 			}
1155 		}
1156 #endif
1157 		if (bytes_payload > 0) m_last_sent_payload = clock_type::now();
1158 		if (m_ignore_stats) return;
1159 		std::shared_ptr<torrent> t = m_torrent.lock();
1160 		if (!t) return;
1161 		t->sent_bytes(bytes_payload, bytes_protocol);
1162 	}
1163 
1164 	void peer_connection::trancieve_ip_packet(int const bytes, bool const ipv6)
1165 	{
1166 		TORRENT_ASSERT(is_single_thread());
1167 		m_statistics.trancieve_ip_packet(bytes, ipv6);
1168 		if (m_ignore_stats) return;
1169 		std::shared_ptr<torrent> t = m_torrent.lock();
1170 		if (!t) return;
1171 		t->trancieve_ip_packet(bytes, ipv6);
1172 	}
1173 
1174 	void peer_connection::sent_syn(bool const ipv6)
1175 	{
1176 		TORRENT_ASSERT(is_single_thread());
1177 		m_statistics.sent_syn(ipv6);
1178 		if (m_ignore_stats) return;
1179 		std::shared_ptr<torrent> t = m_torrent.lock();
1180 		if (!t) return;
1181 		t->sent_syn(ipv6);
1182 	}
1183 
1184 	void peer_connection::received_synack(bool const ipv6)
1185 	{
1186 		TORRENT_ASSERT(is_single_thread());
1187 		m_statistics.received_synack(ipv6);
1188 		if (m_ignore_stats) return;
1189 		std::shared_ptr<torrent> t = m_torrent.lock();
1190 		if (!t) return;
1191 		t->received_synack(ipv6);
1192 	}
1193 
1194 	typed_bitfield<piece_index_t> const& peer_connection::get_bitfield() const
1195 	{
1196 		TORRENT_ASSERT(is_single_thread());
1197 		return m_have_piece;
1198 	}
1199 
1200 	void peer_connection::received_valid_data(piece_index_t const index)
1201 	{
1202 		TORRENT_ASSERT(is_single_thread());
1203 		// this fails because we haven't had time to disconnect
1204 		// seeds yet, and we might have just become one
1205 //		INVARIANT_CHECK;
1206 
1207 #ifndef TORRENT_DISABLE_EXTENSIONS
1208 		for (auto const& e : m_extensions)
1209 		{
1210 			e->on_piece_pass(index);
1211 		}
1212 #else
1213 		TORRENT_UNUSED(index);
1214 #endif
1215 	}
1216 
1217 	// single_peer is true if the entire piece was received by a single
1218 	// peer
1219 	bool peer_connection::received_invalid_data(piece_index_t const index, bool single_peer)
1220 	{
1221 		TORRENT_ASSERT(is_single_thread());
1222 		INVARIANT_CHECK;
1223 		TORRENT_UNUSED(single_peer);
1224 
1225 #ifndef TORRENT_DISABLE_EXTENSIONS
1226 		for (auto const& e : m_extensions)
1227 		{
1228 			e->on_piece_failed(index);
1229 		}
1230 #else
1231 		TORRENT_UNUSED(index);
1232 #endif
1233 		return true;
1234 	}
1235 
1236 	// verifies a piece to see if it is valid (is within a valid range)
1237 	// and if it can correspond to a request generated by libtorrent.
1238 	bool peer_connection::verify_piece(peer_request const& p) const
1239 	{
1240 		TORRENT_ASSERT(is_single_thread());
1241 		std::shared_ptr<torrent> t = m_torrent.lock();
1242 		TORRENT_ASSERT(t);
1243 
1244 		TORRENT_ASSERT(t->valid_metadata());
1245 		torrent_info const& ti = t->torrent_file();
1246 
1247 		return p.piece >= piece_index_t(0)
1248 			&& p.piece < ti.end_piece()
1249 			&& p.start >= 0
1250 			&& p.start < ti.piece_length()
1251 			&& t->to_req(piece_block(p.piece, p.start / t->block_size())) == p;
1252 	}
1253 
1254 	void peer_connection::attach_to_torrent(sha1_hash const& ih)
1255 	{
1256 		TORRENT_ASSERT(is_single_thread());
1257 		INVARIANT_CHECK;
1258 
1259 #ifndef TORRENT_DISABLE_LOGGING
1260 		peer_log(peer_log_alert::info, "ATTACH", "attached to torrent");
1261 #endif
1262 
1263 		TORRENT_ASSERT(!m_disconnecting);
1264 		TORRENT_ASSERT(m_torrent.expired());
1265 		std::weak_ptr<torrent> wpt = m_ses.find_torrent(ih);
1266 		std::shared_ptr<torrent> t = wpt.lock();
1267 
1268 		if (t && t->is_aborted())
1269 		{
1270 #ifndef TORRENT_DISABLE_LOGGING
1271 			peer_log(peer_log_alert::info, "ATTACH", "the torrent has been aborted");
1272 #endif
1273 			t.reset();
1274 		}
1275 
1276 		if (!t)
1277 		{
1278 			t = m_ses.delay_load_torrent(ih, this);
1279 #ifndef TORRENT_DISABLE_LOGGING
1280 			if (t && should_log(peer_log_alert::info))
1281 			{
1282 				peer_log(peer_log_alert::info, "ATTACH"
1283 					, "Delay loaded torrent: %s:", aux::to_hex(ih).c_str());
1284 			}
1285 #endif
1286 		}
1287 
1288 		if (!t)
1289 		{
1290 			// we couldn't find the torrent!
1291 #ifndef TORRENT_DISABLE_LOGGING
1292 			if (should_log(peer_log_alert::info))
1293 			{
1294 				peer_log(peer_log_alert::info, "ATTACH"
1295 					, "couldn't find a torrent with the given info_hash: %s torrents:"
1296 					, aux::to_hex(ih).c_str());
1297 			}
1298 #endif
1299 
1300 #ifndef TORRENT_DISABLE_DHT
1301 			if (dht::verify_secret_id(ih))
1302 			{
1303 				// this means the hash was generated from our generate_secret_id()
1304 				// as part of DHT traffic. The fact that we got an incoming
1305 				// connection on this info-hash, means the other end, making this
1306 				// connection fished it out of the DHT chatter. That's suspicious.
1307 				m_ses.ban_ip(m_remote.address());
1308 			}
1309 #endif
1310 			disconnect(errors::invalid_info_hash, operation_t::bittorrent, failure);
1311 			return;
1312 		}
1313 
1314 		if (t->is_paused()
1315 			&& t->is_auto_managed()
1316 			&& m_settings.get_bool(settings_pack::incoming_starts_queued_torrents)
1317 			&& !t->is_aborted())
1318 		{
1319 			t->resume();
1320 		}
1321 
1322 		if (t->is_paused() || t->is_aborted() || t->graceful_pause())
1323 		{
1324 			// paused torrents will not accept
1325 			// incoming connections unless they are auto managed
1326 			// and incoming_starts_queued_torrents is true.
1327 			// Torrents that have errors should always reject
1328 			// incoming peers
1329 #ifndef TORRENT_DISABLE_LOGGING
1330 			peer_log(peer_log_alert::info, "ATTACH", "rejected connection to paused torrent");
1331 #endif
1332 			disconnect(errors::torrent_paused, operation_t::bittorrent, peer_error);
1333 			return;
1334 		}
1335 
1336 #if TORRENT_USE_I2P
1337 		auto* i2ps = m_socket->get<i2p_stream>();
1338 		if (!i2ps && t->torrent_file().is_i2p()
1339 			&& !m_settings.get_bool(settings_pack::allow_i2p_mixed))
1340 		{
1341 			// the torrent is an i2p torrent, the peer is a regular peer
1342 			// and we don't allow mixed mode. Disconnect the peer.
1343 #ifndef TORRENT_DISABLE_LOGGING
1344 			peer_log(peer_log_alert::info, "ATTACH", "rejected regular connection to i2p torrent");
1345 #endif
1346 			disconnect(errors::peer_banned, operation_t::bittorrent, peer_error);
1347 			return;
1348 		}
1349 #endif // TORRENT_USE_I2P
1350 
1351 		TORRENT_ASSERT(m_torrent.expired());
1352 
1353 		// check to make sure we don't have another connection with the same
1354 		// info_hash and peer_id. If we do, close this connection.
1355 		t->attach_peer(this);
1356 		if (m_disconnecting) return;
1357 		// it's important to assign the torrent after successfully attaching.
1358 		// if the peer disconnects while attaching, it's not a proper member
1359 		// of the torrent and peer_connection::disconnect() will fail if it
1360 		// thinks it is
1361 		m_torrent = t;
1362 
1363 		if (m_exceeded_limit)
1364 		{
1365 			// find a peer in some torrent (presumably the one with most peers)
1366 			// and disconnect the lowest ranking peer
1367 			std::weak_ptr<torrent> torr = m_ses.find_disconnect_candidate_torrent();
1368 			std::shared_ptr<torrent> other_t = torr.lock();
1369 
1370 			if (other_t)
1371 			{
1372 				if (other_t->num_peers() <= t->num_peers())
1373 				{
1374 					disconnect(errors::too_many_connections, operation_t::bittorrent);
1375 					return;
1376 				}
1377 				// find the lowest ranking peer and disconnect that
1378 				peer_connection* p = other_t->find_lowest_ranking_peer();
1379 				if (p != nullptr)
1380 				{
1381 					p->disconnect(errors::too_many_connections, operation_t::bittorrent);
1382 					peer_disconnected_other();
1383 				}
1384 				else
1385 				{
1386 					disconnect(errors::too_many_connections, operation_t::bittorrent);
1387 					return;
1388 				}
1389 			}
1390 			else
1391 			{
1392 				disconnect(errors::too_many_connections, operation_t::bittorrent);
1393 				return;
1394 			}
1395 		}
1396 
1397 		TORRENT_ASSERT(!m_torrent.expired());
1398 
1399 		// if the torrent isn't ready to accept
1400 		// connections yet, we'll have to wait with
1401 		// our initialization
1402 		if (t->ready_for_connections()) init();
1403 
1404 		TORRENT_ASSERT(!m_torrent.expired());
1405 
1406 		// assume the other end has no pieces
1407 		// if we don't have valid metadata yet,
1408 		// leave the vector unallocated
1409 		TORRENT_ASSERT(m_num_pieces == 0);
1410 		m_have_piece.clear_all();
1411 		TORRENT_ASSERT(!m_torrent.expired());
1412 	}
1413 
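	// rank of this peer, used when trimming connections; the lowest ranking
	// peer is the preferred candidate to disconnect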
1414 	std::uint32_t peer_connection::peer_rank() const
1415 	{
1416 		TORRENT_ASSERT(is_single_thread());
1417 		return m_peer_info == nullptr ? 0
1418 			: m_peer_info->rank(m_ses.external_address(), m_ses.listen_port());
1419 	}
1420 
1421 	// message handlers
1422 
1423 	// -----------------------------
1424 	// --------- KEEPALIVE ---------
1425 	// -----------------------------
1426 
1427 	void peer_connection::incoming_keepalive()
1428 	{
1429 		TORRENT_ASSERT(is_single_thread());
1430 		INVARIANT_CHECK;
1431 
1432 #ifndef TORRENT_DISABLE_LOGGING
1433 		peer_log(peer_log_alert::incoming_message, "KEEPALIVE");
1434 #endif
1435 	}
1436 
1437 	// -----------------------------
1438 	// ----------- CHOKE -----------
1439 	// -----------------------------
1440 
1441 	void peer_connection::set_endgame(bool b)
1442 	{
1443 		TORRENT_ASSERT(is_single_thread());
1444 		if (m_endgame_mode == b) return;
1445 		m_endgame_mode = b;
1446 		if (m_endgame_mode)
1447 			m_counters.inc_stats_counter(counters::num_peers_end_game);
1448 		else
1449 			m_counters.inc_stats_counter(counters::num_peers_end_game, -1);
1450 	}
1451 
1452 	void peer_connection::incoming_choke()
1453 	{
1454 		TORRENT_ASSERT(is_single_thread());
1455 		INVARIANT_CHECK;
1456 
1457 #ifndef TORRENT_DISABLE_EXTENSIONS
1458 		for (auto const& e : m_extensions)
1459 		{
1460 			if (e->on_choke()) return;
1461 		}
1462 #endif
1463 		if (is_disconnecting()) return;
1464 
1465 #ifndef TORRENT_DISABLE_LOGGING
1466 		peer_log(peer_log_alert::incoming_message, "CHOKE");
1467 #endif
1468 		if (m_peer_choked == false)
1469 			m_counters.inc_stats_counter(counters::num_peers_down_unchoked, -1);
1470 
1471 		m_peer_choked = true;
1472 		set_endgame(false);
1473 
1474 		clear_request_queue();
1475 	}
1476 
1477 	void peer_connection::clear_request_queue()
1478 	{
1479 		TORRENT_ASSERT(is_single_thread());
1480 		std::shared_ptr<torrent> t = m_torrent.lock();
1481 		TORRENT_ASSERT(t);
1482 		if (!t->has_picker())
1483 		{
1484 			m_request_queue.clear();
1485 			return;
1486 		}
1487 
1488 		// clear the requests that haven't been sent yet
1489 		if (peer_info_struct() == nullptr || !peer_info_struct()->on_parole)
1490 		{
1491 			// if the peer is not in parole mode, clear the queued
1492 			// up block requests
1493 			piece_picker& p = t->picker();
1494 			for (auto const& r : m_request_queue)
1495 			{
1496 				p.abort_download(r.block, peer_info_struct());
1497 			}
1498 			m_request_queue.clear();
1499 			m_queued_time_critical = 0;
1500 		}
1501 	}
1502 
1503 	void peer_connection::clear_download_queue()
1504 	{
1505 		std::shared_ptr<torrent> t = m_torrent.lock();
1506 		piece_picker& picker = t->picker();
1507 		torrent_peer* self_peer = peer_info_struct();
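		// hand each in-flight block back to the piece picker (unless it timed
		// out or is no longer wanted) and remove it from the outstanding byte
		// count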
1508 		while (!m_download_queue.empty())
1509 		{
1510 			pending_block& qe = m_download_queue.back();
1511 			if (!qe.timed_out && !qe.not_wanted)
1512 				picker.abort_download(qe.block, self_peer);
1513 			m_outstanding_bytes -= t->to_req(qe.block).length;
1514 			if (m_outstanding_bytes < 0) m_outstanding_bytes = 0;
1515 			m_download_queue.pop_back();
1516 		}
1517 	}
1518 
1519 	// -----------------------------
1520 	// -------- REJECT PIECE -------
1521 	// -----------------------------
1522 
1523 	void peer_connection::incoming_reject_request(peer_request const& r)
1524 	{
1525 		TORRENT_ASSERT(is_single_thread());
1526 		INVARIANT_CHECK;
1527 
1528 		std::shared_ptr<torrent> t = m_torrent.lock();
1529 		TORRENT_ASSERT(t);
1530 
1531 #ifndef TORRENT_DISABLE_LOGGING
1532 		peer_log(peer_log_alert::incoming_message, "REJECT_PIECE", "piece: %d s: %x l: %x"
1533 			, static_cast<int>(r.piece), r.start, r.length);
1534 #endif
1535 
1536 #ifndef TORRENT_DISABLE_EXTENSIONS
1537 		for (auto const& e : m_extensions)
1538 		{
1539 			if (e->on_reject(r)) return;
1540 		}
1541 #endif
1542 
1543 		if (is_disconnecting()) return;
1544 
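		// sanity check the reject message: the piece, offset and length must
		// describe a block we could actually have requested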
1545 		int const block_size = t->block_size();
1546 		if (r.piece < piece_index_t{}
1547 			|| r.piece >= t->torrent_file().files().end_piece()
1548 			|| r.start < 0
1549 			|| r.start >= t->torrent_file().piece_length()
1550 			|| (r.start % block_size) != 0
1551 			|| r.length != std::min(t->torrent_file().piece_size(r.piece) - r.start, block_size))
1552 		{
1553 #ifndef TORRENT_DISABLE_LOGGING
1554 			peer_log(peer_log_alert::info, "REJECT_PIECE", "invalid reject message (%d, %d, %d)"
1555 				, int(r.piece), int(r.start), int(r.length));
1556 #endif
1557 			return;
1558 		}
1559 
1560 		auto const dlq_iter = std::find_if(
1561 			m_download_queue.begin(), m_download_queue.end()
1562 			, [&r, block_size](pending_block const& pb)
1563 			{
1564 				auto const& b = pb.block;
1565 				if (b.piece_index != r.piece) return false;
1566 				if (b.block_index != r.start / block_size) return false;
1567 				return true;
1568 			});
1569 
1570 		if (dlq_iter != m_download_queue.end())
1571 		{
1572 			pending_block const b = *dlq_iter;
1573 			bool const remove_from_picker = !dlq_iter->timed_out && !dlq_iter->not_wanted;
1574 			m_download_queue.erase(dlq_iter);
1575 			TORRENT_ASSERT(m_outstanding_bytes >= r.length);
1576 			m_outstanding_bytes -= r.length;
1577 			if (m_outstanding_bytes < 0) m_outstanding_bytes = 0;
1578 
1579 			if (m_download_queue.empty())
1580 				m_counters.inc_stats_counter(counters::num_peers_down_requests, -1);
1581 
1582 			// if the peer is in parole mode, keep the request
1583 			if (peer_info_struct() && peer_info_struct()->on_parole)
1584 			{
1585 				// we should only add it if the block is marked as
1586 				// busy in the piece-picker
1587 				if (remove_from_picker)
1588 					m_request_queue.insert(m_request_queue.begin(), b);
1589 			}
1590 			else if (!t->is_seed() && remove_from_picker)
1591 			{
1592 				piece_picker& p = t->picker();
1593 				p.abort_download(b.block, peer_info_struct());
1594 			}
1595 #if TORRENT_USE_INVARIANT_CHECKS
1596 			check_invariant();
1597 #endif
1598 		}
1599 #ifndef TORRENT_DISABLE_LOGGING
1600 		else
1601 		{
1602 			peer_log(peer_log_alert::info, "REJECT_PIECE", "piece not in request queue (%d, %d, %d)"
1603 				, int(r.piece), int(r.start), int(r.length));
1604 		}
1605 #endif
1606 		if (has_peer_choked())
1607 		{
1608 			// if we're choked and we got a rejection of
1609 			// a piece in the allowed fast set, remove it
1610 			// from the allowed fast set.
1611 			auto const i = std::find(m_allowed_fast.begin(), m_allowed_fast.end(), r.piece);
1612 			if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
1613 		}
1614 		else
1615 		{
1616 			auto const i = std::find(m_suggested_pieces.begin(), m_suggested_pieces.end(), r.piece);
1617 			if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
1618 		}
1619 
1620 		check_graceful_pause();
1621 		if (is_disconnecting()) return;
1622 
1623 		if (m_request_queue.empty() && m_download_queue.size() < 2)
1624 		{
1625 			if (request_a_block(*t, *this))
1626 				m_counters.inc_stats_counter(counters::reject_piece_picks);
1627 		}
1628 
1629 		send_block_requests();
1630 	}
1631 
1632 	// -----------------------------
1633 	// ------- SUGGEST PIECE -------
1634 	// -----------------------------
1635 
1636 	void peer_connection::incoming_suggest(piece_index_t const index)
1637 	{
1638 		TORRENT_ASSERT(is_single_thread());
1639 		INVARIANT_CHECK;
1640 
1641 #ifndef TORRENT_DISABLE_LOGGING
1642 		peer_log(peer_log_alert::incoming_message, "SUGGEST_PIECE"
1643 			, "piece: %d", static_cast<int>(index));
1644 #endif
1645 		std::shared_ptr<torrent> t = m_torrent.lock();
1646 		if (!t) return;
1647 
1648 #ifndef TORRENT_DISABLE_EXTENSIONS
1649 		for (auto const& e : m_extensions)
1650 		{
1651 			if (e->on_suggest(index)) return;
1652 		}
1653 #endif
1654 
1655 		if (is_disconnecting()) return;
1656 		if (index < piece_index_t(0))
1657 		{
1658 #ifndef TORRENT_DISABLE_LOGGING
1659 			peer_log(peer_log_alert::incoming_message, "INVALID_SUGGEST_PIECE"
1660 				, "%d", static_cast<int>(index));
1661 #endif
1662 			return;
1663 		}
1664 
1665 		if (t->valid_metadata())
1666 		{
1667 			if (index >= m_have_piece.end_index())
1668 			{
1669 #ifndef TORRENT_DISABLE_LOGGING
1670 				peer_log(peer_log_alert::incoming_message, "INVALID_SUGGEST"
1671 					, "%d s: %d", static_cast<int>(index), m_have_piece.size());
1672 #endif
1673 				return;
1674 			}
1675 
1676 			// if we already have the piece, we can
1677 			// ignore this message
1678 			if (t->have_piece(index))
1679 				return;
1680 		}
1681 
1682 		// the piece picker will prioritize the pieces from the beginning to end.
1683 		// the later the suggestion is received, the higher priority we should
1684 		// ascribe to it, so we need to insert suggestions at the front of the
1685 		// queue.
1686 		if (m_suggested_pieces.end_index() > m_settings.get_int(settings_pack::max_suggest_pieces))
1687 			m_suggested_pieces.resize(m_settings.get_int(settings_pack::max_suggest_pieces) - 1);
1688 
1689 		m_suggested_pieces.insert(m_suggested_pieces.begin(), index);
1690 
1691 #ifndef TORRENT_DISABLE_LOGGING
1692 		peer_log(peer_log_alert::info, "SUGGEST_PIECE", "piece: %d added to set: %d"
1693 			, static_cast<int>(index), m_suggested_pieces.end_index());
1694 #endif
1695 	}
1696 
1697 	// -----------------------------
1698 	// ---------- UNCHOKE ----------
1699 	// -----------------------------
1700 
1701 	void peer_connection::incoming_unchoke()
1702 	{
1703 		TORRENT_ASSERT(is_single_thread());
1704 		INVARIANT_CHECK;
1705 
1706 		std::shared_ptr<torrent> t = m_torrent.lock();
1707 		TORRENT_ASSERT(t);
1708 
1709 #ifndef TORRENT_DISABLE_EXTENSIONS
1710 		for (auto const& e : m_extensions)
1711 		{
1712 			if (e->on_unchoke()) return;
1713 		}
1714 #endif
1715 
1716 #ifndef TORRENT_DISABLE_LOGGING
1717 		peer_log(peer_log_alert::incoming_message, "UNCHOKE");
1718 #endif
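		// only count the choked -> unchoked transition, in case the peer sends
		// redundant unchoke messages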
1719 		if (m_peer_choked)
1720 			m_counters.inc_stats_counter(counters::num_peers_down_unchoked);
1721 
1722 		m_peer_choked = false;
1723 		m_last_unchoked = aux::time_now();
1724 		if (is_disconnecting()) return;
1725 
1726 		if (is_interesting())
1727 		{
1728 			if (request_a_block(*t, *this))
1729 				m_counters.inc_stats_counter(counters::unchoke_piece_picks);
1730 			send_block_requests();
1731 		}
1732 	}
1733 
1734 	// -----------------------------
1735 	// -------- INTERESTED ---------
1736 	// -----------------------------
1737 
1738 	void peer_connection::incoming_interested()
1739 	{
1740 		TORRENT_ASSERT(is_single_thread());
1741 		INVARIANT_CHECK;
1742 
1743 		std::shared_ptr<torrent> t = m_torrent.lock();
1744 		TORRENT_ASSERT(t);
1745 
1746 #ifndef TORRENT_DISABLE_EXTENSIONS
1747 		for (auto const& e : m_extensions)
1748 		{
1749 			if (e->on_interested()) return;
1750 		}
1751 #endif
1752 
1753 #ifndef TORRENT_DISABLE_LOGGING
1754 		peer_log(peer_log_alert::incoming_message, "INTERESTED");
1755 #endif
1756 		if (m_peer_interested == false)
1757 		{
1758 			m_counters.inc_stats_counter(counters::num_peers_up_interested);
1759 			m_peer_interested = true;
1760 		}
1761 		if (is_disconnecting()) return;
1762 
1763 		// if the peer is ready to download stuff, it must have metadata
1764 		m_has_metadata = true;
1765 
1766 		disconnect_if_redundant();
1767 		if (is_disconnecting()) return;
1768 
1769 		if (t->graceful_pause())
1770 		{
1771 #ifndef TORRENT_DISABLE_LOGGING
1772 			peer_log(peer_log_alert::info, "UNCHOKE"
1773 				, "did not unchoke, graceful pause mode");
1774 #endif
1775 			return;
1776 		}
1777 
1778 		if (!is_choked())
1779 		{
1780 			// the reason to send an extra unchoke message here is that
1781 			// because of the handshake-round-trip optimization, we may
1782 			// end up sending an unchoke before the other end sends us
1783 			// an interested message. This may confuse clients that don't react
1784 			// to the first unchoke and don't check whether they are already
1785 			// unchoked when sending the interested message. If the remote
1786 			// client has this problem, sending another unchoke here will
1787 			// prompt it to notice that it is unchoked.
1788 #ifndef TORRENT_DISABLE_LOGGING
1789 			peer_log(peer_log_alert::info, "UNCHOKE", "sending redundant unchoke");
1790 #endif
1791 			write_unchoke();
1792 			return;
1793 		}
1794 
1795 		maybe_unchoke_this_peer();
1796 	}
1797 
1798 	void peer_connection::maybe_unchoke_this_peer()
1799 	{
1800 		TORRENT_ASSERT(is_single_thread());
1801 		if (ignore_unchoke_slots())
1802 		{
1803 #ifndef TORRENT_DISABLE_LOGGING
1804 			peer_log(peer_log_alert::info, "UNCHOKE", "about to unchoke, peer ignores unchoke slots");
1805 #endif
1806 			// if this peer is exempted from the choker
1807 			// just unchoke it immediately
1808 			send_unchoke();
1809 		}
1810 		else if (m_ses.preemptive_unchoke())
1811 		{
1812 			// if the peer is choked and we have upload slots left,
1813 			// then unchoke it.
1814 
1815 			std::shared_ptr<torrent> t = m_torrent.lock();
1816 			TORRENT_ASSERT(t);
1817 
1818 			t->unchoke_peer(*this);
1819 		}
1820 #ifndef TORRENT_DISABLE_LOGGING
1821 		else if (should_log(peer_log_alert::info))
1822 		{
1823 			peer_log(peer_log_alert::info, "UNCHOKE", "did not unchoke, the number of uploads (%d) "
1824 				"is more than or equal to the available slots (%d), limit (%d)"
1825 				, int(m_counters[counters::num_peers_up_unchoked])
1826 				, int(m_counters[counters::num_unchoke_slots])
1827 				, m_settings.get_int(settings_pack::unchoke_slots_limit));
1828 		}
1829 #endif
1830 	}
1831 
1832 	// -----------------------------
1833 	// ------ NOT INTERESTED -------
1834 	// -----------------------------
1835 
1836 	void peer_connection::incoming_not_interested()
1837 	{
1838 		TORRENT_ASSERT(is_single_thread());
1839 		INVARIANT_CHECK;
1840 
1841 #ifndef TORRENT_DISABLE_EXTENSIONS
1842 		for (auto const& e : m_extensions)
1843 		{
1844 			if (e->on_not_interested()) return;
1845 		}
1846 #endif
1847 
1848 #ifndef TORRENT_DISABLE_LOGGING
1849 		peer_log(peer_log_alert::incoming_message, "NOT_INTERESTED");
1850 #endif
1851 		if (m_peer_interested)
1852 		{
1853 			m_counters.inc_stats_counter(counters::num_peers_up_interested, -1);
1854 			m_became_uninterested = aux::time_now();
1855 			m_peer_interested = false;
1856 		}
1857 
1858 		if (is_disconnecting()) return;
1859 
1860 		std::shared_ptr<torrent> t = m_torrent.lock();
1861 		TORRENT_ASSERT(t);
1862 
1863 		choke_this_peer();
1864 	}
1865 
1866 	void peer_connection::choke_this_peer()
1867 	{
1868 		TORRENT_ASSERT(is_single_thread());
1869 		if (is_choked()) return;
1870 		if (ignore_unchoke_slots())
1871 		{
1872 			send_choke();
1873 			return;
1874 		}
1875 
1876 		std::shared_ptr<torrent> t = m_torrent.lock();
1877 		TORRENT_ASSERT(t);
1878 
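		// if this peer happens to be the current optimistic unchoke, hand the
		// slot back so the choker can pick a new peer to unchoke optimistically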
1879 		if (m_peer_info && m_peer_info->optimistically_unchoked)
1880 		{
1881 			m_peer_info->optimistically_unchoked = false;
1882 			m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
1883 			t->trigger_optimistic_unchoke();
1884 		}
1885 		t->choke_peer(*this);
1886 		t->trigger_unchoke();
1887 	}
1888 
1889 	// -----------------------------
1890 	// ----------- HAVE ------------
1891 	// -----------------------------
1892 
1893 	void peer_connection::incoming_have(piece_index_t const index)
1894 	{
1895 		TORRENT_ASSERT(is_single_thread());
1896 		INVARIANT_CHECK;
1897 
1898 		std::shared_ptr<torrent> t = m_torrent.lock();
1899 		TORRENT_ASSERT(t);
1900 
1901 #ifndef TORRENT_DISABLE_EXTENSIONS
1902 		for (auto const& e : m_extensions)
1903 		{
1904 			if (e->on_have(index)) return;
1905 		}
1906 #endif
1907 
1908 		if (is_disconnecting()) return;
1909 
1910 		// if we haven't received a bitfield, it was
1911 		// probably omitted, which is the same as 'have_none'
1912 		if (!m_bitfield_received) incoming_have_none();
1913 
1914 		// if this peer is choked, there's no point in sending suggest messages to
1915 		// it. They would just be out-of-date by the time we unchoke the peer
1916 		// anyway.
1917 		if (m_settings.get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache
1918 			&& !is_choked()
1919 			&& std::any_of(m_suggest_pieces.begin(), m_suggest_pieces.end()
1920 				, [=](piece_index_t const idx) { return idx == index; }))
1921 		{
1922 			send_piece_suggestions(2);
1923 		}
1924 
1925 #ifndef TORRENT_DISABLE_LOGGING
1926 		peer_log(peer_log_alert::incoming_message, "HAVE", "piece: %d"
1927 			, static_cast<int>(index));
1928 #endif
1929 
1930 		if (is_disconnecting()) return;
1931 
1932 		if (!t->valid_metadata() && index >= m_have_piece.end_index())
1933 		{
1934 			if (index < piece_index_t(0x200000))
1935 			{
1936 				// if we don't have metadata
1937 				// and we might not have received a bitfield
1938 				// extend the bitmask to fit the new
1939 				// have message
1940 				m_have_piece.resize(static_cast<int>(index) + 1, false);
1941 			}
1942 			else
1943 			{
1944 				// unless the index is 0x200000 (2 million) or more,
1945 				// in which case we just ignore it
1946 				return;
1947 			}
1948 		}
1949 
1950 		// if we got an invalid message, abort
1951 		if (index >= m_have_piece.end_index() || index < piece_index_t(0))
1952 		{
1953 #ifndef TORRENT_DISABLE_LOGGING
1954 			peer_log(peer_log_alert::info, "ERROR", "have-metadata have_piece: %d size: %d"
1955 				, static_cast<int>(index), m_have_piece.size());
1956 #endif
1957 			disconnect(errors::invalid_have, operation_t::bittorrent, peer_error);
1958 			return;
1959 		}
1960 
1961 #ifndef TORRENT_DISABLE_SUPERSEEDING
1962 		if (t->super_seeding()
1963 #if TORRENT_ABI_VERSION == 1
1964 			&& !m_settings.get_bool(settings_pack::strict_super_seeding)
1965 #endif
1966 			)
1967 		{
1968 			// if we're super-seeding and the peer just told
1969 			// us that it completed the piece we're super-seeding
1970 			// to it, change the super-seeding piece for this peer
1971 			// if the peer optimizes out redundant have messages
1972 			// this will be handled when the peer sends not-interested
1973 			// instead.
1974 			if (super_seeded_piece(index))
1975 			{
1976 				superseed_piece(index, t->get_piece_to_super_seed(m_have_piece));
1977 			}
1978 		}
1979 #endif
1980 
1981 		if (m_have_piece[index])
1982 		{
1983 #ifndef TORRENT_DISABLE_LOGGING
1984 			peer_log(peer_log_alert::incoming, "HAVE"
1985 				, "got redundant HAVE message for index: %d"
1986 				, static_cast<int>(index));
1987 #endif
1988 			return;
1989 		}
1990 
1991 		m_have_piece.set_bit(index);
1992 		++m_num_pieces;
1993 
1994 		// if the peer is downloading stuff, it must have metadata
1995 		m_has_metadata = true;
1996 
1997 		// only update the piece_picker if
1998 		// we have the metadata and if
1999 		// we're not a seed (in which case
2000 		// we won't have a piece picker)
2001 		if (!t->valid_metadata()) return;
2002 
2003 		t->peer_has(index, this);
2004 
2005 		// it's important to not disconnect before we have
2006 		// updated the piece picker, otherwise we will incorrectly
2007 		// decrement the piece count without first incrementing it
2008 		if (is_seed())
2009 		{
2010 #ifndef TORRENT_DISABLE_LOGGING
2011 			peer_log(peer_log_alert::info, "SEED", "this is a seed. p: %p"
2012 				, static_cast<void*>(m_peer_info));
2013 #endif
2014 
2015 			TORRENT_ASSERT(t->ready_for_connections());
2016 			TORRENT_ASSERT(m_have_piece.all_set());
2017 			TORRENT_ASSERT(m_have_piece.count() == m_have_piece.size());
2018 			TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces());
2019 
2020 			t->seen_complete();
2021 			t->set_seed(m_peer_info, true);
2022 			TORRENT_ASSERT(is_seed());
2023 			m_upload_only = true;
2024 
2025 #if TORRENT_USE_INVARIANT_CHECKS
2026 			if (t && t->has_picker())
2027 				t->picker().check_peer_invariant(m_have_piece, peer_info_struct());
2028 #endif
2029 			if (disconnect_if_redundant()) return;
2030 		}
2031 
2032 		// it's important to update whether we're interested in this peer before
2033 		// calling disconnect_if_redundant, otherwise we may disconnect even if
2034 		// we are interested
2035 		if (!t->has_piece_passed(index)
2036 			&& !t->is_upload_only()
2037 			&& !is_interesting()
2038 			&& (!t->has_picker() || t->picker().piece_priority(index) != dont_download))
2039 			t->peer_is_interesting(*this);
2040 
2041 		disconnect_if_redundant();
2042 		if (is_disconnecting()) return;
2043 
2044 #ifndef TORRENT_DISABLE_SUPERSEEDING
2045 #if TORRENT_ABI_VERSION == 1
2046 		// if we're super seeding, this might mean that somebody
2047 		// forwarded this piece, in which case we need to give
2048 		// a new piece to that peer
2049 		if (t->super_seeding()
2050 			&& m_settings.get_bool(settings_pack::strict_super_seeding)
2051 			&& (!super_seeded_piece(index) || t->num_peers() == 1))
2052 		{
2053 			for (auto& p : *t)
2054 			{
2055 				if (!p->super_seeded_piece(index)) continue;
2056 				if (!p->has_piece(index)) continue;
2057 				p->superseed_piece(index, t->get_piece_to_super_seed(p->get_bitfield()));
2058 			}
2059 		}
2060 #endif // TORRENT_ABI_VERSION
2061 #endif // TORRENT_DISABLE_SUPERSEEDING
2062 	}
2063 
2064 	// -----------------------------
2065 	// -------- DONT HAVE ----------
2066 	// -----------------------------
2067 
2068 	void peer_connection::incoming_dont_have(piece_index_t const index)
2069 	{
2070 		TORRENT_ASSERT(is_single_thread());
2071 		INVARIANT_CHECK;
2072 
2073 		std::shared_ptr<torrent> t = m_torrent.lock();
2074 		TORRENT_ASSERT(t);
2075 
2076 		if (index < piece_index_t{}
2077 			|| index >= t->torrent_file().end_piece())
2078 		{
2079 #ifndef TORRENT_DISABLE_LOGGING
2080 			peer_log(peer_log_alert::incoming, "DONT_HAVE"
2081 				, "invalid piece: %d", static_cast<int>(index));
2082 #endif
2083 			return;
2084 		}
2085 
2086 #ifndef TORRENT_DISABLE_EXTENSIONS
2087 		for (auto const& e : m_extensions)
2088 		{
2089 			if (e->on_dont_have(index)) return;
2090 		}
2091 #endif
2092 
2093 		if (is_disconnecting()) return;
2094 
2095 #ifndef TORRENT_DISABLE_LOGGING
2096 		peer_log(peer_log_alert::incoming_message, "DONT_HAVE", "piece: %d"
2097 			, static_cast<int>(index));
2098 #endif
2099 
2100 		// if we got an invalid message, abort
2101 		if (index >= m_have_piece.end_index() || index < piece_index_t(0))
2102 		{
2103 			disconnect(errors::invalid_dont_have, operation_t::bittorrent, peer_error);
2104 			return;
2105 		}
2106 
2107 		if (!m_have_piece[index])
2108 		{
2109 #ifndef TORRENT_DISABLE_LOGGING
2110 			peer_log(peer_log_alert::incoming, "DONT_HAVE"
2111 				, "got redundant DONT_HAVE message for index: %d"
2112 				, static_cast<int>(index));
2113 #endif
2114 			return;
2115 		}
2116 
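		// the peer no longer has this piece. update our view of its bitfield
		// and piece count, and re-evaluate its seed status below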
2117 		bool const was_seed = is_seed();
2118 		m_have_piece.clear_bit(index);
2119 		TORRENT_ASSERT(m_num_pieces > 0);
2120 		--m_num_pieces;
2121 		m_have_all = false;
2122 
2123 		// only update the piece_picker if
2124 		// we have the metadata and if
2125 		// we're not a seed (in which case
2126 		// we won't have a piece picker)
2127 		if (!t->valid_metadata()) return;
2128 
2129 		t->peer_lost(index, this);
2130 
2131 		if (was_seed)
2132 		{
2133 			t->set_seed(m_peer_info, false);
2134 			TORRENT_ASSERT(!is_seed());
2135 		}
2136 	}
2137 
2138 	// -----------------------------
2139 	// --------- BITFIELD ----------
2140 	// -----------------------------
2141 
2142 	void peer_connection::incoming_bitfield(typed_bitfield<piece_index_t> const& bits)
2143 	{
2144 		TORRENT_ASSERT(is_single_thread());
2145 		INVARIANT_CHECK;
2146 
2147 		std::shared_ptr<torrent> t = m_torrent.lock();
2148 		TORRENT_ASSERT(t);
2149 
2150 #ifndef TORRENT_DISABLE_EXTENSIONS
2151 		for (auto const& e : m_extensions)
2152 		{
2153 			if (e->on_bitfield(bits)) return;
2154 		}
2155 #endif
2156 
2157 		if (is_disconnecting()) return;
2158 
2159 #ifndef TORRENT_DISABLE_LOGGING
2160 		if (should_log(peer_log_alert::incoming_message))
2161 		{
2162 			std::string bitfield_str;
2163 			bitfield_str.resize(aux::numeric_cast<std::size_t>(bits.size()));
2164 			for (auto const i : bits.range())
2165 				bitfield_str[std::size_t(static_cast<int>(i))] = bits[i] ? '1' : '0';
2166 			peer_log(peer_log_alert::incoming_message, "BITFIELD"
2167 				, "%s", bitfield_str.c_str());
2168 		}
2169 #endif
2170 
2171 		// if we don't have the metadata, we cannot
2172 		// verify the bitfield size
2173 		if (t->valid_metadata()
2174 			&& bits.size() != m_have_piece.size())
2175 		{
2176 #ifndef TORRENT_DISABLE_LOGGING
2177 			if (should_log(peer_log_alert::incoming_message))
2178 			{
2179 				peer_log(peer_log_alert::incoming_message, "BITFIELD"
2180 					, "invalid size: %d expected %d", bits.size()
2181 					, m_have_piece.size());
2182 			}
2183 #endif
2184 			disconnect(errors::invalid_bitfield_size, operation_t::bittorrent, peer_error);
2185 			return;
2186 		}
2187 
2188 		if (m_bitfield_received)
2189 		{
2190 			// if we've already received a bitfield message
2191 			// we first need to count down all the pieces
2192 			// we believe the peer has
2193 			t->peer_lost(m_have_piece, this);
2194 		}
2195 
2196 		m_bitfield_received = true;
2197 
2198 		// if we don't have metadata yet
2199 		// just remember the bitmask
2200 		// don't update the piecepicker
2201 		// (since it doesn't exist yet)
2202 		if (!t->ready_for_connections())
2203 		{
2204 #ifndef TORRENT_DISABLE_LOGGING
2205 			if (m_num_pieces == bits.size())
2206 				peer_log(peer_log_alert::info, "SEED", "this is a seed. p: %p"
2207 					, static_cast<void*>(m_peer_info));
2208 #endif
2209 			m_have_piece = bits;
2210 			m_num_pieces = bits.count();
2211 			t->set_seed(m_peer_info, m_num_pieces == bits.size());
2212 			TORRENT_ASSERT(is_seed() == (m_num_pieces == bits.size()));
2213 
2214 #if TORRENT_USE_INVARIANT_CHECKS
2215 			if (t && t->has_picker())
2216 				t->picker().check_peer_invariant(m_have_piece, peer_info_struct());
2217 #endif
2218 			return;
2219 		}
2220 
2221 		TORRENT_ASSERT(t->valid_metadata());
2222 
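		// count the peer's pieces to determine whether it is effectively a seed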
2223 		int const num_pieces = bits.count();
2224 		t->set_seed(m_peer_info, num_pieces == m_have_piece.size());
2225 		if (num_pieces == m_have_piece.size())
2226 		{
2227 #ifndef TORRENT_DISABLE_LOGGING
2228 			peer_log(peer_log_alert::info, "SEED", "this is a seed. p: %p"
2229 				, static_cast<void*>(m_peer_info));
2230 #endif
2231 			m_upload_only = true;
2232 
2233 			m_have_piece.set_all();
2234 			m_num_pieces = num_pieces;
2235 			t->peer_has_all(this);
2236 			TORRENT_ASSERT(is_seed());
2237 
2238 			TORRENT_ASSERT(m_have_piece.all_set());
2239 			TORRENT_ASSERT(m_have_piece.count() == m_have_piece.size());
2240 			TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces());
2241 
2242 #if TORRENT_USE_INVARIANT_CHECKS
2243 			if (t && t->has_picker())
2244 				t->picker().check_peer_invariant(m_have_piece, peer_info_struct());
2245 #endif
2246 
2247 			// this will cause us to send the INTERESTED message
2248 			if (!t->is_upload_only())
2249 				t->peer_is_interesting(*this);
2250 
2251 			disconnect_if_redundant();
2252 
2253 			return;
2254 		}
2255 
2256 		// let the torrent know which pieces the peer has. if we're a seed,
2257 		// we don't keep track of piece availability
2258 		t->peer_has(bits, this);
2259 
2260 		m_have_piece = bits;
2261 		m_num_pieces = num_pieces;
2262 
2263 		update_interest();
2264 	}
2265 
2266 	bool peer_connection::disconnect_if_redundant()
2267 	{
2268 		TORRENT_ASSERT(is_single_thread());
2269 		if (m_disconnecting) return false;
2270 		if (m_need_interest_update) return false;
2271 
2272 		// we cannot disconnect in a constructor
2273 		TORRENT_ASSERT(m_in_constructor == false);
2274 		if (!m_settings.get_bool(settings_pack::close_redundant_connections)) return false;
2275 
2276 		std::shared_ptr<torrent> t = m_torrent.lock();
2277 		if (!t) return false;
2278 
2279 		// if we don't have the metadata yet, don't disconnect
2280 		// also, if the peer doesn't have metadata we shouldn't
2281 		// disconnect it, since it may want to request the
2282 		// metadata from us
2283 		if (!t->valid_metadata() || !has_metadata()) return false;
2284 
2285 #ifndef TORRENT_DISABLE_SHARE_MODE
2286 		// don't close connections in share mode, we don't know if we need them
2287 		if (t->share_mode()) return false;
2288 #endif
2289 
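		// if both we and the peer are upload-only, neither side has anything
		// the other wants, so the connection is redundant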
2290 		if (m_upload_only && t->is_upload_only()
2291 			&& can_disconnect(errors::upload_upload_connection))
2292 		{
2293 #ifndef TORRENT_DISABLE_LOGGING
2294 			peer_log(peer_log_alert::info, "UPLOAD_ONLY", "the peer is upload-only and our torrent is also upload-only");
2295 #endif
2296 			disconnect(errors::upload_upload_connection, operation_t::bittorrent);
2297 			return true;
2298 		}
2299 
2300 		if (m_upload_only
2301 			&& !m_interesting
2302 			&& m_bitfield_received
2303 			&& t->are_files_checked()
2304 			&& can_disconnect(errors::uninteresting_upload_peer))
2305 		{
2306 #ifndef TORRENT_DISABLE_LOGGING
2307 			peer_log(peer_log_alert::info, "UPLOAD_ONLY", "the peer is upload-only and we're not interested in it");
2308 #endif
2309 			disconnect(errors::uninteresting_upload_peer, operation_t::bittorrent);
2310 			return true;
2311 		}
2312 
2313 		return false;
2314 	}
2315 
2316 	bool peer_connection::can_disconnect(error_code const& ec) const
2317 	{
2318 		TORRENT_ASSERT(is_single_thread());
2319 #ifndef TORRENT_DISABLE_EXTENSIONS
2320 		for (auto const& e : m_extensions)
2321 		{
2322 			if (!e->can_disconnect(ec)) return false;
2323 		}
2324 #else
2325 		TORRENT_UNUSED(ec);
2326 #endif
2327 		return true;
2328 	}
2329 
2330 	// -----------------------------
2331 	// ---------- REQUEST ----------
2332 	// -----------------------------
2333 
2334 	void peer_connection::incoming_request(peer_request const& r)
2335 	{
2336 		TORRENT_ASSERT(is_single_thread());
2337 		INVARIANT_CHECK;
2338 
2339 		std::shared_ptr<torrent> t = m_torrent.lock();
2340 		TORRENT_ASSERT(t);
2341 		torrent_info const& ti = t->torrent_file();
2342 
2343 		m_counters.inc_stats_counter(counters::piece_requests);
2344 
2345 #ifndef TORRENT_DISABLE_LOGGING
2346 		const bool valid_piece_index
2347 			= r.piece >= piece_index_t(0)
2348 			&& r.piece < t->torrent_file().end_piece();
2349 
2350 		peer_log(peer_log_alert::incoming_message, "REQUEST"
2351 			, "piece: %d s: %x l: %x", static_cast<int>(r.piece), r.start, r.length);
2352 #endif
2353 
2354 #ifndef TORRENT_DISABLE_SUPERSEEDING
2355 		if (t->super_seeding()
2356 			&& !super_seeded_piece(r.piece))
2357 		{
2358 			m_counters.inc_stats_counter(counters::invalid_piece_requests);
2359 			++m_num_invalid_requests;
2360 #ifndef TORRENT_DISABLE_LOGGING
2361 			if (should_log(peer_log_alert::info))
2362 			{
2363 				peer_log(peer_log_alert::info, "INVALID_REQUEST", "piece not super-seeded "
2364 					"i: %d t: %d n: %d h: %d ss1: %d ss2: %d"
2365 					, m_peer_interested
2366 					, valid_piece_index
2367 						? t->torrent_file().piece_size(r.piece) : -1
2368 					, t->torrent_file().num_pieces()
2369 					, valid_piece_index ? t->has_piece_passed(r.piece) : 0
2370 					, static_cast<int>(m_superseed_piece[0])
2371 					, static_cast<int>(m_superseed_piece[1]));
2372 			}
2373 #endif
2374 
2375 			write_reject_request(r);
2376 
2377 			if (t->alerts().should_post<invalid_request_alert>())
2378 			{
2379 				// msvc 12 appears to deduce the rvalue reference template
2380 				// incorrectly for bool temporaries. So, create a dummy instance
2381 				bool const peer_interested = bool(m_peer_interested);
2382 				t->alerts().emplace_alert<invalid_request_alert>(
2383 					t->get_handle(), m_remote, m_peer_id, r
2384 					, t->has_piece_passed(r.piece), peer_interested, true);
2385 			}
2386 			return;
2387 		}
2388 #endif // TORRENT_DISABLE_SUPERSEEDING
2389 
2390 		// if we haven't received a bitfield, it was
2391 		// probably omitted, which is the same as 'have_none'
2392 		if (!m_bitfield_received) incoming_have_none();
2393 		if (is_disconnecting()) return;
2394 
2395 #ifndef TORRENT_DISABLE_EXTENSIONS
2396 		for (auto const& e : m_extensions)
2397 		{
2398 			if (e->on_request(r)) return;
2399 		}
2400 		if (is_disconnecting()) return;
2401 #endif
2402 
2403 		if (!t->valid_metadata())
2404 		{
2405 			m_counters.inc_stats_counter(counters::invalid_piece_requests);
2406 			// if we don't have valid metadata yet,
2407 			// we shouldn't get a request
2408 #ifndef TORRENT_DISABLE_LOGGING
2409 			peer_log(peer_log_alert::info, "INVALID_REQUEST", "we don't have metadata yet");
2410 			peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE", "piece: %d s: %x l: %x no metadata"
2411 				, static_cast<int>(r.piece), r.start, r.length);
2412 #endif
2413 			write_reject_request(r);
2414 			return;
2415 		}
2416 
2417 		if (int(m_requests.size()) > m_settings.get_int(settings_pack::max_allowed_in_request_queue))
2418 		{
2419 			m_counters.inc_stats_counter(counters::max_piece_requests);
2420 			// don't allow clients to abuse our
2421 			// memory consumption.
2422 			// ignore requests if the client
2423 			// is making too many of them.
2424 #ifndef TORRENT_DISABLE_LOGGING
2425 			peer_log(peer_log_alert::info, "INVALID_REQUEST", "incoming request queue full %d"
2426 				, int(m_requests.size()));
2427 			peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE", "piece: %d s: %x l: %x too many requests"
2428 				, static_cast<int>(r.piece), r.start, r.length);
2429 #endif
2430 			write_reject_request(r);
2431 			return;
2432 		}
2433 
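		// check whether the requested piece is one we have granted this peer
		// as "allowed fast". such pieces may be requested even while choked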
2434 		int fast_idx = -1;
2435 		auto const fast_iter = std::find(m_accept_fast.begin()
2436 			, m_accept_fast.end(), r.piece);
2437 		if (fast_iter != m_accept_fast.end()) fast_idx = int(fast_iter - m_accept_fast.begin());
2438 
2439 		if (!m_peer_interested)
2440 		{
2441 #ifndef TORRENT_DISABLE_LOGGING
2442 			if (should_log(peer_log_alert::info))
2443 			{
2444 				peer_log(peer_log_alert::info, "INVALID_REQUEST", "peer is not interested "
2445 					" t: %d n: %d block_limit: %d"
2446 					, valid_piece_index
2447 						? t->torrent_file().piece_size(r.piece) : -1
2448 					, t->torrent_file().num_pieces()
2449 					, t->block_size());
2450 				peer_log(peer_log_alert::info, "INTERESTED", "artificial incoming INTERESTED message");
2451 			}
2452 #endif
2453 			if (t->alerts().should_post<invalid_request_alert>())
2454 			{
2455 				t->alerts().emplace_alert<invalid_request_alert>(
2456 					t->get_handle(), m_remote, m_peer_id, r
2457 					, t->has_piece_passed(r.piece)
2458 					, false, false);
2459 			}
2460 
2461 			// be lenient and pretend that the peer said it was interested
2462 			incoming_interested();
2463 		}
2464 
2465 		// make sure this request
2466 		// is legal and that the peer
2467 		// is not choked
2468 		if (r.piece < piece_index_t(0)
2469 			|| r.piece >= t->torrent_file().end_piece()
2470 			|| (!t->has_piece_passed(r.piece)
2471 #ifndef TORRENT_DISABLE_PREDICTIVE_PIECES
2472 				&& !t->is_predictive_piece(r.piece)
2473 #endif
2474 				&& !t->seed_mode())
2475 			|| r.start < 0
2476 			|| r.start >= ti.piece_size(r.piece)
2477 			|| r.length <= 0
2478 			|| r.length + r.start > ti.piece_size(r.piece)
2479 			|| r.length > t->block_size())
2480 		{
2481 			m_counters.inc_stats_counter(counters::invalid_piece_requests);
2482 
2483 #ifndef TORRENT_DISABLE_LOGGING
2484 			if (should_log(peer_log_alert::info))
2485 			{
2486 				peer_log(peer_log_alert::info, "INVALID_REQUEST"
2487 					, "i: %d t: %d n: %d h: %d block_limit: %d"
2488 					, m_peer_interested
2489 					, valid_piece_index
2490 						? t->torrent_file().piece_size(r.piece) : -1
2491 					, ti.num_pieces()
2492 					, t->has_piece_passed(r.piece)
2493 					, t->block_size());
2494 			}
2495 
2496 			peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE"
2497 				, "piece: %d s: %d l: %d invalid request"
2498 				, static_cast<int>(r.piece), r.start , r.length);
2499 #endif
2500 
2501 			write_reject_request(r);
2502 			++m_num_invalid_requests;
2503 
2504 			if (t->alerts().should_post<invalid_request_alert>())
2505 			{
2506 				// msvc 12 appears to deduce the rvalue reference template
2507 				// incorrectly for bool temporaries. So, create a dummy instance
2508 				bool const peer_interested = bool(m_peer_interested);
2509 				t->alerts().emplace_alert<invalid_request_alert>(
2510 					t->get_handle(), m_remote, m_peer_id, r
2511 					, t->has_piece_passed(r.piece), peer_interested, false);
2512 			}
2513 
2514 			// every ten invalid requests, remind the peer that it's choked
2515 			if (!m_peer_interested && m_num_invalid_requests % 10 == 0 && m_choked)
2516 			{
2517 				// TODO: 2 this should probably be based on time instead of number
2518 				// of request messages. For a very high throughput connection, 300
2519 				// may be a legitimate number of requests to have in flight when
2520 				// getting choked
2521 				if (m_num_invalid_requests > 300 && !m_peer_choked
2522 					&& can_disconnect(errors::too_many_requests_when_choked))
2523 				{
2524 					disconnect(errors::too_many_requests_when_choked, operation_t::bittorrent, peer_error);
2525 					return;
2526 				}
2527 #ifndef TORRENT_DISABLE_LOGGING
2528 				peer_log(peer_log_alert::outgoing_message, "CHOKE");
2529 #endif
2530 				write_choke();
2531 			}
2532 
2533 			return;
2534 		}
2535 
2536 		// if we have choked the client, ignore the request unless the piece
2537 		// is in the allowed-fast set (handled below)
2538 		int const blocks_per_piece =
2539 			(ti.piece_length() + t->block_size() - 1) / t->block_size();
2540 
2541 		// disconnect peers that, while choked, download more than three times
2542 		// the number of blocks in an allowed-fast piece
2543 		if (m_choked && fast_idx != -1 && m_accept_fast_piece_cnt[fast_idx] >= 3 * blocks_per_piece
2544 			&& can_disconnect(errors::too_many_requests_when_choked))
2545 		{
2546 			disconnect(errors::too_many_requests_when_choked, operation_t::bittorrent, peer_error);
2547 			return;
2548 		}
2549 
2550 		if (m_choked && fast_idx == -1)
2551 		{
2552 #ifndef TORRENT_DISABLE_LOGGING
2553 			peer_log(peer_log_alert::info, "REJECTING REQUEST", "peer choked and piece not in allowed fast set");
2554 			peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE", "piece: %d s: %d l: %d peer choked"
2555 				, static_cast<int>(r.piece), r.start, r.length);
2556 #endif
2557 			m_counters.inc_stats_counter(counters::choked_piece_requests);
2558 			write_reject_request(r);
2559 
2560 			// allow peers to send requests up to 2 seconds after getting choked,
2561 			// then disconnect them
2562 			if (aux::time_now() - seconds(2) > m_last_choke
2563 				&& can_disconnect(errors::too_many_requests_when_choked))
2564 			{
2565 				disconnect(errors::too_many_requests_when_choked, operation_t::bittorrent, peer_error);
2566 				return;
2567 			}
2568 		}
2569 		else
2570 		{
2571 			// increase the allowed fast set counter
2572 			if (fast_idx != -1)
2573 				++m_accept_fast_piece_cnt[fast_idx];
2574 
2575 			if (m_requests.empty())
2576 				m_counters.inc_stats_counter(counters::num_peers_up_requests);
2577 
2578 			TORRENT_ASSERT(t->valid_metadata());
2579 			TORRENT_ASSERT(r.piece >= piece_index_t(0));
2580 			TORRENT_ASSERT(r.piece < t->torrent_file().end_piece());
2581 
2582 			m_requests.push_back(r);
2583 
2584 			if (t->alerts().should_post<incoming_request_alert>())
2585 			{
2586 				t->alerts().emplace_alert<incoming_request_alert>(r, t->get_handle()
2587 					, m_remote, m_peer_id);
2588 			}
2589 
2590 			m_last_incoming_request = aux::time_now();
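			// try to start servicing the request right away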
2591 			fill_send_buffer();
2592 		}
2593 	}
2594 
2595 	// reject all requests to this piece
2596 	void peer_connection::reject_piece(piece_index_t const index)
2597 	{
2598 		TORRENT_ASSERT(is_single_thread());
2599 		// note: only advance the iterator when nothing was erased
2600 		for (auto i = m_requests.begin(); i != m_requests.end();)
2601 		{
2602 			peer_request const& r = *i;
2603 			if (r.piece != index) { ++i; continue; }
2604 			write_reject_request(r);
2605 			i = m_requests.erase(i);
2606 			if (m_requests.empty())
2607 				m_counters.inc_stats_counter(counters::num_peers_up_requests, -1);
2608 		}
2609 	}
2610 
2611 	void peer_connection::incoming_piece_fragment(int const bytes)
2612 	{
2613 		TORRENT_ASSERT(is_single_thread());
2614 		m_last_piece = aux::time_now();
2615 		TORRENT_ASSERT_VAL(m_outstanding_bytes >= bytes, m_outstanding_bytes - bytes);
2616 		m_outstanding_bytes -= bytes;
2617 		if (m_outstanding_bytes < 0) m_outstanding_bytes = 0;
2618 		std::shared_ptr<torrent> t = associated_torrent().lock();
2619 #if TORRENT_USE_ASSERTS
2620 		TORRENT_ASSERT(m_received_in_piece + bytes <= t->block_size());
2621 		m_received_in_piece += bytes;
2622 #endif
2623 
2624 		// progress of this torrent increased
2625 		t->state_updated();
2626 
2627 #if TORRENT_USE_INVARIANT_CHECKS
2628 		check_invariant();
2629 #endif
2630 	}
2631 
2632 	void peer_connection::start_receive_piece(peer_request const& r)
2633 	{
2634 		TORRENT_ASSERT(is_single_thread());
2635 #if TORRENT_USE_INVARIANT_CHECKS
2636 		check_invariant();
2637 #endif
2638 #if TORRENT_USE_ASSERTS
2639 		span<char const> recv_buffer = m_recv_buffer.get();
2640 		int recv_pos = int(recv_buffer.end() - recv_buffer.begin());
2641 		TORRENT_ASSERT(recv_pos >= 9);
2642 #endif
2643 
2644 		std::shared_ptr<torrent> t = associated_torrent().lock();
2645 		TORRENT_ASSERT(t);
2646 
2647 		if (!verify_piece(r))
2648 		{
2649 #ifndef TORRENT_DISABLE_LOGGING
2650 			peer_log(peer_log_alert::info, "INVALID_PIECE", "piece: %d s: %d l: %d"
2651 				, static_cast<int>(r.piece), r.start, r.length);
2652 #endif
2653 			disconnect(errors::invalid_piece, operation_t::bittorrent, peer_error);
2654 			return;
2655 		}
2656 
2657 		piece_block const b(r.piece, r.start / t->block_size());
2658 		m_receiving_block = b;
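		// remember which block we are currently receiving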
2659 
2660 		bool in_req_queue = false;
2661 		for (auto const& pb : m_download_queue)
2662 		{
2663 			if (pb.block != b) continue;
2664 			in_req_queue = true;
2665 			break;
2666 		}
2667 
2668 		// if this block is not already in the download queue, we have to
2669 		// assume our outstanding bytes include this block too.
2670 		// if we're disconnecting, we shouldn't add blocks
2671 		if (!in_req_queue && !m_disconnecting)
2672 		{
2673 			for (auto i = m_request_queue.begin()
2674 				, end(m_request_queue.end()); i != end; ++i)
2675 			{
2676 				if (i->block != b) continue;
2677 				in_req_queue = true;
2678 				if (i - m_request_queue.begin() < m_queued_time_critical)
2679 					--m_queued_time_critical;
2680 				m_request_queue.erase(i);
2681 				break;
2682 			}
2683 
2684 			if (m_download_queue.empty())
2685 				m_counters.inc_stats_counter(counters::num_peers_down_requests);
2686 
2687 			m_download_queue.insert(m_download_queue.begin(), b);
2688 			if (!in_req_queue)
2689 			{
2690 				if (t->alerts().should_post<unwanted_block_alert>())
2691 				{
2692 					t->alerts().emplace_alert<unwanted_block_alert>(t->get_handle()
2693 						, m_remote, m_peer_id, b.block_index, b.piece_index);
2694 				}
2695 #ifndef TORRENT_DISABLE_LOGGING
2696 				peer_log(peer_log_alert::info, "INVALID_REQUEST"
2697 					, "The block we just got was not in the request queue");
2698 #endif
2699 				TORRENT_ASSERT(m_download_queue.front().block == b);
2700 				m_download_queue.front().not_wanted = true;
2701 			}
2702 			m_outstanding_bytes += r.length;
2703 		}
2704 	}
2705 
2706 #if TORRENT_USE_INVARIANT_CHECKS
2707 	struct check_postcondition
2708 	{
2709 		explicit check_postcondition(std::shared_ptr<torrent> const& t_
2710 			, bool init_check = true): t(t_) { if (init_check) check(); }
2711 
2712 		~check_postcondition() { check(); }
2713 
2714 		void check()
2715 		{
2716 			if (!t->is_seed())
2717 			{
2718 				const int blocks_per_piece = static_cast<int>(
2719 					(t->torrent_file().piece_length() + t->block_size() - 1) / t->block_size());
2720 
2721 				std::vector<piece_picker::downloading_piece> const& dl_queue
2722 					= t->picker().get_download_queue();
2723 
2724 				for (std::vector<piece_picker::downloading_piece>::const_iterator i =
2725 					dl_queue.begin(); i != dl_queue.end(); ++i)
2726 				{
2727 					TORRENT_ASSERT(i->finished <= blocks_per_piece);
2728 				}
2729 			}
2730 		}
2731 
2732 		std::shared_ptr<torrent> t;
2733 	};
2734 #endif
2735 
2736 
2737 	// -----------------------------
2738 	// ----------- PIECE -----------
2739 	// -----------------------------
2740 
2741 	void peer_connection::incoming_piece(peer_request const& p, char const* data)
2742 	{
2743 		TORRENT_ASSERT(is_single_thread());
2744 		INVARIANT_CHECK;
2745 
2746 		std::shared_ptr<torrent> t = m_torrent.lock();
2747 		TORRENT_ASSERT(t);
2748 
2749 		// we're not receiving any block right now
2750 		m_receiving_block = piece_block::invalid;
2751 
2752 #ifdef TORRENT_CORRUPT_DATA
2753 		// corrupt all pieces from certain peers
2754 		if (is_v4(m_remote)
2755 			&& (m_remote.address().to_v4().to_ulong() & 0xf) == 0)
2756 		{
2757 			data[0] = ~data[0];
2758 		}
2759 #endif
2760 
2761 		// if we haven't received a bitfield, it was
2762 		// probably omitted, which is the same as 'have_none'
2763 		if (!m_bitfield_received) incoming_have_none();
2764 		if (is_disconnecting()) return;
2765 
2766 		// slow-start
2767 		// slow-start: grow the desired request queue by one block per received block
2768 			m_desired_queue_size += 1;
2769 
2770 		update_desired_queue_size();
2771 
2772 #ifndef TORRENT_DISABLE_EXTENSIONS
2773 		for (auto const& e : m_extensions)
2774 		{
2775 			if (e->on_piece(p, {data, p.length}))
2776 			{
2777 #if TORRENT_USE_ASSERTS
2778 				TORRENT_ASSERT(m_received_in_piece == p.length);
2779 				m_received_in_piece = 0;
2780 #endif
2781 				return;
2782 			}
2783 		}
2784 #endif
2785 		if (is_disconnecting()) return;
2786 
2787 #if TORRENT_USE_INVARIANT_CHECKS
2788 		check_postcondition post_checker_(t);
2789 #if defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
2790 		t->check_invariant();
2791 #endif
2792 #endif
2793 
2794 #ifndef TORRENT_DISABLE_LOGGING
2795 		if (should_log(peer_log_alert::incoming_message))
2796 		{
2797 			peer_log(peer_log_alert::incoming_message, "PIECE", "piece: %d s: %x l: %x ds: %d qs: %d q: %d"
2798 				, static_cast<int>(p.piece), p.start, p.length, statistics().download_rate()
2799 				, int(m_desired_queue_size), int(m_download_queue.size()));
2800 		}
2801 #endif
2802 
2803 		if (p.length == 0)
2804 		{
2805 			if (t->alerts().should_post<peer_error_alert>())
2806 			{
2807 				t->alerts().emplace_alert<peer_error_alert>(t->get_handle(), m_remote
2808 					, m_peer_id, operation_t::bittorrent, errors::peer_sent_empty_piece);
2809 			}
2810 			// This is used as a reject-request by bitcomet
2811 			incoming_reject_request(p);
2812 			return;
2813 		}
2814 
2815 		// if we're already seeding, don't bother,
2816 		// just ignore it
2817 		if (t->is_seed())
2818 		{
2819 #if TORRENT_USE_ASSERTS
2820 			TORRENT_ASSERT(m_received_in_piece == p.length);
2821 			m_received_in_piece = 0;
2822 #endif
2823 			if (!m_download_queue.empty())
2824 			{
2825 				m_download_queue.erase(m_download_queue.begin());
2826 				if (m_download_queue.empty())
2827 					m_counters.inc_stats_counter(counters::num_peers_down_requests, -1);
2828 			}
2829 			t->add_redundant_bytes(p.length, waste_reason::piece_seed);
2830 			return;
2831 		}
2832 
2833 		time_point const now = clock_type::now();
2834 
2835 		t->need_picker();
2836 
2837 		piece_picker& picker = t->picker();
2838 
2839 		piece_block block_finished(p.piece, p.start / t->block_size());
2840 		TORRENT_ASSERT(verify_piece(p));
2841 
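		// find this block among the requests we have outstanding to this peer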
2842 		auto const b = std::find_if(m_download_queue.begin()
2843 			, m_download_queue.end(), aux::has_block(block_finished));
2844 
2845 		if (b == m_download_queue.end())
2846 		{
2847 			if (t->alerts().should_post<unwanted_block_alert>())
2848 			{
2849 				t->alerts().emplace_alert<unwanted_block_alert>(t->get_handle()
2850 					, m_remote, m_peer_id, block_finished.block_index
2851 					, block_finished.piece_index);
2852 			}
2853 #ifndef TORRENT_DISABLE_LOGGING
2854 			peer_log(peer_log_alert::info, "INVALID_REQUEST", "The block we just got was not in the request queue");
2855 #endif
2856 #if TORRENT_USE_ASSERTS
2857 			TORRENT_ASSERT_VAL(m_received_in_piece == p.length, m_received_in_piece);
2858 			m_received_in_piece = 0;
2859 #endif
2860 			t->add_redundant_bytes(p.length, waste_reason::piece_unknown);
2861 
2862 			// the bytes of the piece we just completed have been deducted from
2863 			// m_outstanding_bytes as we received it, in incoming_piece_fragment.
2864 			// however, it now turns out the piece we received wasn't in the
2865 			// download queue, so we still have the same number of pieces in the
2866 			// download queue, which is why we need to add the bytes back.
2867 			m_outstanding_bytes += p.length;
2868 #if TORRENT_USE_INVARIANT_CHECKS
2869 			check_invariant();
2870 #endif
2871 			return;
2872 		}
2873 
2874 #if TORRENT_USE_ASSERTS
2875 		TORRENT_ASSERT_VAL(m_received_in_piece == p.length, m_received_in_piece);
2876 		m_received_in_piece = 0;
2877 #endif
2878 		// if the block we got is already finished, then ignore it
2879 		if (picker.is_downloaded(block_finished))
2880 		{
2881 			waste_reason const reason
2882 				= (b->timed_out) ? waste_reason::piece_timed_out
2883 				: (b->not_wanted) ? waste_reason::piece_cancelled
2884 				: (b->busy) ? waste_reason::piece_end_game
2885 				: waste_reason::piece_unknown;
2886 
2887 			t->add_redundant_bytes(p.length, reason);
2888 
2889 			m_download_queue.erase(b);
2890 			if (m_download_queue.empty())
2891 				m_counters.inc_stats_counter(counters::num_peers_down_requests, -1);
2892 
2893 			if (m_disconnecting) return;
2894 
2895 			m_request_time.add_sample(int(total_milliseconds(now - m_requested)));
2896 #ifndef TORRENT_DISABLE_LOGGING
2897 			if (should_log(peer_log_alert::info))
2898 			{
2899 				peer_log(peer_log_alert::info, "REQUEST_TIME", "%d +- %d ms"
2900 					, m_request_time.mean(), m_request_time.avg_deviation());
2901 			}
2902 #endif
2903 
2904 			// we completed an incoming block, and there are still outstanding
2905 			// requests. The next block we expect to receive now has another
2906 			// timeout period until we time out. So, reset the timer.
2907 			if (!m_download_queue.empty())
2908 				m_requested = now;
2909 
2910 			if (request_a_block(*t, *this))
2911 				m_counters.inc_stats_counter(counters::incoming_redundant_piece_picks);
2912 			send_block_requests();
2913 			return;
2914 		}
2915 
2916 		// we received a request within the timeout, make sure this peer is
2917 		// not snubbed anymore
2918 		if (total_seconds(now - m_requested) < request_timeout()
2919 			&& m_snubbed)
2920 		{
2921 			m_snubbed = false;
2922 			if (t->alerts().should_post<peer_unsnubbed_alert>())
2923 			{
2924 				t->alerts().emplace_alert<peer_unsnubbed_alert>(t->get_handle()
2925 					, m_remote, m_peer_id);
2926 			}
2927 		}
2928 
2929 #ifndef TORRENT_DISABLE_LOGGING
2930 		peer_log(peer_log_alert::info, "FILE_ASYNC_WRITE", "piece: %d s: %x l: %x"
2931 			, static_cast<int>(p.piece), p.start, p.length);
2932 #endif
2933 		m_download_queue.erase(b);
2934 		if (m_download_queue.empty())
2935 			m_counters.inc_stats_counter(counters::num_peers_down_requests, -1);
2936 
2937 		if (t->is_deleted()) return;
2938 
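		// hand the block to the disk thread for writing. on_disk_write_complete
		// is invoked once the write has been carried out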
2939 		auto conn = self();
2940 		bool const exceeded = m_disk_thread.async_write(t->storage(), p, data, self()
2941 			, [conn, p, t] (storage_error const& e)
2942 			{ conn->wrap(&peer_connection::on_disk_write_complete, e, p, t); });
2943 
2944 		// every peer is entitled to have two disk blocks allocated at any given
2945 		// time, regardless of whether the cache size is exceeded or not. If this
2946 		// was not the case, when the cache size setting is very small, most peers
2947 		// would be blocked most of the time, because the disk cache would
2948 		// continuously be in exceeded state. Only rarely would it actually drop
2949 		// down to 0 and unblock all peers.
2950 		if (exceeded && m_outstanding_writing_bytes > 0)
2951 		{
2952 			if (!(m_channel_state[download_channel] & peer_info::bw_disk))
2953 				m_counters.inc_stats_counter(counters::num_peers_down_disk);
2954 			m_channel_state[download_channel] |= peer_info::bw_disk;
2955 #ifndef TORRENT_DISABLE_LOGGING
2956 			peer_log(peer_log_alert::info, "DISK", "exceeded disk buffer watermark");
2957 #endif
2958 		}
2959 
2960 		std::int64_t const write_queue_size = m_counters.inc_stats_counter(
2961 			counters::queued_write_bytes, p.length);
2962 		m_outstanding_writing_bytes += p.length;
2963 
2964 		std::int64_t const max_queue_size = m_settings.get_int(
2965 			settings_pack::max_queued_disk_bytes);
2966 		if (write_queue_size > max_queue_size
2967 			&& write_queue_size - p.length < max_queue_size
2968 			&& m_settings.get_int(settings_pack::cache_size) > 5
2969 			&& t->alerts().should_post<performance_alert>())
2970 		{
2971 			t->alerts().emplace_alert<performance_alert>(t->get_handle()
2972 				, performance_alert::too_high_disk_queue_limit);
2973 		}
2974 
2975 		m_request_time.add_sample(int(total_milliseconds(now - m_requested)));
2976 #ifndef TORRENT_DISABLE_LOGGING
2977 		if (should_log(peer_log_alert::info))
2978 		{
2979 			peer_log(peer_log_alert::info, "REQUEST_TIME", "%d +- %d ms"
2980 				, m_request_time.mean(), m_request_time.avg_deviation());
2981 		}
2982 #endif
2983 
2984 		// we completed an incoming block, and there are still outstanding
2985 		// requests. The next block we expect to receive now has another
2986 		// timeout period until we time out. So, reset the timer.
2987 		if (!m_download_queue.empty())
2988 			m_requested = now;
2989 
2990 		bool const was_finished = picker.is_piece_finished(p.piece);
2991 		// did we request this block from any other peers?
2992 		bool const multi = picker.num_peers(block_finished) > 1;
2993 //		std::fprintf(stderr, "peer_connection mark_as_writing peer: %p piece: %d block: %d\n"
2994 //			, peer_info_struct(), block_finished.piece_index, block_finished.block_index);
2995 		picker.mark_as_writing(block_finished, peer_info_struct());
2996 
2997 		TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
2998 		// if we requested this block from other peers, cancel it now
2999 		if (multi) t->cancel_block(block_finished);
3000 
3001 #ifndef TORRENT_DISABLE_PREDICTIVE_PIECES
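		// predictive piece announce: if this piece looks like it will complete
		// within the configured number of milliseconds, announce it to other
		// peers ahead of verifying it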
3002 		if (m_settings.get_int(settings_pack::predictive_piece_announce))
3003 		{
3004 			piece_index_t const piece = block_finished.piece_index;
3005 			piece_picker::downloading_piece st;
3006 			t->picker().piece_info(piece, st);
3007 
3008 			int const num_blocks = t->picker().blocks_in_piece(piece);
3009 			if (st.requested > 0 && st.writing + st.finished + st.requested == num_blocks)
3010 			{
3011 				std::vector<torrent_peer*> d;
3012 				t->picker().get_downloaders(d, piece);
3013 				if (d.size() == 1)
3014 				{
3015 					// only make predictions if all remaining
3016 					// blocks are requested from the same peer
3017 					torrent_peer* peer = d[0];
3018 					if (peer->connection)
3019 					{
3020 						// we have a connection. now, what is the current
3021 						// download rate from this peer, and how many blocks
3022 						// do we have left to download?
3023 						std::int64_t const rate = peer->connection->statistics().download_payload_rate();
3024 						std::int64_t const bytes_left = std::int64_t(st.requested) * t->block_size();
3025 						// the settings unit is milliseconds, so calculate the
3026 						// number of milliseconds worth of bytes left in the piece
3027 						if (rate > 1000
3028 							&& (bytes_left * 1000) / rate < m_settings.get_int(settings_pack::predictive_piece_announce))
3029 						{
3030 							// we predict we will complete this piece very soon.
3031 							t->predicted_have_piece(piece, int((bytes_left * 1000) / rate));
3032 						}
3033 					}
3034 				}
3035 			}
3036 		}
3037 #endif // TORRENT_DISABLE_PREDICTIVE_PIECES
3038 
3039 		TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
3040 
3041 #if TORRENT_USE_INVARIANT_CHECKS \
3042 	&& defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
3043 		t->check_invariant();
3044 #endif
3045 
3046 #if TORRENT_USE_ASSERTS
3047 		piece_picker::downloading_piece pi;
3048 		picker.piece_info(p.piece, pi);
3049 		int num_blocks = picker.blocks_in_piece(p.piece);
3050 		TORRENT_ASSERT(pi.writing + pi.finished + pi.requested <= num_blocks);
3051 		TORRENT_ASSERT(picker.is_piece_finished(p.piece) == (pi.writing + pi.finished == num_blocks));
3052 #endif
3053 
3054 		// did we just finish the piece?
3055 		// this means all blocks are either written
3056 		// to disk or are in the disk write cache
3057 		if (picker.is_piece_finished(p.piece) && !was_finished)
3058 		{
3059 #if TORRENT_USE_INVARIANT_CHECKS
3060 			check_postcondition post_checker2_(t, false);
3061 #endif
3062 			t->verify_piece(p.piece);
3063 		}
3064 
3065 		check_graceful_pause();
3066 
3067 		if (is_disconnecting()) return;
3068 
3069 		if (request_a_block(*t, *this))
3070 			m_counters.inc_stats_counter(counters::incoming_piece_picks);
3071 		send_block_requests();
3072 	}
3073 
3074 	void peer_connection::check_graceful_pause()
3075 	{
3076 		// TODO: 3 instead of having to ask the torrent whether it's in graceful
3077 		// pause mode or not, the peers should keep that state (and the torrent
3078 		// should update them when it enters graceful pause). When a peer enters
3079 		// graceful pause mode, it should cancel all outstanding requests and
3080 		// clear its request queue.
3081 		std::shared_ptr<torrent> t = m_torrent.lock();
3082 		if (!t || !t->graceful_pause()) return;
3083 
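		// in graceful pause mode, wait until all outstanding piece data has
		// arrived before disconnecting this peer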
3084 		if (m_outstanding_bytes > 0) return;
3085 
3086 #ifndef TORRENT_DISABLE_LOGGING
3087 		peer_log(peer_log_alert::info, "GRACEFUL_PAUSE", "NO MORE DOWNLOAD");
3088 #endif
3089 		disconnect(errors::torrent_paused, operation_t::bittorrent);
3090 	}
3091 
3092 	void peer_connection::on_disk_write_complete(storage_error const& error
3093 		, peer_request const& p, std::shared_ptr<torrent> t)
3094 	{
3095 		TORRENT_ASSERT(is_single_thread());
3096 #ifndef TORRENT_DISABLE_LOGGING
3097 		if (should_log(peer_log_alert::info))
3098 		{
3099 			peer_log(peer_log_alert::info, "FILE_ASYNC_WRITE_COMPLETE", "piece: %d s: %x l: %x e: %s"
3100 				, static_cast<int>(p.piece), p.start, p.length, error.ec.message().c_str());
3101 		}
3102 #endif
3103 
3104 		m_counters.inc_stats_counter(counters::queued_write_bytes, -p.length);
3105 		m_outstanding_writing_bytes -= p.length;
3106 
3107 		TORRENT_ASSERT(m_outstanding_writing_bytes >= 0);
3108 
3109 		// every peer is entitled to allocate a disk buffer if it has no writes outstanding
3110 		// see the comment in incoming_piece
3111 		if (m_outstanding_writing_bytes == 0
3112 			&& m_channel_state[download_channel] & peer_info::bw_disk)
3113 		{
3114 			m_counters.inc_stats_counter(counters::num_peers_down_disk, -1);
3115 			m_channel_state[download_channel] &= ~peer_info::bw_disk;
3116 		}
3117 
3118 		INVARIANT_CHECK;
3119 
3120 		if (!t)
3121 		{
3122 			disconnect(error.ec, operation_t::file_write);
3123 			return;
3124 		}
3125 
3126 		// in case the number of outstanding disk-write bytes just dropped
3127 		// low enough for us to receive more data again
3128 		setup_receive();
3129 
3130 		piece_block const block_finished(p.piece, p.start / t->block_size());
3131 
3132 		if (error)
3133 		{
3134 			// we failed to write the piece to disk. tell the piece picker,
3135 			// which will block any other peer from issuing requests
3136 			// to this piece until we've cleared it.
3137 			if (error.ec == boost::asio::error::operation_aborted)
3138 			{
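				// the write was aborted (for instance because the torrent is
				// shutting down); just mark the block as canceled in the picker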
3139 				if (t->has_picker())
3140 					t->picker().mark_as_canceled(block_finished, nullptr);
3141 			}
3142 			else
3143 			{
3144 				// if any other peer has a busy request to this block, we need
3145 				// to cancel it too
3146 				t->cancel_block(block_finished);
3147 				if (t->has_picker())
3148 					t->picker().write_failed(block_finished);
3149 
3150 				if (t->has_storage())
3151 				{
3152 					// when this returns, all outstanding jobs to the
3153 					// piece are done, and we can restore it, allowing
3154 					// new requests to it
3155 					m_disk_thread.async_clear_piece(t->storage(), p.piece
3156 						, [t, block_finished] (piece_index_t pi)
3157 						{ t->wrap(&torrent::on_piece_fail_sync, pi, block_finished); });
3158 				}
3159 				else
3160 				{
3161 					// is m_abort true? if so, we should probably just
3162 					// exit this function early, no need to keep the picker
3163 					// state up-to-date, right?
3164 					t->on_piece_fail_sync(p.piece, block_finished);
3165 				}
3166 			}
3167 			t->update_gauge();
3168 			// handle_disk_error may disconnect us
3169 			t->handle_disk_error("write", error, this, torrent::disk_class::write);
3170 			return;
3171 		}
3172 
3173 		if (!t->has_picker()) return;
3174 
3175 		piece_picker& picker = t->picker();
3176 
3177 		TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
3178 
3179 //		std::fprintf(stderr, "peer_connection mark_as_finished peer: %p piece: %d block: %d\n"
3180 //			, peer_info_struct(), block_finished.piece_index, block_finished.block_index);
3181 		picker.mark_as_finished(block_finished, peer_info_struct());
3182 
3183 		t->maybe_done_flushing();
3184 
3185 		if (t->alerts().should_post<block_finished_alert>())
3186 		{
3187 			t->alerts().emplace_alert<block_finished_alert>(t->get_handle(),
3188 				remote(), pid(), block_finished.block_index
3189 				, block_finished.piece_index);
3190 		}
3191 
3192 		disconnect_if_redundant();
3193 
3194 		if (m_disconnecting) return;
3195 
3196 #if TORRENT_USE_ASSERTS
3197 		if (t->has_picker())
3198 		{
3199 			auto const& q = picker.get_download_queue();
3200 
3201 			for (auto const& dp : q)
3202 			{
3203 				if (dp.index != block_finished.piece_index) continue;
3204 				auto const info = picker.blocks_for_piece(dp);
3205 				TORRENT_ASSERT(info[block_finished.block_index].state
3206 					== piece_picker::block_info::state_finished);
3207 			}
3208 		}
3209 #endif
3210 		if (t->is_aborted()) return;
3211 	}
3212 
3213 	// -----------------------------
3214 	// ---------- CANCEL -----------
3215 	// -----------------------------
3216 
3217 	void peer_connection::incoming_cancel(peer_request const& r)
3218 	{
3219 		TORRENT_ASSERT(is_single_thread());
3220 		INVARIANT_CHECK;
3221 
3222 #ifndef TORRENT_DISABLE_EXTENSIONS
3223 		for (auto const& e : m_extensions)
3224 		{
3225 			if (e->on_cancel(r)) return;
3226 		}
3227 #endif
3228 		if (is_disconnecting()) return;
3229 
3230 #ifndef TORRENT_DISABLE_LOGGING
3231 		peer_log(peer_log_alert::incoming_message, "CANCEL"
3232 			, "piece: %d s: %x l: %x", static_cast<int>(r.piece), r.start, r.length);
3233 #endif
3234 
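		// look for the cancelled request among the incoming requests we have
		// queued but not yet serviced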
3235 		auto const i = std::find(m_requests.begin(), m_requests.end(), r);
3236 
3237 		if (i != m_requests.end())
3238 		{
3239 			m_counters.inc_stats_counter(counters::cancelled_piece_requests);
3240 			m_requests.erase(i);
3241 
3242 			if (m_requests.empty())
3243 				m_counters.inc_stats_counter(counters::num_peers_up_requests, -1);
3244 
3245 #ifndef TORRENT_DISABLE_LOGGING
3246 			peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE", "piece: %d s: %x l: %x cancelled"
3247 				, static_cast<int>(r.piece), r.start , r.length);
3248 #endif
3249 			write_reject_request(r);
3250 		}
3251 		else
3252 		{
3253 			// TODO: 2 since we throw away the queue entry once we issue
3254 			// the disk job, this may happen. Instead, we should keep the
3255 			// queue entry around, mark it as having been requested from
3256 			// disk and once the disk job comes back, discard it if it has
3257 			// been cancelled. Maybe even be able to cancel disk jobs?
3258 #ifndef TORRENT_DISABLE_LOGGING
3259 			peer_log(peer_log_alert::info, "INVALID_CANCEL", "got cancel not in the queue");
3260 #endif
3261 		}
3262 	}
3263 
3264 	// -----------------------------
3265 	// --------- DHT PORT ----------
3266 	// -----------------------------
3267 
3268 	void peer_connection::incoming_dht_port(int const listen_port)
3269 	{
3270 		TORRENT_ASSERT(is_single_thread());
3271 		INVARIANT_CHECK;
3272 
3273 #ifndef TORRENT_DISABLE_LOGGING
3274 		peer_log(peer_log_alert::incoming_message, "DHT_PORT", "p: %d", listen_port);
3275 #endif
3276 #ifndef TORRENT_DISABLE_DHT
3277 		m_ses.add_dht_node({m_remote.address(), std::uint16_t(listen_port)});
3278 #else
3279 		TORRENT_UNUSED(listen_port);
3280 #endif
3281 	}
3282 
3283 	// -----------------------------
3284 	// --------- HAVE ALL ----------
3285 	// -----------------------------
3286 
3287 	void peer_connection::incoming_have_all()
3288 	{
3289 		TORRENT_ASSERT(is_single_thread());
3290 		INVARIANT_CHECK;
3291 
3292 		std::shared_ptr<torrent> t = m_torrent.lock();
3293 		TORRENT_ASSERT(t);
3294 
3295 		// we cannot disconnect in a constructor, and
3296 		// this function may end up doing that
3297 		TORRENT_ASSERT(m_in_constructor == false);
3298 
3299 #ifndef TORRENT_DISABLE_LOGGING
3300 		peer_log(peer_log_alert::incoming_message, "HAVE_ALL");
3301 #endif
3302 
3303 #ifndef TORRENT_DISABLE_EXTENSIONS
3304 		for (auto const& e : m_extensions)
3305 		{
3306 			if (e->on_have_all()) return;
3307 		}
3308 #endif
3309 		if (is_disconnecting()) return;
3310 
3311 		if (m_bitfield_received)
3312 			t->peer_lost(m_have_piece, this);
3313 
3314 		m_have_all = true;
3315 
3316 #ifndef TORRENT_DISABLE_LOGGING
3317 		peer_log(peer_log_alert::info, "SEED", "this is a seed p: %p"
3318 			, static_cast<void*>(m_peer_info));
3319 #endif
3320 
3321 		t->set_seed(m_peer_info, true);
3322 		m_upload_only = true;
3323 		m_bitfield_received = true;
3324 
3325 		// if we don't have the metadata yet,
3326 		// just remember the bitmask and don't
3327 		// update the piece picker
3328 		// (since it doesn't exist yet)
3329 		if (!t->ready_for_connections())
3330 		{
3331 			// assume seeds are interesting when we
3332 			// don't even have the metadata
3333 			t->peer_is_interesting(*this);
3334 
3335 			disconnect_if_redundant();
3336 			return;
3337 		}
3338 
3339 		TORRENT_ASSERT(!m_have_piece.empty());
3340 		m_have_piece.set_all();
3341 		m_num_pieces = m_have_piece.size();
3342 
3343 		t->peer_has_all(this);
3344 
3345 #if TORRENT_USE_INVARIANT_CHECKS
3346 		if (t && t->has_picker())
3347 			t->picker().check_peer_invariant(m_have_piece, peer_info_struct());
3348 #endif
3349 
3350 		TORRENT_ASSERT(m_have_piece.all_set());
3351 		TORRENT_ASSERT(m_have_piece.count() == m_have_piece.size());
3352 		TORRENT_ASSERT(m_have_piece.size() == t->torrent_file().num_pieces());
3353 
3354 		// if we're finished, we're not interested
3355 		if (t->is_upload_only()) send_not_interested();
3356 		else t->peer_is_interesting(*this);
3357 
3358 		disconnect_if_redundant();
3359 	}
3360 
3361 	// -----------------------------
3362 	// --------- HAVE NONE ---------
3363 	// -----------------------------
3364 
3365 	void peer_connection::incoming_have_none()
3366 	{
3367 		TORRENT_ASSERT(is_single_thread());
3368 		INVARIANT_CHECK;
3369 
3370 #ifndef TORRENT_DISABLE_LOGGING
3371 		peer_log(peer_log_alert::incoming_message, "HAVE_NONE");
3372 #endif
3373 
3374 		std::shared_ptr<torrent> t = m_torrent.lock();
3375 		TORRENT_ASSERT(t);
3376 
3377 #ifndef TORRENT_DISABLE_EXTENSIONS
3378 		for (auto const& e : m_extensions)
3379 		{
3380 			if (e->on_have_none()) return;
3381 		}
3382 #endif
3383 		if (is_disconnecting()) return;
3384 
3385 		if (m_bitfield_received)
3386 			t->peer_lost(m_have_piece, this);
3387 
3388 		t->set_seed(m_peer_info, false);
3389 		m_bitfield_received = true;
3390 		m_have_all = false;
3391 
3392 		m_have_piece.clear_all();
3393 		m_num_pieces = 0;
3394 
3395 		TORRENT_ASSERT(!is_seed());
3396 
3397 		// if the peer is ready to download stuff, it must have metadata
3398 		m_has_metadata = true;
3399 
3400 		// we're never interested in a peer that doesn't have anything
3401 		send_not_interested();
3402 
3403 		TORRENT_ASSERT(!m_have_piece.empty() || !t->ready_for_connections());
3404 		disconnect_if_redundant();
3405 	}
3406 
3407 	// -----------------------------
3408 	// ------- ALLOWED FAST --------
3409 	// -----------------------------
3410 
3411 	void peer_connection::incoming_allowed_fast(piece_index_t const index)
3412 	{
3413 		TORRENT_ASSERT(is_single_thread());
3414 		INVARIANT_CHECK;
3415 
3416 		std::shared_ptr<torrent> t = m_torrent.lock();
3417 		TORRENT_ASSERT(t);
3418 
3419 #ifndef TORRENT_DISABLE_LOGGING
3420 		peer_log(peer_log_alert::incoming_message, "ALLOWED_FAST", "%d"
3421 			, static_cast<int>(index));
3422 #endif
3423 
3424 #ifndef TORRENT_DISABLE_EXTENSIONS
3425 		for (auto const& e : m_extensions)
3426 		{
3427 			if (e->on_allowed_fast(index)) return;
3428 		}
3429 #endif
3430 		if (is_disconnecting()) return;
3431 
3432 		if (index < piece_index_t(0))
3433 		{
3434 #ifndef TORRENT_DISABLE_LOGGING
3435 			peer_log(peer_log_alert::incoming_message, "INVALID_ALLOWED_FAST"
3436 				, "%d", static_cast<int>(index));
3437 #endif
3438 			return;
3439 		}
3440 
3441 		if (t->valid_metadata())
3442 		{
3443 			if (index >= m_have_piece.end_index())
3444 			{
3445 #ifndef TORRENT_DISABLE_LOGGING
3446 				peer_log(peer_log_alert::incoming_message, "INVALID_ALLOWED_FAST"
3447 					, "%d s: %d", static_cast<int>(index), m_have_piece.size());
3448 #endif
3449 				return;
3450 			}
3451 
3452 			// if we already have the piece, we can
3453 			// ignore this message
3454 			if (t->have_piece(index))
3455 				return;
3456 		}
3457 
3458 		// if we don't have the metadata, we'll verify
3459 		// this piece index later
3460 		m_allowed_fast.push_back(index);
3461 
3462 		// if the peer has the piece and we want
3463 		// to download it, request it
3464 		if (index < m_have_piece.end_index()
3465 			&& m_have_piece[index]
3466 			&& !t->has_piece_passed(index)
3467 			&& t->valid_metadata()
3468 			&& t->has_picker()
3469 			&& t->picker().piece_priority(index) > dont_download)
3470 		{
3471 			t->peer_is_interesting(*this);
3472 		}
3473 	}
3474 
3475 	std::vector<piece_index_t> const& peer_connection::allowed_fast()
3476 	{
3477 		TORRENT_ASSERT(is_single_thread());
3478 		std::shared_ptr<torrent> t = m_torrent.lock();
3479 		TORRENT_ASSERT(t);
3480 
3481 		// TODO: sort the allowed fast set in priority order
3482 		return m_allowed_fast;
3483 	}
3484 
3485 	bool peer_connection::can_request_time_critical() const
3486 	{
3487 		TORRENT_ASSERT(is_single_thread());
3488 		if (has_peer_choked() || !is_interesting()) return false;
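		// as a heuristic, don't let time-critical picking grow the outstanding
		// queue beyond twice the desired queue size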
3489 		if (int(m_download_queue.size()) + int(m_request_queue.size())
3490 			> m_desired_queue_size * 2) return false;
3491 		if (on_parole()) return false;
3492 		if (m_disconnecting) return false;
3493 		std::shared_ptr<torrent> t = m_torrent.lock();
3494 		TORRENT_ASSERT(t);
3495 		if (t->upload_mode()) return false;
3496 
3497 		// ignore snubbed peers, since they're not likely to return pieces in a
3498 		// timely manner anyway
3499 		if (m_snubbed) return false;
3500 		return true;
3501 	}
3502 
3503 	bool peer_connection::make_time_critical(piece_block const& block)
3504 	{
3505 		TORRENT_ASSERT(is_single_thread());
3506 		auto const rit = std::find_if(m_request_queue.begin()
3507 			, m_request_queue.end(), aux::has_block(block));
3508 		if (rit == m_request_queue.end()) return false;
3509 #if TORRENT_USE_ASSERTS
3510 		std::shared_ptr<torrent> t = m_torrent.lock();
3511 		TORRENT_ASSERT(t);
3512 		TORRENT_ASSERT(t->has_picker());
3513 		TORRENT_ASSERT(t->picker().is_requested(block));
3514 #endif
3515 		// ignore it if it's already time critical
3516 		if (rit - m_request_queue.begin() < m_queued_time_critical) return false;
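		// move the block into the time-critical region at the front of the
		// request queue and grow that region by one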
3517 		pending_block b = *rit;
3518 		m_request_queue.erase(rit);
3519 		m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical, b);
3520 		++m_queued_time_critical;
3521 		return true;
3522 	}
3523 
3524 	bool peer_connection::add_request(piece_block const& block
3525 		, request_flags_t const flags)
3526 	{
3527 		TORRENT_ASSERT(is_single_thread());
3528 		INVARIANT_CHECK;
3529 
3530 		std::shared_ptr<torrent> t = m_torrent.lock();
3531 		TORRENT_ASSERT(t);
3532 
3533 		TORRENT_ASSERT(!m_disconnecting);
3534 		TORRENT_ASSERT(t->valid_metadata());
3535 
3536 		TORRENT_ASSERT(block.block_index != piece_block::invalid.block_index);
3537 		TORRENT_ASSERT(block.piece_index != piece_block::invalid.piece_index);
3538 		TORRENT_ASSERT(block.piece_index < t->torrent_file().end_piece());
3539 		TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
3540 		TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0));
3541 		TORRENT_ASSERT(!t->have_piece(block.piece_index));
3542 		TORRENT_ASSERT(std::find_if(m_download_queue.begin(), m_download_queue.end()
3543 			, aux::has_block(block)) == m_download_queue.end());
3544 		TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end()
3545 			, block) == m_request_queue.end());
3546 
3547 		if (t->upload_mode())
3548 		{
3549 #ifndef TORRENT_DISABLE_LOGGING
3550 			peer_log(peer_log_alert::info, "PIECE_PICKER"
3551 				, "not_picking: %d,%d upload_mode"
3552 				, static_cast<int>(block.piece_index), block.block_index);
3553 #endif
3554 			return false;
3555 		}
3556 		if (m_disconnecting)
3557 		{
3558 #ifndef TORRENT_DISABLE_LOGGING
3559 			peer_log(peer_log_alert::info, "PIECE_PICKER"
3560 				, "not_picking: %d,%d disconnecting"
3561 				, static_cast<int>(block.piece_index), block.block_index);
3562 #endif
3563 			return false;
3564 		}
3565 
3566 		if ((flags & busy) && !(flags & time_critical))
3567 		{
3568 			// this block is busy (i.e. it has been requested
3569 			// from another peer already). Only allow one busy
3570 			// request in the pipeline at a time.
3571 			// this rule does not apply to time critical pieces,
3572 			// in which case we are allowed to pick more than one
3573 			// busy block
3574 			if (std::any_of(m_download_queue.begin(), m_download_queue.end()
3575 				, [](pending_block const& i) { return i.busy; }))
3576 			{
3577 #ifndef TORRENT_DISABLE_LOGGING
3578 				peer_log(peer_log_alert::info, "PIECE_PICKER"
3579 					, "not_picking: %d,%d already in download queue & busy"
3580 					, static_cast<int>(block.piece_index), block.block_index);
3581 #endif
3582 				return false;
3583 			}
3584 
3585 			if (std::any_of(m_request_queue.begin(), m_request_queue.end()
3586 				, [](pending_block const& i) { return i.busy; }))
3587 			{
3588 #ifndef TORRENT_DISABLE_LOGGING
3589 				peer_log(peer_log_alert::info, "PIECE_PICKER"
3590 					, "not_picking: %d,%d already in request queue & busy"
3591 					, static_cast<int>(block.piece_index), block.block_index);
3592 #endif
3593 				return false;
3594 			}
3595 		}
3596 
3597 		if (!t->picker().mark_as_downloading(block, peer_info_struct()
3598 			, picker_options()))
3599 		{
3600 #ifndef TORRENT_DISABLE_LOGGING
3601 			peer_log(peer_log_alert::info, "PIECE_PICKER"
3602 				, "not_picking: %d,%d failed to mark_as_downloading"
3603 				, static_cast<int>(block.piece_index), block.block_index);
3604 #endif
3605 			return false;
3606 		}
3607 
3608 		if (t->alerts().should_post<block_downloading_alert>())
3609 		{
3610 			t->alerts().emplace_alert<block_downloading_alert>(t->get_handle()
3611 				, remote(), pid(), block.block_index, block.piece_index);
3612 		}
3613 
3614 		pending_block pb(block);
3615 		pb.busy = (flags & busy) ? true : false;
3616 		if (flags & time_critical)
3617 		{
3618 			m_request_queue.insert(m_request_queue.begin() + m_queued_time_critical
3619 				, pb);
3620 			++m_queued_time_critical;
3621 		}
3622 		else
3623 		{
3624 			m_request_queue.push_back(pb);
3625 		}
3626 		return true;
3627 	}
3628 
3629 	void peer_connection::cancel_all_requests()
3630 	{
3631 		TORRENT_ASSERT(is_single_thread());
3632 		INVARIANT_CHECK;
3633 
3634 		std::shared_ptr<torrent> t = m_torrent.lock();
3635 		// this peer might be disconnecting
3636 		if (!t) return;
3637 
3638 		TORRENT_ASSERT(t->valid_metadata());
3639 
3640 #ifndef TORRENT_DISABLE_LOGGING
3641 		peer_log(peer_log_alert::info, "CANCEL_ALL_REQUESTS");
3642 #endif
3643 
3644 		while (!m_request_queue.empty())
3645 		{
3646 			t->picker().abort_download(m_request_queue.back().block, peer_info_struct());
3647 			m_request_queue.pop_back();
3648 		}
3649 		m_queued_time_critical = 0;
3650 
3651 		// make a temporary copy of the download queue, since it
3652 		// may be modified when we call write_cancel (for peers
3653 		// that don't support the FAST extension).
3654 		std::vector<pending_block> temp_copy = m_download_queue;
3655 
3656 		for (auto const& pb : temp_copy)
3657 		{
3658 			piece_block const b = pb.block;
3659 
3660 			int const block_offset = b.block_index * t->block_size();
3661 			int const block_size
3662 				= std::min(t->torrent_file().piece_size(b.piece_index)-block_offset,
3663 					t->block_size());
3664 			TORRENT_ASSERT(block_size > 0);
3665 			TORRENT_ASSERT(block_size <= t->block_size());
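			// illustrative example (numbers not from any particular torrent):
			// with 16 kiB blocks and a 260000 byte final piece, block 15
			// starts at offset 245760 and the request is only 14240 bytes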
3666 
3667 			// we can't cancel the piece if we've started receiving it
3668 			if (m_receiving_block == b) continue;
3669 
3670 			peer_request r;
3671 			r.piece = b.piece_index;
3672 			r.start = block_offset;
3673 			r.length = block_size;
3674 
3675 #ifndef TORRENT_DISABLE_LOGGING
3676 			peer_log(peer_log_alert::outgoing_message, "CANCEL"
3677 				, "piece: %d s: %d l: %d b: %d"
3678 				, static_cast<int>(b.piece_index), block_offset, block_size, b.block_index);
3679 #endif
3680 			write_cancel(r);
3681 		}
3682 	}
3683 
3684 	void peer_connection::cancel_request(piece_block const& block, bool const force)
3685 	{
3686 		TORRENT_ASSERT(is_single_thread());
3687 		INVARIANT_CHECK;
3688 
3689 		std::shared_ptr<torrent> t = m_torrent.lock();
3690 		// this peer might be disconnecting
3691 		if (!t) return;
3692 
3693 		TORRENT_ASSERT(t->valid_metadata());
3694 
3695 		TORRENT_ASSERT(block.block_index != piece_block::invalid.block_index);
3696 		TORRENT_ASSERT(block.piece_index != piece_block::invalid.piece_index);
3697 		TORRENT_ASSERT(block.piece_index < t->torrent_file().end_piece());
3698 		TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
3699 
3700 		// if this block has already been cancelled from all the
3701 		// peers that requested it, then just ignore the cancel.
3702 		if (!t->picker().is_requested(block)) return;
3703 
3704 		auto const it = std::find_if(m_download_queue.begin(), m_download_queue.end()
3705 			, aux::has_block(block));
3706 		if (it == m_download_queue.end())
3707 		{
3708 			auto const rit = std::find_if(m_request_queue.begin()
3709 				, m_request_queue.end(), aux::has_block(block));
3710 
3711 			// when a block requested from multiple peers is received,
3712 			// it is cancelled from all of them. If this peer doesn't
3713 			// have the block queued at all, just ignore the cancel.
3714 			if (rit == m_request_queue.end()) return;
3715 
3716 			if (rit - m_request_queue.begin() < m_queued_time_critical)
3717 				--m_queued_time_critical;
3718 
3719 			t->picker().abort_download(block, peer_info_struct());
3720 			m_request_queue.erase(rit);
3721 			// since we found it in the request queue, it means it hasn't been
3722 			// sent yet, so we don't have to send a cancel.
3723 			return;
3724 		}
3725 
3726 		int const block_offset = block.block_index * t->block_size();
3727 		int const block_size
3728 			= std::min(t->torrent_file().piece_size(block.piece_index) - block_offset,
3729 			t->block_size());
3730 		TORRENT_ASSERT(block_size > 0);
3731 		TORRENT_ASSERT(block_size <= t->block_size());
3732 
3733 		it->not_wanted = true;
3734 
3735 		if (force) t->picker().abort_download(block, peer_info_struct());
3736 
3737 		if (m_outstanding_bytes < block_size) return;
3738 
3739 		peer_request r;
3740 		r.piece = block.piece_index;
3741 		r.start = block_offset;
3742 		r.length = block_size;
3743 
3744 #ifndef TORRENT_DISABLE_LOGGING
3745 			peer_log(peer_log_alert::outgoing_message, "CANCEL"
3746 				, "piece: %d s: %d l: %d b: %d"
3747 				, static_cast<int>(block.piece_index), block_offset, block_size, block.block_index);
3748 #endif
3749 		write_cancel(r);
3750 	}
3751 
3752 	bool peer_connection::send_choke()
3753 	{
3754 		TORRENT_ASSERT(is_single_thread());
3755 		INVARIANT_CHECK;
3756 
3757 		TORRENT_ASSERT(!is_connecting());
3758 
3759 		if (m_choked)
3760 		{
3761 			TORRENT_ASSERT(m_peer_info == nullptr
3762 				|| m_peer_info->optimistically_unchoked == false);
3763 			return false;
3764 		}
3765 
3766 		if (m_peer_info && m_peer_info->optimistically_unchoked)
3767 		{
3768 			m_peer_info->optimistically_unchoked = false;
3769 			m_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
3770 		}
3771 
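		// drop any queued piece suggestions; if suggest_read_cache mode is
		// enabled, a fresh set is sent right before the next unchoke
		// (see send_unchoke())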
3772 		m_suggest_pieces.clear();
3773 		m_suggest_pieces.shrink_to_fit();
3774 
3775 #ifndef TORRENT_DISABLE_LOGGING
3776 		peer_log(peer_log_alert::outgoing_message, "CHOKE");
3777 #endif
3778 		write_choke();
3779 		m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1);
3780 		if (!ignore_unchoke_slots())
3781 			m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1);
3782 		m_choked = true;
3783 
3784 		m_last_choke = aux::time_now();
3785 		m_num_invalid_requests = 0;
3786 
3787 		// reject the requests we have in the queue
3788 		// except the allowed fast pieces
3789 		for (auto i = m_requests.begin(); i != m_requests.end();)
3790 		{
3791 			if (std::find(m_accept_fast.begin(), m_accept_fast.end(), i->piece)
3792 				!= m_accept_fast.end())
3793 			{
3794 				++i;
3795 				continue;
3796 			}
3797 			peer_request const& r = *i;
3798 			m_counters.inc_stats_counter(counters::choked_piece_requests);
3799 #ifndef TORRENT_DISABLE_LOGGING
3800 			peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE"
3801 				, "piece: %d s: %d l: %d choking"
3802 				, static_cast<int>(r.piece), r.start , r.length);
3803 #endif
3804 			write_reject_request(r);
3805 			i = m_requests.erase(i);
3806 
3807 			if (m_requests.empty())
3808 				m_counters.inc_stats_counter(counters::num_peers_up_requests, -1);
3809 		}
3810 		return true;
3811 	}
3812 
3813 	bool peer_connection::send_unchoke()
3814 	{
3815 		TORRENT_ASSERT(is_single_thread());
3816 		INVARIANT_CHECK;
3817 
3818 		if (!m_choked) return false;
3819 		std::shared_ptr<torrent> t = m_torrent.lock();
3820 		if (!t->ready_for_connections()) return false;
3821 
3822 		if (m_settings.get_int(settings_pack::suggest_mode)
3823 			== settings_pack::suggest_read_cache)
3824 		{
3825 			// immediately before unchoking this peer, we should send some
3826 			// suggested pieces for it to request
3827 			send_piece_suggestions(2);
3828 		}
3829 
3830 		m_last_unchoke = aux::time_now();
3831 		write_unchoke();
3832 		m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all);
3833 		if (!ignore_unchoke_slots())
3834 			m_counters.inc_stats_counter(counters::num_peers_up_unchoked);
3835 		m_choked = false;
3836 
3837 		m_uploaded_at_last_unchoke = m_statistics.total_payload_upload();
3838 
3839 #ifndef TORRENT_DISABLE_LOGGING
3840 		peer_log(peer_log_alert::outgoing_message, "UNCHOKE");
3841 #endif
3842 		return true;
3843 	}
3844 
3845 	void peer_connection::send_interested()
3846 	{
3847 		TORRENT_ASSERT(is_single_thread());
3848 		if (m_interesting) return;
3849 		std::shared_ptr<torrent> t = m_torrent.lock();
3850 		if (!t->ready_for_connections()) return;
3851 		if (!m_interesting)
3852 		{
3853 			m_interesting = true;
3854 			m_counters.inc_stats_counter(counters::num_peers_down_interested);
3855 		}
3856 		write_interested();
3857 
3858 #ifndef TORRENT_DISABLE_LOGGING
3859 		peer_log(peer_log_alert::outgoing_message, "INTERESTED");
3860 #endif
3861 	}
3862 
3863 	void peer_connection::send_not_interested()
3864 	{
3865 		TORRENT_ASSERT(is_single_thread());
3866 		// we cannot disconnect in a constructor, and
3867 		// this function may end up doing that
3868 		TORRENT_ASSERT(m_in_constructor == false);
3869 
3870 		if (!m_interesting)
3871 		{
3872 			disconnect_if_redundant();
3873 			return;
3874 		}
3875 
3876 		std::shared_ptr<torrent> t = m_torrent.lock();
3877 		if (!t->ready_for_connections()) return;
3878 		if (m_interesting)
3879 		{
3880 			m_interesting = false;
3881 			m_became_uninteresting = aux::time_now();
3882 			m_counters.inc_stats_counter(counters::num_peers_down_interested, -1);
3883 		}
3884 
3885 		m_slow_start = false;
3886 
3887 		disconnect_if_redundant();
3888 		if (m_disconnecting) return;
3889 
3890 		write_not_interested();
3891 
3892 #ifndef TORRENT_DISABLE_LOGGING
3893 		if (should_log(peer_log_alert::outgoing_message))
3894 		{
3895 			peer_log(peer_log_alert::outgoing_message, "NOT_INTERESTED");
3896 		}
3897 #endif
3898 	}
3899 
3900 	void peer_connection::send_upload_only(bool const enabled)
3901 	{
3902 		TORRENT_ASSERT(is_single_thread());
3903 		if (m_connecting || in_handshake()) return;
3904 
3905 #ifndef TORRENT_DISABLE_LOGGING
3906 		if (should_log(peer_log_alert::outgoing_message))
3907 		{
3908 			peer_log(peer_log_alert::outgoing_message, "UPLOAD_ONLY", "%d"
3909 				, int(enabled));
3910 		}
3911 #endif
3912 
3913 		write_upload_only(enabled);
3914 	}
3915 
3916 	void peer_connection::send_piece_suggestions(int const num)
3917 	{
3918 		std::shared_ptr<torrent> t = m_torrent.lock();
3919 		TORRENT_ASSERT(t);
3920 
3921 		int const new_suggestions = t->get_suggest_pieces(m_suggest_pieces
3922 			, m_have_piece, num);
3923 
3924 		// higher priority pieces are farther back in the vector. The last
3925 		// suggested piece the peer receives is the highest priority, so send
3926 		// the highest priority piece last.
3927 		for (auto i = m_suggest_pieces.end() - new_suggestions;
3928 			i != m_suggest_pieces.end(); ++i)
3929 		{
3930 			send_suggest(*i);
3931 		}
3932 		int const max = m_settings.get_int(settings_pack::max_suggest_pieces);
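		// trim the suggest set down to the configured maximum by dropping the
		// oldest (lowest priority) entries from the front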
3933 		if (m_suggest_pieces.end_index() > max)
3934 		{
3935 			int const to_erase = m_suggest_pieces.end_index() - max;
3936 			m_suggest_pieces.erase(m_suggest_pieces.begin()
3937 				, m_suggest_pieces.begin() + to_erase);
3938 		}
3939 	}
3940 
3941 	void peer_connection::send_suggest(piece_index_t const piece)
3942 	{
3943 		TORRENT_ASSERT(is_single_thread());
3944 		if (m_connecting || in_handshake()) return;
3945 
3946 		// don't suggest a piece that the peer already has
3947 		if (has_piece(piece)) return;
3948 
3949 		// we cannot suggest a piece we don't have!
3950 #if TORRENT_USE_ASSERTS
3951 		{
3952 			std::shared_ptr<torrent> t = m_torrent.lock();
3953 			TORRENT_ASSERT(t);
3954 			TORRENT_ASSERT(t->has_piece_passed(piece));
3955 			TORRENT_ASSERT(piece < t->torrent_file().end_piece());
3956 		}
3957 #endif
3958 
3959 		write_suggest(piece);
3960 	}
3961 
3962 	void peer_connection::send_block_requests()
3963 	{
3964 		TORRENT_ASSERT(is_single_thread());
3965 		INVARIANT_CHECK;
3966 
3967 		std::shared_ptr<torrent> t = m_torrent.lock();
3968 		TORRENT_ASSERT(t);
3969 
3970 		if (m_disconnecting) return;
3971 
3972 		// TODO: 3 once peers are properly put in graceful pause mode, they can
3973 		// cancel all outstanding requests and this test can be removed.
3974 		if (t->graceful_pause()) return;
3975 
3976 		// we can't download pieces in these states
3977 		if (t->state() == torrent_status::checking_files
3978 			|| t->state() == torrent_status::checking_resume_data
3979 			|| t->state() == torrent_status::downloading_metadata)
3980 			return;
3981 
3982 		if (int(m_download_queue.size()) >= m_desired_queue_size
3983 			|| t->upload_mode()) return;
3984 
3985 		bool const empty_download_queue = m_download_queue.empty();
3986 
3987 		while (!m_request_queue.empty()
3988 			&& (int(m_download_queue.size()) < m_desired_queue_size
3989 				|| m_queued_time_critical > 0))
3990 		{
3991 			pending_block block = m_request_queue.front();
3992 
3993 			m_request_queue.erase(m_request_queue.begin());
3994 			if (m_queued_time_critical) --m_queued_time_critical;
3995 
3996 			// if we're a seed, we don't have a piece picker
3997 			// so we don't have to worry about invariants getting
3998 			// out of sync with it
3999 			if (!t->has_picker()) continue;
4000 
4001 			// this can happen if a block times out, is re-requested and
4002 			// then arrives "unexpectedly"
4003 			if (t->picker().is_downloaded(block.block))
4004 			{
4005 				t->picker().abort_download(block.block, peer_info_struct());
4006 				continue;
4007 			}
4008 
4009 			int block_offset = block.block.block_index * t->block_size();
4010 			int bs = std::min(t->torrent_file().piece_size(
4011 				block.block.piece_index) - block_offset, t->block_size());
4012 			TORRENT_ASSERT(bs > 0);
4013 			TORRENT_ASSERT(bs <= t->block_size());
4014 
4015 			peer_request r;
4016 			r.piece = block.block.piece_index;
4017 			r.start = block_offset;
4018 			r.length = bs;
4019 
4020 			if (m_download_queue.empty())
4021 				m_counters.inc_stats_counter(counters::num_peers_down_requests);
4022 
4023 			TORRENT_ASSERT(verify_piece(t->to_req(block.block)));
4024 			block.send_buffer_offset = aux::numeric_cast<std::uint32_t>(m_send_buffer.size());
4025 			m_download_queue.push_back(block);
4026 			m_outstanding_bytes += bs;
4027 #if TORRENT_USE_INVARIANT_CHECKS
4028 			check_invariant();
4029 #endif
4030 
4031 			// if we are requesting large blocks, merge the smaller
4032 			// blocks that are in the same piece into larger requests
4033 			if (m_request_large_blocks)
4034 			{
4035 				int const blocks_per_piece = t->torrent_file().piece_length() / t->block_size();
4036 
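				// adjacency is tested on the flattened block index
				// (piece * blocks_per_piece + block). As an illustration, with
				// 16 blocks per piece, piece 3 block 3 is index 51 and piece 3
				// block 4 is index 52, so those two requests can be merged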
4037 				while (!m_request_queue.empty())
4038 				{
4039 					// check whether this block is connected to the previous one.
4040 					// if it is, merge them; otherwise, break out of this merge loop
4041 					pending_block const& front = m_request_queue.front();
4042 					if (static_cast<int>(front.block.piece_index) * blocks_per_piece + front.block.block_index
4043 						!= static_cast<int>(block.block.piece_index) * blocks_per_piece + block.block.block_index + 1)
4044 						break;
4045 					block = m_request_queue.front();
4046 					m_request_queue.erase(m_request_queue.begin());
4047 					TORRENT_ASSERT(verify_piece(t->to_req(block.block)));
4048 
4049 					if (m_download_queue.empty())
4050 						m_counters.inc_stats_counter(counters::num_peers_down_requests);
4051 
4052 					block.send_buffer_offset = aux::numeric_cast<std::uint32_t>(m_send_buffer.size());
4053 					m_download_queue.push_back(block);
4054 					if (m_queued_time_critical) --m_queued_time_critical;
4055 
4056 					block_offset = block.block.block_index * t->block_size();
4057 					bs = std::min(t->torrent_file().piece_size(
4058 						block.block.piece_index) - block_offset, t->block_size());
4059 					TORRENT_ASSERT(bs > 0);
4060 					TORRENT_ASSERT(bs <= t->block_size());
4061 
4062 					r.length += bs;
4063 					m_outstanding_bytes += bs;
4064 #if TORRENT_USE_INVARIANT_CHECKS
4065 					check_invariant();
4066 #endif
4067 				}
4068 
4069 #ifndef TORRENT_DISABLE_LOGGING
4070 				peer_log(peer_log_alert::info, "MERGING_REQUESTS"
4071 					, "piece: %d start: %d length: %d", static_cast<int>(r.piece)
4072 					, r.start, r.length);
4073 #endif
4074 
4075 			}
4076 
4077 			// the verification will fail for coalesced blocks
4078 			TORRENT_ASSERT(verify_piece(r) || m_request_large_blocks);
4079 
4080 #ifndef TORRENT_DISABLE_EXTENSIONS
4081 			bool handled = false;
4082 			for (auto const& e : m_extensions)
4083 			{
4084 				handled = e->write_request(r);
4085 				if (handled) break;
4086 			}
4087 			if (is_disconnecting()) return;
4088 			if (!handled)
4089 #endif
4090 			{
4091 				write_request(r);
4092 				m_last_request = aux::time_now();
4093 			}
4094 
4095 #ifndef TORRENT_DISABLE_LOGGING
4096 			if (should_log(peer_log_alert::outgoing_message))
4097 			{
4098 				peer_log(peer_log_alert::outgoing_message, "REQUEST"
4099 					, "piece: %d s: %x l: %x ds: %dB/s dqs: %d rqs: %d blk: %s"
4100 					, static_cast<int>(r.piece), r.start, r.length, statistics().download_rate()
4101 					, int(m_desired_queue_size), int(m_download_queue.size())
4102 					, m_request_large_blocks?"large":"single");
4103 			}
4104 #endif
4105 		}
4106 		m_last_piece = aux::time_now();
4107 
4108 		if (!m_download_queue.empty()
4109 			&& empty_download_queue)
4110 		{
4111 			// This means we just added a request to this connection that
4112 			// previously did not have a request. That's when we start the
4113 			// request timeout.
4114 			m_requested = aux::time_now();
4115 		}
4116 	}
4117 
4118 	void peer_connection::connect_failed(error_code const& e)
4119 	{
4120 		TORRENT_ASSERT(is_single_thread());
4121 		TORRENT_ASSERT(e);
4122 
4123 #ifndef TORRENT_DISABLE_LOGGING
4124 		if (should_log(peer_log_alert::info))
4125 		{
4126 			peer_log(peer_log_alert::info, "CONNECTION FAILED"
4127 				, "%s %s", print_endpoint(m_remote).c_str(), print_error(e).c_str());
4128 		}
4129 #endif
4130 #ifndef TORRENT_DISABLE_LOGGING
4131 		if (m_ses.should_log())
4132 			m_ses.session_log("CONNECTION FAILED: %s", print_endpoint(m_remote).c_str());
4133 #endif
4134 
4135 		m_counters.inc_stats_counter(counters::connect_timeouts);
4136 
4137 		std::shared_ptr<torrent> t = m_torrent.lock();
4138 		TORRENT_ASSERT(!m_connecting || t);
4139 		if (m_connecting)
4140 		{
4141 			m_counters.inc_stats_counter(counters::num_peers_half_open, -1);
4142 			if (t && m_peer_info) t->dec_num_connecting(m_peer_info);
4143 			m_connecting = false;
4144 		}
4145 
4146 		// a connection attempt using uTP just failed;
4147 		// mark this peer as not supporting uTP.
4148 		// we'll never try it again (unless we're in holepunch mode)
4149 		if (is_utp(*m_socket)
4150 			&& m_peer_info
4151 			&& m_peer_info->supports_utp
4152 			&& !m_holepunch_mode)
4153 		{
4154 			m_peer_info->supports_utp = false;
4155 			// reconnect immediately using TCP
4156 			fast_reconnect(true);
4157 			disconnect(e, operation_t::connect, normal);
4158 			if (t && m_peer_info)
4159 			{
4160 				std::weak_ptr<torrent> weak_t = t;
4161 				std::weak_ptr<peer_connection> weak_self = shared_from_this();
4162 
4163 				// we can't touch m_connections here, since we're likely looping
4164 				// over it. So defer the actual reconnection to after we've handled
4165 				// the existing message queue
4166 				m_ses.get_io_service().post([weak_t, weak_self]()
4167 				{
4168 					std::shared_ptr<torrent> tor = weak_t.lock();
4169 					std::shared_ptr<peer_connection> p = weak_self.lock();
4170 					if (tor && p)
4171 					{
4172 						torrent_peer* pi = p->peer_info_struct();
4173 						tor->connect_to_peer(pi, true);
4174 					}
4175 				});
4176 			}
4177 			return;
4178 		}
4179 
4180 		if (m_holepunch_mode)
4181 			fast_reconnect(true);
4182 
4183 #ifndef TORRENT_DISABLE_EXTENSIONS
4184 		if ((!is_utp(*m_socket)
4185 				|| !m_settings.get_bool(settings_pack::enable_outgoing_tcp))
4186 			&& m_peer_info
4187 			&& m_peer_info->supports_holepunch
4188 			&& !m_holepunch_mode)
4189 		{
4190 			// see if we can try a holepunch
4191 			bt_peer_connection* p = t->find_introducer(remote());
4192 			if (p)
4193 				p->write_holepunch_msg(bt_peer_connection::hp_message::rendezvous, remote());
4194 		}
4195 #endif
4196 
4197 		disconnect(e, operation_t::connect, failure);
4198 	}
4199 
4200 	// the error argument indicates the severity: "normal" means a deliberate
4201 	// disconnect, "failure" an unexpected disconnect or error, and
4202 	// "peer_error" a protocol error (the peer sent something invalid)
4203 	void peer_connection::disconnect(error_code const& ec
4204 		, operation_t const op, disconnect_severity_t const error)
4205 	{
4206 		TORRENT_ASSERT(is_single_thread());
4207 #if TORRENT_USE_ASSERTS
4208 		m_disconnect_started = true;
4209 #endif
4210 
4211 		if (m_disconnecting) return;
4212 
4213 		m_socket->set_close_reason(error_to_close_reason(ec));
4214 		close_reason_t const close_reason = m_socket->get_close_reason();
4215 #ifndef TORRENT_DISABLE_LOGGING
4216 		if (close_reason != close_reason_t::none)
4217 		{
4218 			peer_log(peer_log_alert::info, "CLOSE_REASON", "%d", int(close_reason));
4219 		}
4220 #endif
4221 
4222 		// while being disconnected, it's possible that our torrent_peer
4223 		// pointer gets cleared. Make sure we save it to be able to keep
4224 		// proper books in the piece_picker (when debugging is enabled)
4225 		torrent_peer* self_peer = peer_info_struct();
4226 
4227 #ifndef TORRENT_DISABLE_LOGGING
4228 		if (should_log(peer_log_alert::info)) try
4229 		{
4230 			static aux::array<char const*, 3, disconnect_severity_t> const str{{{
4231 				"CONNECTION_CLOSED", "CONNECTION_FAILED", "PEER_ERROR"}}};
4232 			peer_log(peer_log_alert::info, str[error], "op: %d %s"
4233 				, static_cast<int>(op), print_error(ec).c_str());
4234 
4235 			if (ec == boost::asio::error::eof
4236 				&& !in_handshake()
4237 				&& !is_connecting()
4238 				&& aux::time_now() - connected_time() < seconds(15))
4239 			{
4240 				peer_log(peer_log_alert::info, "SHORT_LIVED_DISCONNECT", "");
4241 			}
4242 		}
4243 		catch (std::exception const& err)
4244 		{
4245 			peer_log(peer_log_alert::info, "PEER_ERROR" ,"op: %d ERROR: unknown error (failed with exception) %s"
4246 				, static_cast<int>(op), err.what());
4247 		}
4248 #endif
4249 
4250 		if (!(m_channel_state[upload_channel] & peer_info::bw_network))
4251 		{
4252 			// make sure we free up all send buffers that are owned
4253 			// by the disk thread
4254 			m_send_buffer.clear();
4255 		}
4256 
4257 		// we cannot do this in a constructor
4258 		TORRENT_ASSERT(m_in_constructor == false);
4259 		if (error > normal)
4260 		{
4261 			m_failed = true;
4262 		}
4263 
4264 		if (m_connected)
4265 			m_counters.inc_stats_counter(counters::num_peers_connected, -1);
4266 		m_connected = false;
4267 
4268 		// for incoming connections, we get invalid argument errors
4269 		// when asking for the remote endpoint while the socket is
4270 		// already closed. That's an edge case, but it can happen when
4271 		// a peer makes a TCP and a uTP connection in parallel.
4272 		// for outgoing connections however, why would we get this?
4273 //		TORRENT_ASSERT(ec != error::invalid_argument || !m_outgoing);
4274 
4275 		m_counters.inc_stats_counter(counters::disconnected_peers);
4276 		if (error == peer_error) m_counters.inc_stats_counter(counters::error_peers);
4277 
4278 		if (ec == error::connection_reset)
4279 			m_counters.inc_stats_counter(counters::connreset_peers);
4280 		else if (ec == error::eof)
4281 			m_counters.inc_stats_counter(counters::eof_peers);
4282 		else if (ec == error::connection_refused)
4283 			m_counters.inc_stats_counter(counters::connrefused_peers);
4284 		else if (ec == error::connection_aborted)
4285 			m_counters.inc_stats_counter(counters::connaborted_peers);
4286 		else if (ec == error::not_connected)
4287 			m_counters.inc_stats_counter(counters::notconnected_peers);
4288 		else if (ec == error::no_permission)
4289 			m_counters.inc_stats_counter(counters::perm_peers);
4290 		else if (ec == error::no_buffer_space)
4291 			m_counters.inc_stats_counter(counters::buffer_peers);
4292 		else if (ec == error::host_unreachable)
4293 			m_counters.inc_stats_counter(counters::unreachable_peers);
4294 		else if (ec == error::broken_pipe)
4295 			m_counters.inc_stats_counter(counters::broken_pipe_peers);
4296 		else if (ec == error::address_in_use)
4297 			m_counters.inc_stats_counter(counters::addrinuse_peers);
4298 		else if (ec == error::access_denied)
4299 			m_counters.inc_stats_counter(counters::no_access_peers);
4300 		else if (ec == error::invalid_argument)
4301 			m_counters.inc_stats_counter(counters::invalid_arg_peers);
4302 		else if (ec == error::operation_aborted)
4303 			m_counters.inc_stats_counter(counters::aborted_peers);
4304 		else if (ec == errors::upload_upload_connection
4305 			|| ec == errors::uninteresting_upload_peer
4306 			|| ec == errors::torrent_aborted
4307 			|| ec == errors::self_connection
4308 			|| ec == errors::torrent_paused)
4309 			m_counters.inc_stats_counter(counters::uninteresting_peers);
4310 
4311 		if (ec == errors::timed_out
4312 			|| ec == error::timed_out)
4313 			m_counters.inc_stats_counter(counters::transport_timeout_peers);
4314 
4315 		if (ec == errors::timed_out_inactivity
4316 			|| ec == errors::timed_out_no_request
4317 			|| ec == errors::timed_out_no_interest)
4318 			m_counters.inc_stats_counter(counters::timeout_peers);
4319 
4320 		if (ec == errors::no_memory)
4321 			m_counters.inc_stats_counter(counters::no_memory_peers);
4322 
4323 		if (ec == errors::too_many_connections)
4324 			m_counters.inc_stats_counter(counters::too_many_peers);
4325 
4326 		if (ec == errors::timed_out_no_handshake)
4327 			m_counters.inc_stats_counter(counters::connect_timeouts);
4328 
4329 		if (error > normal)
4330 		{
4331 			if (is_utp(*m_socket)) m_counters.inc_stats_counter(counters::error_utp_peers);
4332 			else m_counters.inc_stats_counter(counters::error_tcp_peers);
4333 
4334 			if (m_outgoing) m_counters.inc_stats_counter(counters::error_outgoing_peers);
4335 			else m_counters.inc_stats_counter(counters::error_incoming_peers);
4336 
4337 #if !defined TORRENT_DISABLE_ENCRYPTION
4338 			if (type() == connection_type::bittorrent && op != operation_t::connect)
4339 			{
4340 				auto* bt = static_cast<bt_peer_connection*>(this);
4341 				if (bt->supports_encryption()) m_counters.inc_stats_counter(
4342 					counters::error_encrypted_peers);
4343 				if (bt->rc4_encrypted() && bt->supports_encryption())
4344 					m_counters.inc_stats_counter(counters::error_rc4_peers);
4345 			}
4346 #endif // TORRENT_DISABLE_ENCRYPTION
4347 		}
4348 
4349 		std::shared_ptr<peer_connection> me(self());
4350 
4351 		INVARIANT_CHECK;
4352 
4353 		if (m_channel_state[upload_channel] & peer_info::bw_disk)
4354 		{
4355 			m_counters.inc_stats_counter(counters::num_peers_up_disk, -1);
4356 			m_channel_state[upload_channel] &= ~peer_info::bw_disk;
4357 		}
4358 		if (m_channel_state[download_channel] & peer_info::bw_disk)
4359 		{
4360 			m_counters.inc_stats_counter(counters::num_peers_down_disk, -1);
4361 			m_channel_state[download_channel] &= ~peer_info::bw_disk;
4362 		}
4363 
4364 		std::shared_ptr<torrent> t = m_torrent.lock();
4365 
4366 		// don't try to connect to ourself again
4367 		if (ec == errors::self_connection && m_peer_info && t)
4368 			t->ban_peer(m_peer_info);
4369 
4370 		if (m_connecting)
4371 		{
4372 			m_counters.inc_stats_counter(counters::num_peers_half_open, -1);
4373 			if (t) t->dec_num_connecting(m_peer_info);
4374 			m_connecting = false;
4375 		}
4376 
4377 		torrent_handle handle;
4378 		if (t) handle = t->get_handle();
4379 
4380 #ifndef TORRENT_DISABLE_EXTENSIONS
4381 		for (auto const& e : m_extensions)
4382 		{
4383 			e->on_disconnect(ec);
4384 		}
4385 #endif
4386 
4387 		if (ec == error::address_in_use
4388 			&& m_settings.get_int(settings_pack::outgoing_port) != 0
4389 			&& t)
4390 		{
4391 			if (t->alerts().should_post<performance_alert>())
4392 				t->alerts().emplace_alert<performance_alert>(
4393 					handle, performance_alert::too_few_outgoing_ports);
4394 		}
4395 
4396 		m_disconnecting = true;
4397 
4398 		if (t)
4399 		{
4400 			if (ec)
4401 			{
4402 				if ((error > failure || ec.category() == socks_category())
4403 					&& t->alerts().should_post<peer_error_alert>())
4404 				{
4405 					t->alerts().emplace_alert<peer_error_alert>(handle, remote()
4406 						, pid(), op, ec);
4407 				}
4408 
4409 				if (error <= failure && t->alerts().should_post<peer_disconnected_alert>())
4410 				{
4411 					t->alerts().emplace_alert<peer_disconnected_alert>(handle
4412 						, remote(), pid(), op, m_socket->type(), ec, close_reason);
4413 				}
4414 			}
4415 
4416 			// make sure we keep all the stats!
4417 			if (!m_ignore_stats)
4418 			{
4419 				// report any partially received payload as redundant
4420 				piece_block_progress pbp = downloading_piece_progress();
4421 				if (pbp.piece_index != piece_block_progress::invalid_index
4422 					&& pbp.bytes_downloaded > 0
4423 					&& pbp.bytes_downloaded < pbp.full_block_bytes)
4424 				{
4425 					t->add_redundant_bytes(pbp.bytes_downloaded, waste_reason::piece_closing);
4426 				}
4427 			}
4428 
4429 			if (t->has_picker())
4430 			{
4431 				clear_download_queue();
4432 				piece_picker& picker = t->picker();
4433 				while (!m_request_queue.empty())
4434 				{
4435 					pending_block const& qe = m_request_queue.back();
4436 					if (!qe.timed_out && !qe.not_wanted)
4437 						picker.abort_download(qe.block, self_peer);
4438 					m_request_queue.pop_back();
4439 				}
4440 			}
4441 			else
4442 			{
4443 				m_download_queue.clear();
4444 				m_request_queue.clear();
4445 				m_outstanding_bytes = 0;
4446 			}
4447 			m_queued_time_critical = 0;
4448 
4449 #if TORRENT_USE_INVARIANT_CHECKS
4450 			try { check_invariant(); } catch (std::exception const&) {}
4451 #endif
4452 			t->remove_peer(self());
4453 
4454 			// we need to do this here to maintain accurate accounting of number of
4455 			// unchoke slots. Ideally the updating of choked state and the
4456 			// accounting should be tighter
4457 			if (!m_choked)
4458 			{
4459 				m_choked = true;
4460 				m_counters.inc_stats_counter(counters::num_peers_up_unchoked_all, -1);
4461 				if (!ignore_unchoke_slots())
4462 					m_counters.inc_stats_counter(counters::num_peers_up_unchoked, -1);
4463 			}
4464 		}
4465 		else
4466 		{
4467 			TORRENT_ASSERT(m_download_queue.empty());
4468 			TORRENT_ASSERT(m_request_queue.empty());
4469 			m_ses.close_connection(this);
4470 		}
4471 
4472 		async_shutdown(*m_socket, m_socket);
4473 	}
4474 
4475 	bool peer_connection::ignore_unchoke_slots() const
4476 	{
4477 		TORRENT_ASSERT(is_single_thread());
4478 		if (num_classes() == 0) return true;
4479 
4480 		if (m_ses.ignore_unchoke_slots_set(*this)) return true;
4481 		std::shared_ptr<torrent> t = m_torrent.lock();
4482 		if (t && m_ses.ignore_unchoke_slots_set(*t)) return true;
4483 		return false;
4484 	}
4485 
4486 	bool peer_connection::on_local_network() const
4487 	{
4488 		TORRENT_ASSERT(is_single_thread());
4489 		return is_local(m_remote.address())
4490 			|| is_loopback(m_remote.address());
4491 	}
4492 
4493 	int peer_connection::request_timeout() const
4494 	{
4495 		const int deviation = m_request_time.avg_deviation();
4496 		const int avg = m_request_time.mean();
4497 
4498 		int ret;
4499 		if (m_request_time.num_samples() < 2)
4500 		{
4501 			if (m_request_time.num_samples() == 0)
4502 				return m_settings.get_int(settings_pack::request_timeout);
4503 
4504 			ret = avg + avg / 5;
4505 		}
4506 		else
4507 		{
4508 			ret = avg + deviation * 4;
4509 		}
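		// illustrative numbers: avg = 1100 ms and deviation = 350 ms give
		// ret = 1100 + 4 * 350 = 2500 ms, which the rounding below turns into
		// 3 seconds (subject to the configured cap and the 2 second floor)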
4510 
4511 		// ret is milliseconds, the return value is seconds. Convert to
4512 		// seconds and round up
4513 		ret = std::min((ret + 999) / 1000
4514 			, m_settings.get_int(settings_pack::request_timeout));
4515 
4516 		// timeouts should never be less than 2 seconds. The granularity is whole
4517 		// seconds, and only checked once per second. 2 is the minimum to avoid
4518 		// being considered timed out instantly
4519 		return std::max(2, ret);
4520 	}
4521 
4522 	void peer_connection::get_peer_info(peer_info& p) const
4523 	{
4524 		TORRENT_ASSERT(is_single_thread());
4525 		TORRENT_ASSERT(!associated_torrent().expired());
4526 
4527 		time_point const now = aux::time_now();
4528 
4529 		p.download_rate_peak = m_download_rate_peak;
4530 		p.upload_rate_peak = m_upload_rate_peak;
4531 		p.rtt = m_request_time.mean();
4532 		p.down_speed = statistics().download_rate();
4533 		p.up_speed = statistics().upload_rate();
4534 		p.payload_down_speed = statistics().download_payload_rate();
4535 		p.payload_up_speed = statistics().upload_payload_rate();
4536 		p.pid = pid();
4537 		p.ip = remote();
4538 		p.pending_disk_bytes = m_outstanding_writing_bytes;
4539 		p.pending_disk_read_bytes = m_reading_bytes;
4540 		p.send_quota = m_quota[upload_channel];
4541 		p.receive_quota = m_quota[download_channel];
4542 		p.num_pieces = m_num_pieces;
4543 		if (m_download_queue.empty()) p.request_timeout = -1;
4544 		else p.request_timeout = int(total_seconds(m_requested - now)
4545 			+ request_timeout());
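		// i.e. the number of seconds left before the outstanding requests are
		// considered timed out. m_requested marks when the download queue last
		// went from empty to non-empty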
4546 
4547 		p.download_queue_time = download_queue_time();
4548 		p.queue_bytes = m_outstanding_bytes;
4549 
4550 		p.total_download = statistics().total_payload_download();
4551 		p.total_upload = statistics().total_payload_upload();
4552 #if TORRENT_ABI_VERSION == 1
4553 		p.upload_limit = -1;
4554 		p.download_limit = -1;
4555 		p.load_balancing = 0;
4556 #endif
4557 
4558 		p.download_queue_length = int(download_queue().size() + m_request_queue.size());
4559 		p.requests_in_buffer = int(std::count_if(m_download_queue.begin()
4560 			, m_download_queue.end()
4561 			, &pending_block_in_buffer));
4562 
4563 		p.target_dl_queue_length = desired_queue_size();
4564 		p.upload_queue_length = int(upload_queue().size());
4565 		p.timed_out_requests = 0;
4566 		p.busy_requests = 0;
4567 		for (auto const& pb : m_download_queue)
4568 		{
4569 			if (pb.timed_out) ++p.timed_out_requests;
4570 			if (pb.busy) ++p.busy_requests;
4571 		}
4572 
4573 		piece_block_progress const ret = downloading_piece_progress();
4574 		if (ret.piece_index != piece_block_progress::invalid_index)
4575 		{
4576 			p.downloading_piece_index = ret.piece_index;
4577 			p.downloading_block_index = ret.block_index;
4578 			p.downloading_progress = ret.bytes_downloaded;
4579 			p.downloading_total = ret.full_block_bytes;
4580 		}
4581 		else
4582 		{
4583 			p.downloading_piece_index = piece_index_t(-1);
4584 			p.downloading_block_index = -1;
4585 			p.downloading_progress = 0;
4586 			p.downloading_total = 0;
4587 		}
4588 
4589 		p.pieces = get_bitfield();
4590 		p.last_request = now - m_last_request;
4591 		p.last_active = now - std::max(m_last_sent, m_last_receive);
4592 
4593 		// this will set the flags so that we can update them later
4594 		p.flags = {};
4595 		get_specific_peer_info(p);
4596 
4597 		if (m_snubbed) p.flags |= peer_info::snubbed;
4598 		if (m_upload_only) p.flags |= peer_info::upload_only;
4599 		if (m_endgame_mode) p.flags |= peer_info::endgame_mode;
4600 		if (m_holepunch_mode) p.flags |= peer_info::holepunched;
4601 		if (peer_info_struct())
4602 		{
4603 			torrent_peer* pi = peer_info_struct();
4604 			TORRENT_ASSERT(pi->in_use);
4605 			p.source = peer_source_flags_t(pi->source);
4606 			p.failcount = pi->failcount;
4607 			p.num_hashfails = pi->hashfails;
4608 			if (pi->on_parole) p.flags |= peer_info::on_parole;
4609 			if (pi->optimistically_unchoked) p.flags |= peer_info::optimistic_unchoke;
4610 			if (pi->seed) p.flags |= peer_info::seed;
4611 		}
4612 		else
4613 		{
4614 			if (is_seed()) p.flags |= peer_info::seed;
4615 			p.source = {};
4616 			p.failcount = 0;
4617 			p.num_hashfails = 0;
4618 		}
4619 
4620 #if TORRENT_ABI_VERSION == 1
4621 		p.remote_dl_rate = 0;
4622 #endif
4623 		p.send_buffer_size = m_send_buffer.capacity();
4624 		p.used_send_buffer = m_send_buffer.size();
4625 		p.receive_buffer_size = m_recv_buffer.capacity();
4626 		p.used_receive_buffer = m_recv_buffer.pos();
4627 		p.receive_buffer_watermark = m_recv_buffer.watermark();
4628 		p.write_state = m_channel_state[upload_channel];
4629 		p.read_state = m_channel_state[download_channel];
4630 
4631 		// pieces may be empty if we don't have metadata yet
4632 		if (p.pieces.empty())
4633 		{
4634 			p.progress = 0.f;
4635 			p.progress_ppm = 0;
4636 		}
4637 		else
4638 		{
4639 #if TORRENT_NO_FPU
4640 			p.progress = 0.f;
4641 #else
4642 			p.progress = float(p.pieces.count()) / float(p.pieces.size());
4643 #endif
4644 			p.progress_ppm = int(std::int64_t(p.pieces.count()) * 1000000 / p.pieces.size());
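			// progress_ppm is parts per million, the integer counterpart of
			// progress: e.g. 1234 of 2000 pieces is 617000 ppm (61.7%)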
4645 		}
4646 
4647 #if TORRENT_ABI_VERSION == 1
4648 		p.estimated_reciprocation_rate = m_est_reciprocation_rate;
4649 #endif
4650 
4651 		error_code ec;
4652 		p.local_endpoint = get_socket()->local_endpoint(ec);
4653 	}
4654 
4655 #ifndef TORRENT_DISABLE_SUPERSEEDING
4656 	// TODO: 3 new_piece should be an optional<piece_index_t>. piece index -1
4657 	// should not be allowed
4658 	void peer_connection::superseed_piece(piece_index_t const replace_piece
4659 		, piece_index_t const new_piece)
4660 	{
4661 		TORRENT_ASSERT(is_single_thread());
4662 
4663 		if (is_connecting()) return;
4664 		if (in_handshake()) return;
4665 
4666 		if (new_piece == piece_index_t(-1))
4667 		{
4668 			if (m_superseed_piece[0] == piece_index_t(-1)) return;
4669 			m_superseed_piece[0] = piece_index_t(-1);
4670 			m_superseed_piece[1] = piece_index_t(-1);
4671 
4672 #ifndef TORRENT_DISABLE_LOGGING
4673 			peer_log(peer_log_alert::info, "SUPER_SEEDING", "ending");
4674 #endif
4675 			std::shared_ptr<torrent> t = m_torrent.lock();
4676 			TORRENT_ASSERT(t);
4677 
4678 			// this will either send a full bitfield or
4679 			// a have-all message, effectively terminating
4680 			// super-seeding, since the peer may pick any piece
4681 			write_bitfield();
4682 
4683 			return;
4684 		}
4685 
4686 		TORRENT_ASSERT(!has_piece(new_piece));
4687 
4688 #ifndef TORRENT_DISABLE_LOGGING
4689 		peer_log(peer_log_alert::outgoing_message, "HAVE", "piece: %d (super seed)"
4690 			, static_cast<int>(new_piece));
4691 #endif
4692 		write_have(new_piece);
4693 
4694 		if (replace_piece >= piece_index_t(0))
4695 		{
4696 			// move the piece we're replacing to the tail
4697 			if (m_superseed_piece[0] == replace_piece)
4698 				std::swap(m_superseed_piece[0], m_superseed_piece[1]);
4699 		}
4700 
4701 		m_superseed_piece[1] = m_superseed_piece[0];
4702 		m_superseed_piece[0] = new_piece;
4703 	}
4704 #endif // TORRENT_DISABLE_SUPERSEEDING
4705 
4706 	void peer_connection::max_out_request_queue(int s)
4707 	{
4708 #ifndef TORRENT_DISABLE_LOGGING
4709 		peer_log(peer_log_alert::info, "MAX_OUT_QUEUE_SIZE", "%d -> %d"
4710 			, m_max_out_request_queue, s);
4711 #endif
4712 		m_max_out_request_queue = s;
4713 	}
4714 
4715 	int peer_connection::max_out_request_queue() const
4716 	{
4717 		return m_max_out_request_queue;
4718 	}
4719 
4720 	void peer_connection::update_desired_queue_size()
4721 	{
4722 		TORRENT_ASSERT(is_single_thread());
4723 		if (m_snubbed)
4724 		{
4725 			m_desired_queue_size = 1;
4726 			return;
4727 		}
4728 
4729 #ifndef TORRENT_DISABLE_LOGGING
4730 		int const previous_queue_size = m_desired_queue_size;
4731 #endif
4732 
4733 		int const download_rate = statistics().download_payload_rate();
4734 
4735 		// the desired download queue size
4736 		int const queue_time = m_settings.get_int(settings_pack::request_queue_time);
4737 
4738 		// when we're in slow-start mode we increase the desired queue size every
4739 		// time we receive a piece, no need to adjust it here (other than
4740 		// enforcing the upper limit)
4741 		if (!m_slow_start)
4742 		{
4743 			// queue_time is the amount of request latency the queue
4744 			// should absorb (if the latency exceeds it, the download
4745 			// will stall), so the queue size is
4746 			// queue_time * download_rate / block_size. Requests are
4747 			// typically 16 kiB, but the block size can differ, so we
4748 			// query the torrent for it. The result is clamped below.
4749 			std::shared_ptr<torrent> t = m_torrent.lock();
4750 			int const bs = t->block_size();
4751 
4752 			TORRENT_ASSERT(bs > 0);
4753 
4754 			m_desired_queue_size = std::uint16_t(queue_time * download_rate / bs);
4755 		}
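		// for example, with request_queue_time = 3 seconds, a download rate of
		// 1 MiB/s and 16 kiB blocks, this yields 3 * 1048576 / 16384 = 192
		// requests, before the clamping below is applied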
4756 
4757 		if (m_desired_queue_size > m_max_out_request_queue)
4758 			m_desired_queue_size = std::uint16_t(m_max_out_request_queue);
4759 		if (m_desired_queue_size < min_request_queue)
4760 			m_desired_queue_size = min_request_queue;
4761 
4762 #ifndef TORRENT_DISABLE_LOGGING
4763 		if (previous_queue_size != m_desired_queue_size)
4764 		{
4765 			peer_log(peer_log_alert::info, "UPDATE_QUEUE_SIZE"
4766 				, "dqs: %d max: %d dl: %d qt: %d snubbed: %d slow-start: %d"
4767 				, m_desired_queue_size, m_max_out_request_queue
4768 				, download_rate, queue_time, int(m_snubbed), int(m_slow_start));
4769 		}
4770 #endif
4771 	}
4772 
4773 	void peer_connection::second_tick(int const tick_interval_ms)
4774 	{
4775 		TORRENT_ASSERT(is_single_thread());
4776 		time_point const now = aux::time_now();
4777 		std::shared_ptr<peer_connection> me(self());
4778 
4779 		// the invariant check must be run before me is destructed
4780 		// in case the peer got disconnected
4781 		INVARIANT_CHECK;
4782 
4783 		std::shared_ptr<torrent> t = m_torrent.lock();
4784 
4785 		int warning = 0;
4786 		// drain the IP overhead from the bandwidth limiters
4787 		if (m_settings.get_bool(settings_pack::rate_limit_ip_overhead) && t)
4788 		{
4789 			warning |= m_ses.use_quota_overhead(*this, m_statistics.download_ip_overhead()
4790 				, m_statistics.upload_ip_overhead());
4791 			warning |= m_ses.use_quota_overhead(*t, m_statistics.download_ip_overhead()
4792 				, m_statistics.upload_ip_overhead());
4793 		}
4794 
4795 		if (warning && t->alerts().should_post<performance_alert>())
4796 		{
4797 			for (int channel = 0; channel < 2; ++channel)
4798 			{
4799 				if ((warning & (1 << channel)) == 0) continue;
4800 				t->alerts().emplace_alert<performance_alert>(t->get_handle()
4801 					, channel == peer_connection::download_channel
4802 					? performance_alert::download_limit_too_low
4803 					: performance_alert::upload_limit_too_low);
4804 			}
4805 		}
4806 
4807 		if (!t || m_disconnecting)
4808 		{
4809 			TORRENT_ASSERT(t || !m_connecting);
4810 			if (m_connecting)
4811 			{
4812 				m_counters.inc_stats_counter(counters::num_peers_half_open, -1);
4813 				if (t) t->dec_num_connecting(m_peer_info);
4814 				m_connecting = false;
4815 			}
4816 			disconnect(errors::torrent_aborted, operation_t::bittorrent);
4817 			return;
4818 		}
4819 
4820 		if (m_endgame_mode
4821 			&& m_interesting
4822 			&& m_download_queue.empty()
4823 			&& m_request_queue.empty()
4824 			&& now - seconds(5) >= m_last_request)
4825 		{
4826 			// this happens when we're in strict end-game
4827 			// mode and the peer could not request any blocks
4828 			// because they were all taken but there were still
4829 			// unrequested blocks. Now, 5 seconds later, there
4830 			// might not be any unrequested blocks anymore, so
4831 			// we should try to pick another block to see
4832 			// if we can pick a busy one
4833 			m_last_request = now;
4834 			if (request_a_block(*t, *this))
4835 				m_counters.inc_stats_counter(counters::end_game_piece_picks);
4836 			if (m_disconnecting) return;
4837 			send_block_requests();
4838 		}
4839 
4840 #ifndef TORRENT_DISABLE_SUPERSEEDING
4841 		if (t->super_seeding()
4842 			&& t->ready_for_connections()
4843 			&& !m_peer_interested
4844 			&& m_became_uninterested + seconds(10) < now)
4845 		{
4846 			// maybe we need to try another piece, to see if the peer
4847 			// become interested in us then
4848 			superseed_piece(piece_index_t(-1), t->get_piece_to_super_seed(m_have_piece));
4849 		}
4850 #endif
4851 
4852 		on_tick();
4853 		if (is_disconnecting()) return;
4854 
4855 #ifndef TORRENT_DISABLE_EXTENSIONS
4856 		for (auto const& e : m_extensions)
4857 		{
4858 			e->tick();
4859 		}
4860 		if (is_disconnecting()) return;
4861 #endif
4862 
4863 		// if the peer hasn't said a thing for a certain
4864 		// time, it is considered to have timed out
4865 		time_duration d = now - m_last_receive;
4866 
4867 		if (m_connecting)
4868 		{
4869 			int connect_timeout = m_settings.get_int(settings_pack::peer_connect_timeout);
4870 			if (m_peer_info) connect_timeout += 3 * m_peer_info->failcount;
4871 
4872 			// SSL and i2p handshakes are slow
4873 			if (is_ssl(*m_socket))
4874 				connect_timeout += 10;
4875 
4876 #if TORRENT_USE_I2P
4877 			if (is_i2p(*m_socket))
4878 				connect_timeout += 20;
4879 #endif
4880 
4881 			if (d > seconds(connect_timeout)
4882 				&& can_disconnect(errors::timed_out))
4883 			{
4884 #ifndef TORRENT_DISABLE_LOGGING
4885 				peer_log(peer_log_alert::info, "CONNECT_FAILED", "waited %d seconds"
4886 					, int(total_seconds(d)));
4887 #endif
4888 				connect_failed(errors::timed_out);
4889 				return;
4890 			}
4891 		}
4892 
4893 		// if the bw_network flag isn't set, it means we are not even trying to
4894 		// read from this peer's socket. Most likely because we're applying a
4895 		// rate limit. If the peer is "slow" because we are rate limiting it,
4896 		// don't enforce timeouts. However, as soon as we *do* read from the
4897 		// socket, we expect to receive data, and not have timed out. Then we
4898 		// can enforce the timeouts.
4899 		bool const reading_socket = bool(m_channel_state[download_channel] & peer_info::bw_network);
4900 
4901 		// TODO: 2 use a deadline_timer for timeouts. Don't rely on second_tick()!
4902 		// Hook this up to the connect timeout as well. This would improve performance
4903 		// by doing less work in second_tick(), and might let us remove ticking
4904 		// entirely eventually
4905 		if (reading_socket && d > seconds(timeout()) && !m_connecting && m_reading_bytes == 0
4906 			&& can_disconnect(errors::timed_out_inactivity))
4907 		{
4908 #ifndef TORRENT_DISABLE_LOGGING
4909 			peer_log(peer_log_alert::info, "LAST_ACTIVITY", "%d seconds ago"
4910 				, int(total_seconds(d)));
4911 #endif
4912 			disconnect(errors::timed_out_inactivity, operation_t::bittorrent);
4913 			return;
4914 		}
4915 
4916 		// do not stall waiting for a handshake
4917 		int timeout = m_settings.get_int(settings_pack::handshake_timeout);
4918 #if TORRENT_USE_I2P
4919 		timeout *= is_i2p(*m_socket) ? 4 : 1;
4920 #endif
4921 		if (reading_socket
4922 			&& !m_connecting
4923 			&& in_handshake()
4924 			&& d > seconds(timeout))
4925 		{
4926 #ifndef TORRENT_DISABLE_LOGGING
4927 			peer_log(peer_log_alert::info, "NO_HANDSHAKE", "waited %d seconds"
4928 				, int(total_seconds(d)));
4929 #endif
4930 			disconnect(errors::timed_out_no_handshake, operation_t::bittorrent);
4931 			return;
4932 		}
4933 
4934 		// disconnect peers that we unchoked, but that haven't sent a request
4935 		// in the last 60 seconds, and where we haven't been working on
4936 		// servicing a request for more than 60 seconds.
4937 		// but only if we're a seed
4938 		d = now - std::max(std::max(m_last_unchoke, m_last_incoming_request)
4939 			, m_last_sent_payload);
4940 
4941 		if (reading_socket
4942 			&& !m_connecting
4943 			&& m_requests.empty()
4944 			&& m_reading_bytes == 0
4945 			&& !m_choked
4946 			&& m_peer_interested
4947 			&& t && t->is_upload_only()
4948 			&& d > seconds(60)
4949 			&& can_disconnect(errors::timed_out_no_request))
4950 		{
4951 #ifndef TORRENT_DISABLE_LOGGING
4952 			peer_log(peer_log_alert::info, "NO_REQUEST", "waited %d seconds"
4953 				, int(total_seconds(d)));
4954 #endif
4955 			disconnect(errors::timed_out_no_request, operation_t::bittorrent);
4956 			return;
4957 		}
4958 
4959 		// if the peer hasn't become interested and we haven't
4960 		// become interested in the peer for the inactivity timeout
4961 		// (10 minutes by default), it has also timed out.
4962 		time_duration const d1 = now - m_became_uninterested;
4963 		time_duration const d2 = now - m_became_uninteresting;
4964 		time_duration const time_limit = seconds(
4965 			m_settings.get_int(settings_pack::inactivity_timeout));
4966 
4967 		// if we are close enough to the limit, consider the peer connection
4968 		// list full. This will enable the inactive timeout
4969 		bool const max_session_conns = m_ses.num_connections()
4970 			>= m_settings.get_int(settings_pack::connections_limit) - 5;
4971 		bool const max_torrent_conns = t && t->num_peers()
4972 			>= t->max_connections() - 5;
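		// Editor's note: hypothetical numbers for the "close enough" check
		// above. With connections_limit set to 200, the session is treated as
		// full once it holds 195 or more connections:
		//
		//   bool const near_full = 196 >= 200 - 5; // true -> inactivity timeout applies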
4973 
4974 		// don't bother disconnecting peers we haven't been interested
4975 		// in (and that haven't been interested in us) for a while
4976 		// unless we have used up all our connection slots
4977 		if (reading_socket
4978 			&& !m_interesting
4979 			&& !m_peer_interested
4980 			&& d1 > time_limit
4981 			&& d2 > time_limit
4982 			&& (max_session_conns || max_torrent_conns)
4983 			&& can_disconnect(errors::timed_out_no_interest))
4984 		{
4985 #ifndef TORRENT_DISABLE_LOGGING
4986 			if (should_log(peer_log_alert::info))
4987 			{
4988 				peer_log(peer_log_alert::info, "MUTUAL_NO_INTEREST", "t1: %d t2: %d"
4989 					, int(total_seconds(d1)), int(total_seconds(d2)));
4990 			}
4991 #endif
4992 			disconnect(errors::timed_out_no_interest, operation_t::bittorrent);
4993 			return;
4994 		}
4995 
4996 		if (reading_socket
4997 			&& !m_download_queue.empty()
4998 			&& m_quota[download_channel] > 0
4999 			&& now > m_requested + seconds(request_timeout()))
5000 		{
5001 			snub_peer();
5002 		}
5003 
5004 		// if we haven't sent something in too long, send a keep-alive
5005 		keep_alive();
5006 
5007 		// if our download rate isn't increasing significantly anymore, end slow
5008 		// start. The 5000 bytes is to have some slack here.
5009 		// we can't do this when we're choked, because we aren't sending any
5010 		// requests yet, so there hasn't been an opportunity to ramp up the
5011 		// connection yet.
5012 		if (m_slow_start
5013 			&& !m_peer_choked
5014 			&& m_downloaded_last_second > 0
5015 			&& m_downloaded_last_second + 5000
5016 				>= m_statistics.last_payload_downloaded())
5017 		{
5018 			m_slow_start = false;
5019 #ifndef TORRENT_DISABLE_LOGGING
5020 			if (should_log(peer_log_alert::info))
5021 			{
5022 				peer_log(peer_log_alert::info, "SLOW_START", "exit slow start: "
5023 					"prev-dl: %d dl: %d"
5024 					, int(m_downloaded_last_second)
5025 					, m_statistics.last_payload_downloaded());
5026 			}
5027 #endif
5028 		}
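		// Editor's note: a worked example of the exit condition above, with
		// hypothetical numbers. If 100000 payload bytes arrived during the
		// previous second and no more than 105000 arrived during this second,
		// the rate is no longer ramping up and slow start ends:
		//
		//   int const prev_dl = 100000; // m_downloaded_last_second
		//   int const cur_dl  = 103000; // m_statistics.last_payload_downloaded()
		//   bool const exit_slow_start = prev_dl > 0 && prev_dl + 5000 >= cur_dl; // true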
5029 		m_downloaded_last_second = m_statistics.last_payload_downloaded();
5030 		m_uploaded_last_second = m_statistics.last_payload_uploaded();
5031 
5032 		m_statistics.second_tick(tick_interval_ms);
5033 
5034 		if (m_statistics.upload_payload_rate() > m_upload_rate_peak)
5035 		{
5036 			m_upload_rate_peak = m_statistics.upload_payload_rate();
5037 		}
5038 		if (m_statistics.download_payload_rate() > m_download_rate_peak)
5039 		{
5040 			m_download_rate_peak = m_statistics.download_payload_rate();
5041 		}
5042 		if (is_disconnecting()) return;
5043 
5044 		if (!t->ready_for_connections()) return;
5045 
5046 		update_desired_queue_size();
5047 
5048 		if (m_desired_queue_size == m_max_out_request_queue
5049 			&& t->alerts().should_post<performance_alert>())
5050 		{
5051 			t->alerts().emplace_alert<performance_alert>(t->get_handle()
5052 				, performance_alert::outstanding_request_limit_reached);
5053 		}
5054 
5055 		int const piece_timeout = m_settings.get_int(settings_pack::piece_timeout);
5056 
5057 		if (!m_download_queue.empty()
5058 			&& m_quota[download_channel] > 0
5059 			&& now - m_last_piece > seconds(piece_timeout))
5060 		{
5061 			// this peer isn't sending the pieces we've
5062 			// requested (this has been observed by BitComet)
5063 			// in this case we'll clear our download queue and
5064 			// re-request the blocks.
5065 #ifndef TORRENT_DISABLE_LOGGING
5066 			if (should_log(peer_log_alert::info))
5067 			{
5068 				peer_log(peer_log_alert::info, "PIECE_REQUEST_TIMED_OUT"
5069 					, "%d time: %d to: %d"
5070 					, int(m_download_queue.size()), int(total_seconds(now - m_last_piece))
5071 					, piece_timeout);
5072 			}
5073 #endif
5074 
5075 			snub_peer();
5076 		}
5077 
5078 		fill_send_buffer();
5079 	}
5080 
5081 	void peer_connection::snub_peer()
5082 	{
5083 		TORRENT_ASSERT(is_single_thread());
5084 		INVARIANT_CHECK;
5085 
5086 		std::shared_ptr<torrent> t = m_torrent.lock();
5087 		TORRENT_ASSERT(t);
5088 
5089 		if (!m_snubbed)
5090 		{
5091 			m_snubbed = true;
5092 			m_slow_start = false;
5093 			if (t->alerts().should_post<peer_snubbed_alert>())
5094 			{
5095 				t->alerts().emplace_alert<peer_snubbed_alert>(t->get_handle()
5096 					, m_remote, m_peer_id);
5097 			}
5098 		}
5099 		m_desired_queue_size = 1;
5100 
5101 		if (on_parole()) return;
5102 
5103 		if (!t->has_picker()) return;
5104 		piece_picker& picker = t->picker();
5105 
5106 		// first, if we have any unsent requests, just
5107 		// wipe those out
5108 		while (!m_request_queue.empty())
5109 		{
5110 			t->picker().abort_download(m_request_queue.back().block, peer_info_struct());
5111 			m_request_queue.pop_back();
5112 		}
5113 		m_queued_time_critical = 0;
5114 
5115 		TORRENT_ASSERT(!m_download_queue.empty());
5116 
5117 		// time out the last block in the queue that is
5118 		// still eligible (not already timed out or unwanted)
5119 		int i = int(m_download_queue.size()) - 1;
5120 		for (; i >= 0; --i)
5121 		{
5122 			if (!m_download_queue[i].timed_out
5123 				&& !m_download_queue[i].not_wanted)
5124 				break;
5125 		}
5126 
5127 		if (i >= 0)
5128 		{
5129 			pending_block& qe = m_download_queue[i];
5130 			piece_block const r = qe.block;
5131 
5132 			// only cancel a request if it blocks the piece from being completed
5133 			// (i.e. no free blocks to request from it)
5134 			piece_picker::downloading_piece p;
5135 			picker.piece_info(qe.block.piece_index, p);
5136 			int const free_blocks = picker.blocks_in_piece(qe.block.piece_index)
5137 				- p.finished - p.writing - p.requested;
5138 
5139 			// if there are still blocks available for other peers to pick, we're
5140 			// still not holding up the completion of the piece and there's no
5141 			// need to cancel the requests. For more information, see:
5142 			// http://blog.libtorrent.org/2011/11/block-request-time-outs/
5143 			if (free_blocks > 0)
5144 			{
5145 				send_block_requests();
5146 				return;
5147 			}
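			// Editor's note: a hypothetical example of the free_blocks
			// computation above. For a piece with 16 blocks where 10 are
			// finished, 2 are being written and 4 are requested, nothing is
			// left for other peers to pick, so the timed-out block is
			// cancelled; with even one free block we would keep waiting:
			//
			//   int const blocks_in_piece = 16;
			//   int const free_blocks = blocks_in_piece - 10 - 2 - 4; // == 0 -> cancel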
5148 
5149 			if (t->alerts().should_post<block_timeout_alert>())
5150 			{
5151 				t->alerts().emplace_alert<block_timeout_alert>(t->get_handle()
5152 					, remote(), pid(), qe.block.block_index
5153 					, qe.block.piece_index);
5154 			}
5155 
5156 			// request a new block before removing the previous
5157 			// one, in order to prevent the piece picker from
5158 			// picking the same block again, stalling the
5159 			// same piece indefinitely.
5160 			m_desired_queue_size = 2;
5161 			if (request_a_block(*t, *this))
5162 				m_counters.inc_stats_counter(counters::snubbed_piece_picks);
5163 
5164 			// the block we just picked (potentially)
5165 			// hasn't been put in m_download_queue yet.
5166 			// it's in m_request_queue and will be sent
5167 			// once send_block_requests() is called.
5168 
5169 			m_desired_queue_size = 1;
5170 
5171 			qe.timed_out = true;
5172 			picker.abort_download(r, peer_info_struct());
5173 		}
5174 
5175 		send_block_requests();
5176 	}
5177 
5178 	void peer_connection::fill_send_buffer()
5179 	{
5180 		TORRENT_ASSERT(is_single_thread());
5181 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
5182 		INVARIANT_CHECK;
5183 #endif
5184 
5185 #ifndef TORRENT_DISABLE_SHARE_MODE
5186 		bool sent_a_piece = false;
5187 #endif
5188 		std::shared_ptr<torrent> t = m_torrent.lock();
5189 		if (!t || t->is_aborted() || m_requests.empty()) return;
5190 
5191 		// only add new piece-chunks if the send buffer is small enough,
5192 		// otherwise there would be no limit to how large it could grow
5193 
5194 		int buffer_size_watermark = int(std::int64_t(m_uploaded_last_second)
5195 			* m_settings.get_int(settings_pack::send_buffer_watermark_factor) / 100);
5196 
5197 		if (buffer_size_watermark < m_settings.get_int(settings_pack::send_buffer_low_watermark))
5198 		{
5199 			buffer_size_watermark = m_settings.get_int(settings_pack::send_buffer_low_watermark);
5200 		}
5201 		else if (buffer_size_watermark > m_settings.get_int(settings_pack::send_buffer_watermark))
5202 		{
5203 			buffer_size_watermark = m_settings.get_int(settings_pack::send_buffer_watermark);
5204 		}
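		// Editor's note: a worked example of the clamping above, using
		// hypothetical settings. With send_buffer_watermark_factor at 50 (%),
		// send_buffer_low_watermark at 10 kiB, send_buffer_watermark at
		// 500 kiB and 30000 bytes uploaded last second, the watermark becomes
		// 15000 bytes, which lies between the floor and the ceiling:
		//
		//   int watermark = 30000 * 50 / 100;        // 15000
		//   watermark = std::max(watermark, 10240);  // low watermark floor
		//   watermark = std::min(watermark, 512000); // high watermark ceiling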
5205 
5206 #ifndef TORRENT_DISABLE_LOGGING
5207 		if (should_log(peer_log_alert::outgoing))
5208 		{
5209 			peer_log(peer_log_alert::outgoing, "SEND_BUFFER_WATERMARK"
5210 				, "current watermark: %d max: %d min: %d factor: %d uploaded: %d B/s"
5211 				, buffer_size_watermark
5212 				, m_ses.settings().get_int(settings_pack::send_buffer_watermark)
5213 				, m_ses.settings().get_int(settings_pack::send_buffer_low_watermark)
5214 				, m_ses.settings().get_int(settings_pack::send_buffer_watermark_factor)
5215 				, int(m_uploaded_last_second));
5216 		}
5217 #endif
5218 
5219 		// don't just pop the front element here, since in seed mode one request may
5220 		// be blocked because we have to verify the hash first, so keep going with the
5221 		// next request. However, only let each peer have a few (currently three) hash
5222 		// verifications outstanding at any given time
5223 		for (int i = 0; i < int(m_requests.size())
5224 			&& (send_buffer_size() + m_reading_bytes < buffer_size_watermark); ++i)
5225 		{
5226 			TORRENT_ASSERT(t->ready_for_connections());
5227 			peer_request& r = m_requests[i];
5228 
5229 			TORRENT_ASSERT(r.piece >= piece_index_t(0));
5230 			TORRENT_ASSERT(r.piece < piece_index_t(m_have_piece.size()));
5231 			TORRENT_ASSERT(r.start + r.length <= t->torrent_file().piece_size(r.piece));
5232 			TORRENT_ASSERT(r.length > 0 && r.start >= 0);
5233 
5234 			if (t->is_deleted())
5235 			{
5236 #ifndef TORRENT_DISABLE_LOGGING
5237 				peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE"
5238 					, "piece: %d s: %x l: %x torrent deleted"
5239 					, static_cast<int>(r.piece), r.start , r.length);
5240 #endif
5241 				write_reject_request(r);
5242 				continue;
5243 			}
5244 
5245 			bool const seed_mode = t->seed_mode();
5246 
5247 			if (seed_mode
5248 				&& !t->verified_piece(r.piece)
5249 				&& !m_settings.get_bool(settings_pack::disable_hash_checks))
5250 			{
5251 				// we're still verifying the hash of this piece
5252 				// so we can't return it yet.
5253 				if (t->verifying_piece(r.piece)) continue;
5254 
5255 				// only allow three outstanding hash checks per peer
5256 				if (m_outstanding_piece_verification >= 3) continue;
5257 
5258 				++m_outstanding_piece_verification;
5259 
5260 #ifndef TORRENT_DISABLE_LOGGING
5261 				peer_log(peer_log_alert::info, "SEED_MODE_FILE_ASYNC_HASH"
5262 					, "piece: %d", static_cast<int>(r.piece));
5263 #endif
5264 				// this means we're in seed mode and we haven't yet
5265 				// verified this piece (r.piece)
5266 				auto conn = self();
5267 				m_disk_thread.async_hash(t->storage(), r.piece, {}
5268 					, [conn](piece_index_t p, sha1_hash const& ph, storage_error const& e) {
5269 					conn->wrap(&peer_connection::on_seed_mode_hashed, p, ph, e); });
5270 				t->verifying(r.piece);
5271 				continue;
5272 			}
5273 
5274 			if (!t->has_piece_passed(r.piece) && !seed_mode)
5275 			{
5276 #ifndef TORRENT_DISABLE_PREDICTIVE_PIECES
5277 				// we don't have this piece yet, but we anticipate having
5278 				// it very soon, so we have told our peers we have it.
5279 				// hold off on sending it. If the piece fails the hash
5280 				// check later, we will reject this request
5281 				if (t->is_predictive_piece(r.piece)) continue;
5282 #endif
5283 #ifndef TORRENT_DISABLE_LOGGING
5284 				peer_log(peer_log_alert::outgoing_message, "REJECT_PIECE"
5285 					, "piece: %d s: %x l: %x piece not passed hash check"
5286 					, static_cast<int>(r.piece), r.start , r.length);
5287 #endif
5288 				write_reject_request(r);
5289 			}
5290 			else
5291 			{
5292 #ifndef TORRENT_DISABLE_LOGGING
5293 				peer_log(peer_log_alert::info, "FILE_ASYNC_READ"
5294 					, "piece: %d s: %x l: %x", static_cast<int>(r.piece), r.start, r.length);
5295 #endif
5296 				m_reading_bytes += r.length;
5297 #ifndef TORRENT_DISABLE_SHARE_MODE
5298 				sent_a_piece = true;
5299 #endif
5300 
5301 				// the callback function may be called immediately, instead of being posted
5302 
5303 				TORRENT_ASSERT(t->valid_metadata());
5304 				TORRENT_ASSERT(r.piece >= piece_index_t(0));
5305 				TORRENT_ASSERT(r.piece < t->torrent_file().end_piece());
5306 
5307 				auto conn = self();
5308 				m_disk_thread.async_read(t->storage(), r
5309 					, [conn, r](disk_buffer_holder buf, disk_job_flags_t f, storage_error const& ec)
5310 					{ conn->wrap(&peer_connection::on_disk_read_complete, std::move(buf), f, ec, r, clock_type::now()); });
5311 			}
5312 			m_last_sent_payload = clock_type::now();
5313 			m_requests.erase(m_requests.begin() + i);
5314 
5315 			if (m_requests.empty())
5316 				m_counters.inc_stats_counter(counters::num_peers_up_requests, -1);
5317 
5318 			--i;
5319 		}
5320 
5321 #ifndef TORRENT_DISABLE_SHARE_MODE
5322 		if (t->share_mode() && sent_a_piece)
5323 			t->recalc_share_mode();
5324 #endif
5325 	}
5326 
5327 	// this is called when a previously unchecked piece has been
5328 	// checked, while in seed-mode
5329 	void peer_connection::on_seed_mode_hashed(piece_index_t const piece
5330 		, sha1_hash const& piece_hash, storage_error const& error)
5331 	{
5332 		TORRENT_ASSERT(is_single_thread());
5333 		INVARIANT_CHECK;
5334 
5335 		std::shared_ptr<torrent> t = m_torrent.lock();
5336 
5337 		TORRENT_ASSERT(m_outstanding_piece_verification > 0);
5338 		--m_outstanding_piece_verification;
5339 
5340 		if (!t || t->is_aborted()) return;
5341 
5342 		if (error)
5343 		{
5344 			t->handle_disk_error("hash", error, this);
5345 			t->leave_seed_mode(torrent::seed_mode_t::check_files);
5346 			return;
5347 		}
5348 
5349 		// we're using the piece hashes here, so we need the torrent to be loaded
5350 		if (!m_settings.get_bool(settings_pack::disable_hash_checks)
5351 			&& piece_hash != t->torrent_file().hash_for_piece(piece))
5352 		{
5353 #ifndef TORRENT_DISABLE_LOGGING
5354 			peer_log(peer_log_alert::info, "SEED_MODE_FILE_HASH"
5355 				, "piece: %d failed", static_cast<int>(piece));
5356 #endif
5357 
5358 			t->leave_seed_mode(torrent::seed_mode_t::check_files);
5359 		}
5360 		else
5361 		{
5362 			if (t->seed_mode())
5363 			{
5364 				TORRENT_ASSERT(t->verifying_piece(piece));
5365 				t->verified(piece);
5366 			}
5367 
5368 #ifndef TORRENT_DISABLE_LOGGING
5369 			peer_log(peer_log_alert::info, "SEED_MODE_FILE_HASH"
5370 				, "piece: %d passed", static_cast<int>(piece));
5371 #endif
5372 			if (t->seed_mode() && t->all_verified())
5373 				t->leave_seed_mode(torrent::seed_mode_t::skip_checking);
5374 		}
5375 
5376 		// try to service the requests again, now that the piece
5377 		// has been verified
5378 		fill_send_buffer();
5379 	}
5380 
5381 	void peer_connection::on_disk_read_complete(disk_buffer_holder buffer
5382 		, disk_job_flags_t const flags, storage_error const& error
5383 		, peer_request const& r, time_point const issue_time)
5384 	{
5385 		TORRENT_ASSERT(is_single_thread());
5386 		// return value:
5387 		// 0: success, piece passed hash check
5388 		// -1: disk failure
5389 
5390 		int const disk_rtt = int(total_microseconds(clock_type::now() - issue_time));
5391 
5392 #ifndef TORRENT_DISABLE_LOGGING
5393 		if (should_log(peer_log_alert::info))
5394 		{
5395 			peer_log(peer_log_alert::info, "FILE_ASYNC_READ_COMPLETE"
5396 				, "piece: %d s: %x l: %x b: %p c: %s e: %s rtt: %d us"
5397 				, static_cast<int>(r.piece), r.start, r.length
5398 				, static_cast<void*>(buffer.get())
5399 				, ((flags & disk_interface::cache_hit) ? "cache hit" : "cache miss")
5400 				, error.ec.message().c_str(), disk_rtt);
5401 		}
5402 #endif
5403 
5404 		m_reading_bytes -= r.length;
5405 
5406 		std::shared_ptr<torrent> t = m_torrent.lock();
5407 		if (error)
5408 		{
5409 			if (!t)
5410 			{
5411 				disconnect(error.ec, operation_t::file_read);
5412 				return;
5413 			}
5414 
5415 			write_dont_have(r.piece);
5416 			write_reject_request(r);
5417 			if (t->alerts().should_post<file_error_alert>())
5418 				t->alerts().emplace_alert<file_error_alert>(error.ec
5419 					, t->resolve_filename(error.file())
5420 					, error.operation, t->get_handle());
5421 
5422 			++m_disk_read_failures;
5423 			if (m_disk_read_failures > 100) disconnect(error.ec, operation_t::file_read);
5424 			return;
5425 		}
5426 
5427 		// we're only interested in consecutive failures.
5428 		// if we successfully send a block every now and
5429 		// then, the peer is still useful
5430 		m_disk_read_failures = 0;
5431 
5432 		if (t && m_settings.get_int(settings_pack::suggest_mode)
5433 			== settings_pack::suggest_read_cache)
5434 		{
5435 			// tell the torrent that we just read a block from this piece.
5436 			// if this piece is low-availability, it's now a candidate for being
5437 			// suggested to other peers
5438 			t->add_suggest_piece(r.piece);
5439 		}
5440 
5441 		if (m_disconnecting) return;
5442 
5443 		if (!t)
5444 		{
5445 			disconnect(error.ec, operation_t::file_read);
5446 			return;
5447 		}
5448 
5449 #ifndef TORRENT_DISABLE_LOGGING
5450 		peer_log(peer_log_alert::outgoing_message
5451 			, "PIECE", "piece: %d s: %x l: %x"
5452 			, static_cast<int>(r.piece), r.start, r.length);
5453 #endif
5454 
5455 		m_counters.blend_stats_counter(counters::request_latency, disk_rtt, 5);
5456 
5457 		// we probably just pulled this piece into the cache.
5458 		// if it's rare enough to make it into the suggested piece
5459 		// set, it may push another piece out
5460 		if (m_settings.get_int(settings_pack::suggest_mode) == settings_pack::suggest_read_cache
5461 			&& !(flags & disk_interface::cache_hit))
5462 		{
5463 			t->add_suggest_piece(r.piece);
5464 		}
5465 		write_piece(r, std::move(buffer));
5466 	}
5467 
5468 	void peer_connection::assign_bandwidth(int const channel, int const amount)
5469 	{
5470 		TORRENT_ASSERT(is_single_thread());
5471 #ifndef TORRENT_DISABLE_LOGGING
5472 		peer_log(channel == upload_channel
5473 			? peer_log_alert::outgoing : peer_log_alert::incoming
5474 			, "ASSIGN_BANDWIDTH", "bytes: %d", amount);
5475 #endif
5476 
5477 		TORRENT_ASSERT(amount > 0 || is_disconnecting());
5478 		m_quota[channel] += amount;
5479 		TORRENT_ASSERT(m_channel_state[channel] & peer_info::bw_limit);
5480 		m_channel_state[channel] &= ~peer_info::bw_limit;
5481 
5482 #if TORRENT_USE_INVARIANT_CHECKS
5483 		check_invariant();
5484 #endif
5485 
5486 		if (is_disconnecting()) return;
5487 		if (channel == upload_channel)
5488 		{
5489 			setup_send();
5490 		}
5491 		else if (channel == download_channel)
5492 		{
5493 			setup_receive();
5494 		}
5495 	}
5496 
5497 	// the number of bytes we expect to receive, or want to send.
5498 	// "channel" refers to either upload or download. This is used
5499 	// by the rate limiter to allocate quota to this peer
5500 	int peer_connection::wanted_transfer(int const channel)
5501 	{
5502 		TORRENT_ASSERT(is_single_thread());
5503 
5504 		const int tick_interval = std::max(1, m_settings.get_int(settings_pack::tick_interval));
5505 
5506 		if (channel == download_channel)
5507 		{
5508 			std::int64_t const download_rate = std::int64_t(m_statistics.download_rate()) * 3 / 2;
5509 			return std::max({m_outstanding_bytes + 30
5510 				, m_recv_buffer.packet_bytes_remaining() + 30
5511 				, int(download_rate * tick_interval / 1000)});
5512 		}
5513 		else
5514 		{
5515 			std::int64_t const upload_rate = std::int64_t(m_statistics.upload_rate()) * 2;
5516 			return std::max({m_reading_bytes
5517 				, m_send_buffer.size()
5518 				, int(upload_rate * tick_interval / 1000)});
5519 		}
5520 	}
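	// Editor's note: an illustrative call of wanted_transfer() on the download
	// channel, with made-up numbers. With 20000 outstanding request bytes,
	// 4000 bytes remaining of the current packet, a measured download rate of
	// 200000 B/s and a 500 ms tick interval, the desired amount is the largest
	// of the three terms, 150000 bytes, which the rate limiter is asked to cover:
	//
	//   int const wanted = std::max({20000 + 30, 4000 + 30
	//       , int(std::int64_t(200000) * 3 / 2 * 500 / 1000)}); // 150000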
5521 
5522 	int peer_connection::request_bandwidth(int const channel, int bytes)
5523 	{
5524 		TORRENT_ASSERT(is_single_thread());
5525 		INVARIANT_CHECK;
5526 
5527 		// we can only have one outstanding bandwidth request at a time
5528 		if (m_channel_state[channel] & peer_info::bw_limit) return 0;
5529 
5530 		std::shared_ptr<torrent> t = m_torrent.lock();
5531 
5532 		bytes = std::max(wanted_transfer(channel), bytes);
5533 
5534 		// we already have enough quota
5535 		if (m_quota[channel] >= bytes) return 0;
5536 
5537 		// deduct the bytes we already have quota for
5538 		bytes -= m_quota[channel];
5539 
5540 		int const priority = get_priority(channel);
5541 
5542 		int const max_channels = num_classes() + (t ? t->num_classes() : 0) + 2;
5543 		TORRENT_ALLOCA(channels, bandwidth_channel*, max_channels);
5544 
5545 		// collect the pointers to all bandwidth channels
5546 		// that apply to this torrent
5547 		int c = 0;
5548 
5549 		c += m_ses.copy_pertinent_channels(*this, channel
5550 			, channels.subspan(c).data(), max_channels - c);
5551 		if (t)
5552 		{
5553 			c += m_ses.copy_pertinent_channels(*t, channel
5554 				, channels.subspan(c).data(), max_channels - c);
5555 		}
5556 
5557 #if TORRENT_USE_ASSERTS
5558 		// make sure we don't have duplicates
5559 		std::set<bandwidth_channel*> unique_classes;
5560 		for (int i = 0; i < c; ++i)
5561 		{
5562 			TORRENT_ASSERT(unique_classes.count(channels[i]) == 0);
5563 			unique_classes.insert(channels[i]);
5564 		}
5565 #endif
5566 
5567 		TORRENT_ASSERT(!(m_channel_state[channel] & peer_info::bw_limit));
5568 
5569 		bandwidth_manager* manager = m_ses.get_bandwidth_manager(channel);
5570 
5571 		int const ret = manager->request_bandwidth(self()
5572 			, bytes, priority, channels.data(), c);
5573 
5574 		if (ret == 0)
5575 		{
5576 #ifndef TORRENT_DISABLE_LOGGING
5577 			auto const dir = channel == download_channel ? peer_log_alert::incoming
5578 				: peer_log_alert::outgoing;
5579 			if (should_log(dir))
5580 			{
5581 				peer_log(dir,
5582 					"REQUEST_BANDWIDTH", "bytes: %d quota: %d wanted_transfer: %d "
5583 					"prio: %d num_channels: %d", bytes, m_quota[channel]
5584 					, wanted_transfer(channel), priority, c);
5585 			}
5586 #endif
5587 			m_channel_state[channel] |= peer_info::bw_limit;
5588 		}
5589 		else
5590 		{
5591 			m_quota[channel] += ret;
5592 		}
5593 
5594 		return ret;
5595 	}
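	// Editor's note: a rough sketch of the quota round-trip above (simplified,
	// not part of the original code). If the bandwidth manager can satisfy the
	// request synchronously, the returned quota is added right away; otherwise
	// the bw_limit flag is set and the manager later hands out quota through
	// assign_bandwidth(), which clears the flag and re-enters
	// setup_send()/setup_receive():
	//
	//   int const granted = request_bandwidth(upload_channel, 16384);
	//   if (granted == 0)
	//   {
	//       // either enough quota already, or waiting for assign_bandwidth()
	//   }
	//   else
	//   {
	//       // m_quota[upload_channel] was increased by 'granted'
	//   }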
5596 
5597 	void peer_connection::setup_send()
5598 	{
5599 		TORRENT_ASSERT(is_single_thread());
5600 
5601 		if (m_disconnecting || m_send_buffer.empty()) return;
5602 
5603 		// we may want to request more quota at this point
5604 		request_bandwidth(upload_channel);
5605 
5606 		// if we already have an outstanding send operation, don't issue another
5607 		// one, instead accrue more send buffer to coalesce for the next write
5608 		if (m_channel_state[upload_channel] & peer_info::bw_network)
5609 		{
5610 #ifndef TORRENT_DISABLE_LOGGING
5611 			peer_log(peer_log_alert::outgoing, "CORKED_WRITE", "bytes: %d"
5612 				, m_send_buffer.size());
5613 #endif
5614 			return;
5615 		}
5616 
5617 		if (m_send_barrier == 0)
5618 		{
5619 			std::vector<span<char>> vec;
5620 			// limit outgoing crypto messages to 1MB
5621 			int const send_bytes = std::min(m_send_buffer.size(), 1024 * 1024);
5622 			m_send_buffer.build_mutable_iovec(send_bytes, vec);
5623 			int next_barrier;
5624 			span<span<char const>> inject_vec;
5625 			std::tie(next_barrier, inject_vec) = hit_send_barrier(vec);
5626 			for (auto i = inject_vec.rbegin(); i != inject_vec.rend(); ++i)
5627 			{
5628 			// this const_cast is here because chained_buffer needs to be
5629 			// fixed.
5630 				auto* ptr = const_cast<char*>(i->data());
5631 				m_send_buffer.prepend_buffer(span<char>(ptr, i->size())
5632 					, static_cast<int>(i->size()));
5633 			}
5634 			set_send_barrier(next_barrier);
5635 		}
5636 
5637 		if ((m_quota[upload_channel] == 0 || m_send_barrier == 0)
5638 			&& !m_send_buffer.empty()
5639 			&& !m_connecting)
5640 		{
5641 			return;
5642 		}
5643 
5644 		int const quota_left = m_quota[upload_channel];
5645 		if (m_send_buffer.empty()
5646 			&& m_reading_bytes > 0
5647 			&& quota_left > 0)
5648 		{
5649 			if (!(m_channel_state[upload_channel] & peer_info::bw_disk))
5650 				m_counters.inc_stats_counter(counters::num_peers_up_disk);
5651 			m_channel_state[upload_channel] |= peer_info::bw_disk;
5652 #ifndef TORRENT_DISABLE_LOGGING
5653 			peer_log(peer_log_alert::outgoing, "WAITING_FOR_DISK", "outstanding: %d"
5654 				, m_reading_bytes);
5655 #endif
5656 
5657 			if (!m_connecting
5658 				&& !m_requests.empty()
5659 				&& m_reading_bytes > m_settings.get_int(settings_pack::send_buffer_watermark) - 0x4000)
5660 			{
5661 				std::shared_ptr<torrent> t = m_torrent.lock();
5662 
5663 				// we're stalled on the disk. We want to write and we can write
5664 				// but our send buffer is empty, waiting to be refilled from the disk
5665 				// this either means the disk is slower than the network connection
5666 				// or that our send buffer watermark is too small, because we can
5667 				// send it all before the disk gets back to us. That's why we only
5668 				// trigger this if we've also filled the allowed send buffer. The
5669 				// first request would not fill it all the way up because of the
5670 				// upload rate being virtually 0. If m_requests is empty, it doesn't
5671 				// matter anyway, because we don't have any more requests from the
5672 				// peer waiting on the disk
5673 				if (t && t->alerts().should_post<performance_alert>())
5674 				{
5675 					t->alerts().emplace_alert<performance_alert>(t->get_handle()
5676 						, performance_alert::send_buffer_watermark_too_low);
5677 				}
5678 			}
5679 		}
5680 		else
5681 		{
5682 			if (m_channel_state[upload_channel] & peer_info::bw_disk)
5683 				m_counters.inc_stats_counter(counters::num_peers_up_disk, -1);
5684 			m_channel_state[upload_channel] &= ~peer_info::bw_disk;
5685 		}
5686 
5687 		if (!can_write())
5688 		{
5689 #ifndef TORRENT_DISABLE_LOGGING
5690 			if (should_log(peer_log_alert::outgoing))
5691 			{
5692 				if (m_send_buffer.empty())
5693 				{
5694 					peer_log(peer_log_alert::outgoing, "SEND_BUFFER_DEPLETED"
5695 						, "quota: %d buf: %d connecting: %s disconnecting: %s "
5696 						"pending_disk: %d piece-requests: %d"
5697 						, m_quota[upload_channel]
5698 						, m_send_buffer.size(), m_connecting?"yes":"no"
5699 						, m_disconnecting?"yes":"no", m_reading_bytes
5700 						, int(m_requests.size()));
5701 				}
5702 				else
5703 				{
5704 					peer_log(peer_log_alert::outgoing, "CANNOT_WRITE"
5705 						, "quota: %d buf: %d connecting: %s disconnecting: %s "
5706 						"pending_disk: %d"
5707 						, m_quota[upload_channel]
5708 						, m_send_buffer.size(), m_connecting?"yes":"no"
5709 						, m_disconnecting?"yes":"no", m_reading_bytes);
5710 				}
5711 			}
5712 #endif
5713 			return;
5714 		}
5715 
5716 		int const amount_to_send = std::min({
5717 			m_send_buffer.size()
5718 			, quota_left
5719 			, m_send_barrier});
5720 
5721 		TORRENT_ASSERT(amount_to_send > 0);
5722 
5723 		TORRENT_ASSERT(!(m_channel_state[upload_channel] & peer_info::bw_network));
5724 #ifndef TORRENT_DISABLE_LOGGING
5725 		peer_log(peer_log_alert::outgoing, "ASYNC_WRITE", "bytes: %d", amount_to_send);
5726 #endif
5727 		auto const vec = m_send_buffer.build_iovec(amount_to_send);
5728 		ADD_OUTSTANDING_ASYNC("peer_connection::on_send_data");
5729 
5730 #if TORRENT_USE_ASSERTS
5731 		TORRENT_ASSERT(!m_socket_is_writing);
5732 		m_socket_is_writing = true;
5733 #endif
5734 
5735 		auto conn = self();
5736 		m_socket->async_write_some(vec, make_handler(
5737 				std::bind(&peer_connection::on_send_data, conn, _1, _2)
5738 				, m_write_handler_storage, *this));
5739 
5740 		m_channel_state[upload_channel] |= peer_info::bw_network;
5741 		m_last_sent = aux::time_now();
5742 	}
5743 
5744 	void peer_connection::on_disk()
5745 	{
5746 		TORRENT_ASSERT(is_single_thread());
5747 		if (!(m_channel_state[download_channel] & peer_info::bw_disk)) return;
5748 		std::shared_ptr<peer_connection> me(self());
5749 
5750 #ifndef TORRENT_DISABLE_LOGGING
5751 		peer_log(peer_log_alert::info, "DISK", "dropped below disk buffer watermark");
5752 #endif
5753 		m_counters.inc_stats_counter(counters::num_peers_down_disk, -1);
5754 		m_channel_state[download_channel] &= ~peer_info::bw_disk;
5755 		setup_receive();
5756 	}
5757 
5758 	void peer_connection::setup_receive()
5759 	{
5760 		TORRENT_ASSERT(is_single_thread());
5761 		INVARIANT_CHECK;
5762 
5763 		if (m_disconnecting) return;
5764 
5765 		if (m_recv_buffer.capacity() < 100
5766 			&& m_recv_buffer.max_receive() == 0)
5767 		{
5768 			m_recv_buffer.reserve(100);
5769 		}
5770 
5771 		// we may want to request more quota at this point
5772 		int const buffer_size = m_recv_buffer.max_receive();
5773 		request_bandwidth(download_channel, buffer_size);
5774 
5775 		if (m_channel_state[download_channel] & peer_info::bw_network) return;
5776 
5777 		if (m_quota[download_channel] == 0
5778 			&& !m_connecting)
5779 		{
5780 			return;
5781 		}
5782 
5783 		if (!can_read())
5784 		{
5785 #ifndef TORRENT_DISABLE_LOGGING
5786 			if (should_log(peer_log_alert::incoming))
5787 			{
5788 				peer_log(peer_log_alert::incoming, "CANNOT_READ", "quota: %d  "
5789 					"can-write-to-disk: %s queue-limit: %d disconnecting: %s "
5790 					" connecting: %s"
5791 					, m_quota[download_channel]
5792 					, ((m_channel_state[download_channel] & peer_info::bw_disk)?"no":"yes")
5793 					, m_settings.get_int(settings_pack::max_queued_disk_bytes)
5794 					, (m_disconnecting?"yes":"no")
5795 					, (m_connecting?"yes":"no"));
5796 			}
5797 #endif
5798 			// if we block reading, waiting for the disk, we will be woken up
5799 			// by the disk_io_thread posting a message every time it drops
5800 			// from being at or exceeding the limit down to below the limit
5801 			return;
5802 		}
5803 		TORRENT_ASSERT(m_connected);
5804 		if (m_quota[download_channel] == 0) return;
5805 
5806 		int const quota_left = m_quota[download_channel];
5807 		int const max_receive = std::min(buffer_size, quota_left);
5808 
5809 		if (max_receive == 0) return;
5810 
5811 		span<char> const vec = m_recv_buffer.reserve(max_receive);
5812 		TORRENT_ASSERT(!(m_channel_state[download_channel] & peer_info::bw_network));
5813 		m_channel_state[download_channel] |= peer_info::bw_network;
5814 #ifndef TORRENT_DISABLE_LOGGING
5815 		peer_log(peer_log_alert::incoming, "ASYNC_READ"
5816 			, "max: %d bytes", max_receive);
5817 #endif
5818 
5819 		ADD_OUTSTANDING_ASYNC("peer_connection::on_receive_data");
5820 		auto conn = self();
5821 		m_socket->async_read_some(
5822 			boost::asio::mutable_buffers_1(vec.data(), std::size_t(vec.size())), make_handler(
5823 				std::bind(&peer_connection::on_receive_data, conn, _1, _2)
5824 				, m_read_handler_storage, *this));
5825 	}
5826 
5827 	piece_block_progress peer_connection::downloading_piece_progress() const
5828 	{
5829 #ifndef TORRENT_DISABLE_LOGGING
5830 		peer_log(peer_log_alert::info, "ERROR"
5831 			, "downloading_piece_progress() dispatched to the base class!");
5832 #endif
5833 		return {};
5834 	}
5835 
5836 	void peer_connection::send_buffer(span<char const> buf)
5837 	{
5838 		TORRENT_ASSERT(is_single_thread());
5839 
5840 		int const free_space = std::min(
5841 			m_send_buffer.space_in_last_buffer(), int(buf.size()));
5842 		if (free_space > 0)
5843 		{
5844 			char* dst = m_send_buffer.append(buf.first(free_space));
5845 
5846 			// this should always succeed, because we checked how much space
5847 			// there was up-front
5848 			TORRENT_UNUSED(dst);
5849 			TORRENT_ASSERT(dst != nullptr);
5850 			buf = buf.subspan(free_space);
5851 		}
5852 		if (buf.empty()) return;
5853 
5854 		// allocate a buffer and initialize the beginning of it with 'buf'
5855 		buffer snd_buf(std::max(int(buf.size()), 128), buf);
5856 		m_send_buffer.append_buffer(std::move(snd_buf), int(buf.size()));
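		// Editor's note: a hypothetical example of the two-step append above.
		// For a 300-byte message when the last send buffer has 100 free bytes,
		// the first 100 bytes are copied in place and the remaining 200 go
		// into a freshly allocated buffer of at least 128 bytes:
		//
		//   int const in_place = std::min(100, 300);       // 100 bytes reuse the last buffer
		//   int const new_buf  = std::max(300 - 100, 128);  // 200-byte new buffer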
5857 
5858 		setup_send();
5859 	}
5860 
5861 	// --------------------------
5862 	// RECEIVE DATA
5863 	// --------------------------
5864 
5865 	void peer_connection::account_received_bytes(int const bytes_transferred)
5866 	{
5867 		// tell the receive buffer we just fed it this many bytes of incoming data
5868 		TORRENT_ASSERT(bytes_transferred > 0);
5869 		m_recv_buffer.received(bytes_transferred);
5870 
5871 		// update the dl quota
5872 		TORRENT_ASSERT(bytes_transferred <= m_quota[download_channel]);
5873 		m_quota[download_channel] -= bytes_transferred;
5874 
5875 		// account receiver buffer size stats to the session
5876 		m_ses.received_buffer(bytes_transferred);
5877 
5878 		// estimate transport protocol overhead
5879 		trancieve_ip_packet(bytes_transferred, is_v6(m_remote));
5880 
5881 #ifndef TORRENT_DISABLE_LOGGING
5882 		peer_log(peer_log_alert::incoming, "READ"
5883 			, "%d bytes", bytes_transferred);
5884 #endif
5885 	}
5886 
5887 	void peer_connection::on_receive_data(const error_code& error
5888 		, std::size_t bytes_transferred)
5889 	{
5890 		TORRENT_ASSERT(is_single_thread());
5891 		COMPLETE_ASYNC("peer_connection::on_receive_data");
5892 
5893 #ifndef TORRENT_DISABLE_LOGGING
5894 		if (should_log(peer_log_alert::incoming))
5895 		{
5896 			peer_log(peer_log_alert::incoming, "ON_RECEIVE_DATA"
5897 				, "bytes: %d %s"
5898 				, int(bytes_transferred), print_error(error).c_str());
5899 		}
5900 #endif
5901 
5902 		// leave this bit set until we're done looping, reading from the socket.
5903 		// that way we don't trigger any async read calls until the end of this
5904 		// function.
5905 		TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network);
5906 
5907 		TORRENT_ASSERT(bytes_transferred > 0 || error);
5908 
5909 		m_counters.inc_stats_counter(counters::on_read_counter);
5910 
5911 		INVARIANT_CHECK;
5912 
5913 		if (error)
5914 		{
5915 #ifndef TORRENT_DISABLE_LOGGING
5916 			if (should_log(peer_log_alert::info))
5917 			{
5918 				peer_log(peer_log_alert::info, "ERROR"
5919 					, "in peer_connection::on_receive_data_impl %s"
5920 					, print_error(error).c_str());
5921 			}
5922 #endif
5923 			on_receive(error, bytes_transferred);
5924 			disconnect(error, operation_t::sock_read);
5925 			return;
5926 		}
5927 
5928 		m_last_receive = aux::time_now();
5929 
5930 		// submit all disk jobs later
5931 		m_ses.deferred_submit_jobs();
5932 
5933 		// keep ourselves alive until this function exits, in
5934 		// case we disconnect
5935 		// this needs to be created before the invariant check,
5936 		// to keep the object alive through the exit check
5937 		std::shared_ptr<peer_connection> me(self());
5938 
5939 		TORRENT_ASSERT(bytes_transferred > 0);
5940 
5941 		// flush the send buffer at the end of this function
5942 		cork _c(*this);
5943 
5944 		// if we received exactly as many bytes as we provided a receive buffer
5945 		// for, there most likely are more bytes to read, and we should grow our
5946 		// receive buffer.
5947 		TORRENT_ASSERT(int(bytes_transferred) <= m_recv_buffer.max_receive());
5948 		bool const grow_buffer = (int(bytes_transferred) == m_recv_buffer.max_receive());
5949 		account_received_bytes(int(bytes_transferred));
5950 
5951 		if (m_extension_outstanding_bytes > 0)
5952 			m_extension_outstanding_bytes -= std::min(m_extension_outstanding_bytes, int(bytes_transferred));
5953 
5954 		check_graceful_pause();
5955 		if (m_disconnecting) return;
5956 
5957 		// this is the case where we try to grow the receive buffer and try to
5958 		// drain the socket
5959 		if (grow_buffer)
5960 		{
5961 			error_code ec;
5962 			int buffer_size = int(m_socket->available(ec));
5963 			if (ec)
5964 			{
5965 				disconnect(ec, operation_t::available);
5966 				return;
5967 			}
5968 
5969 #ifndef TORRENT_DISABLE_LOGGING
5970 			peer_log(peer_log_alert::incoming, "AVAILABLE"
5971 				, "%d bytes", buffer_size);
5972 #endif
5973 
5974 			request_bandwidth(download_channel, buffer_size);
5975 
5976 			int const quota_left = m_quota[download_channel];
5977 			if (buffer_size > quota_left) buffer_size = quota_left;
5978 			if (buffer_size > 0)
5979 			{
5980 				span<char> const vec = m_recv_buffer.reserve(buffer_size);
5981 				std::size_t const bytes = m_socket->read_some(
5982 					boost::asio::mutable_buffers_1(vec.data(), std::size_t(vec.size())), ec);
5983 
5984 				// this is weird. You would imagine read_some() would do this
5985 				if (bytes == 0 && !ec) ec = boost::asio::error::eof;
5986 
5987 #ifndef TORRENT_DISABLE_LOGGING
5988 				if (should_log(peer_log_alert::incoming))
5989 				{
5990 					peer_log(peer_log_alert::incoming, "SYNC_READ", "max: %d ret: %d e: %s"
5991 						, buffer_size, int(bytes), ec ? ec.message().c_str() : "");
5992 				}
5993 #endif
5994 
5995 				TORRENT_ASSERT(bytes > 0 || ec);
5996 				if (ec)
5997 				{
5998 					if (ec != boost::asio::error::would_block
5999 						&& ec != boost::asio::error::try_again)
6000 					{
6001 						disconnect(ec, operation_t::sock_read);
6002 						return;
6003 					}
6004 				}
6005 				else
6006 				{
6007 					account_received_bytes(int(bytes));
6008 					bytes_transferred += bytes;
6009 				}
6010 			}
6011 		}
6012 
6013 		// feed bytes in receive buffer to upper layer by calling on_receive()
6014 
6015 		bool const prev_choked = m_peer_choked;
6016 		int bytes = int(bytes_transferred);
6017 		int sub_transferred = 0;
6018 		do {
6019 			sub_transferred = m_recv_buffer.advance_pos(bytes);
6020 			TORRENT_ASSERT(sub_transferred > 0);
6021 			on_receive(error, std::size_t(sub_transferred));
6022 			bytes -= sub_transferred;
6023 			if (m_disconnecting) return;
6024 		} while (bytes > 0 && sub_transferred > 0);
6025 
6026 		// if the peer went from unchoked to choked, suggest to the receive
6027 		// buffer that it shrinks to 100 bytes
6028 		int const force_shrink = (m_peer_choked && !prev_choked)
6029 			? 100 : 0;
6030 		m_recv_buffer.normalize(force_shrink);
6031 
6032 		if (m_recv_buffer.max_receive() == 0)
6033 		{
6034 			// the message we're receiving is larger than our receive
6035 			// buffer, so we must grow it.
6036 			int const buffer_size_limit
6037 				= m_settings.get_int(settings_pack::max_peer_recv_buffer_size);
6038 			m_recv_buffer.grow(buffer_size_limit);
6039 #ifndef TORRENT_DISABLE_LOGGING
6040 			peer_log(peer_log_alert::incoming, "GROW_BUFFER", "%d bytes"
6041 				, m_recv_buffer.capacity());
6042 #endif
6043 		}
6044 
6045 		TORRENT_ASSERT(m_recv_buffer.pos_at_end());
6046 		TORRENT_ASSERT(m_recv_buffer.packet_size() > 0);
6047 
6048 		if (is_seed())
6049 		{
6050 			std::shared_ptr<torrent> t = m_torrent.lock();
6051 			if (t) t->seen_complete();
6052 		}
6053 
6054 		// allow reading from the socket again
6055 		TORRENT_ASSERT(m_channel_state[download_channel] & peer_info::bw_network);
6056 		m_channel_state[download_channel] &= ~peer_info::bw_network;
6057 
6058 		setup_receive();
6059 	}
6060 
6061 	bool peer_connection::can_write() const
6062 	{
6063 		TORRENT_ASSERT(is_single_thread());
6064 		// if we have requests or pending data to be sent, or announcements to
6065 		// be made, we want to send data
6066 		return !m_send_buffer.empty()
6067 			&& m_quota[upload_channel] > 0
6068 			&& (m_send_barrier > 0)
6069 			&& !m_connecting;
6070 	}
6071 
6072 	bool peer_connection::can_read()
6073 	{
6074 		TORRENT_ASSERT(is_single_thread());
6075 		INVARIANT_CHECK;
6076 
6077 		std::shared_ptr<torrent> t = m_torrent.lock();
6078 
6079 		bool bw_limit = m_quota[download_channel] > 0;
6080 
6081 		if (!bw_limit) return false;
6082 
6083 		if (m_outstanding_bytes > 0)
6084 		{
6085 			// if we're expecting to download piece data, we might not
6086 			// want to read from the socket in case we're out of disk
6087 			// cache space right now
6088 
6089 			if (m_channel_state[download_channel] & peer_info::bw_disk) return false;
6090 		}
6091 
6092 		return !m_connecting && !m_disconnecting;
6093 	}
6094 
6095 	void peer_connection::on_connection_complete(error_code const& e)
6096 	{
6097 		TORRENT_ASSERT(is_single_thread());
6098 		COMPLETE_ASYNC("peer_connection::on_connection_complete");
6099 
6100 		INVARIANT_CHECK;
6101 
6102 		// if t is nullptr, we had better not be connecting, since
6103 		// we can't decrement the connecting counter
6104 		std::shared_ptr<torrent> t = m_torrent.lock();
6105 		TORRENT_ASSERT(t || !m_connecting);
6106 		if (m_connecting)
6107 		{
6108 			m_counters.inc_stats_counter(counters::num_peers_half_open, -1);
6109 			if (t) t->dec_num_connecting(m_peer_info);
6110 			m_connecting = false;
6111 		}
6112 
6113 		if (m_disconnecting) return;
6114 
6115 		if (e)
6116 		{
6117 			connect_failed(e);
6118 			return;
6119 		}
6120 
6121 		TORRENT_ASSERT(!m_connected);
6122 		m_connected = true;
6123 		m_counters.inc_stats_counter(counters::num_peers_connected);
6124 
6125 		if (m_disconnecting) return;
6126 		m_last_receive = aux::time_now();
6127 
6128 		error_code ec;
6129 		m_local = m_socket->local_endpoint(ec);
6130 		if (ec)
6131 		{
6132 			disconnect(ec, operation_t::getname);
6133 			return;
6134 		}
6135 
6136 		// if there are outgoing interfaces specified, verify this
6137 		// peer is correctly bound to one of them
6138 		if (!m_settings.get_str(settings_pack::outgoing_interfaces).empty())
6139 		{
6140 			if (!m_ses.verify_bound_address(m_local.address()
6141 				, is_utp(*m_socket), ec))
6142 			{
6143 				if (ec)
6144 				{
6145 					disconnect(ec, operation_t::get_interface);
6146 					return;
6147 				}
6148 				disconnect(error_code(
6149 					boost::system::errc::no_such_device, generic_category())
6150 					, operation_t::connect);
6151 				return;
6152 			}
6153 		}
6154 
6155 		if (is_utp(*m_socket) && m_peer_info)
6156 		{
6157 			m_peer_info->confirmed_supports_utp = true;
6158 			m_peer_info->supports_utp = false;
6159 		}
6160 
6161 		// this means the connection just succeeded
6162 
6163 		received_synack(is_v6(m_remote));
6164 
6165 		TORRENT_ASSERT(m_socket);
6166 #ifndef TORRENT_DISABLE_LOGGING
6167 		if (should_log(peer_log_alert::outgoing))
6168 		{
6169 			peer_log(peer_log_alert::outgoing, "COMPLETED"
6170 				, "ep: %s", print_endpoint(m_remote).c_str());
6171 		}
6172 #endif
6173 
6174 		// set the socket to non-blocking, so that we can
6175 		// read the entire buffer on each read event we get
6176 #ifndef TORRENT_DISABLE_LOGGING
6177 		peer_log(peer_log_alert::info, "SET_NON_BLOCKING");
6178 #endif
6179 		m_socket->non_blocking(true, ec);
6180 		if (ec)
6181 		{
6182 			disconnect(ec, operation_t::iocontrol);
6183 			return;
6184 		}
6185 
6186 		if (m_remote == m_socket->local_endpoint(ec))
6187 		{
6188 			disconnect(errors::self_connection, operation_t::bittorrent, failure);
6189 			return;
6190 		}
6191 
6192 		if (is_v4(m_remote) && m_settings.get_int(settings_pack::peer_tos) != 0)
6193 		{
6194 			error_code err;
6195 			m_socket->set_option(type_of_service(char(m_settings.get_int(settings_pack::peer_tos))), err);
6196 #ifndef TORRENT_DISABLE_LOGGING
6197 			if (should_log(peer_log_alert::outgoing))
6198 			{
6199 				peer_log(peer_log_alert::outgoing, "SET_TOS", "tos: %d e: %s"
6200 					, m_settings.get_int(settings_pack::peer_tos), err.message().c_str());
6201 			}
6202 #endif
6203 		}
6204 #if defined IPV6_TCLASS
6205 		else if (is_v6(m_remote) && m_settings.get_int(settings_pack::peer_tos) != 0)
6206 		{
6207 			error_code err;
6208 			m_socket->set_option(traffic_class(char(m_settings.get_int(settings_pack::peer_tos))), err);
6209 #ifndef TORRENT_DISABLE_LOGGING
6210 			if (should_log(peer_log_alert::outgoing))
6211 			{
6212 				peer_log(peer_log_alert::outgoing, "SET_TOS", "tos: %d e: %s"
6213 					, m_settings.get_int(settings_pack::peer_tos), err.message().c_str());
6214 			}
6215 #endif
6216 		}
6217 #endif
6218 
6219 #ifndef TORRENT_DISABLE_EXTENSIONS
6220 		for (auto const& ext : m_extensions)
6221 		{
6222 			ext->on_connected();
6223 		}
6224 #endif
6225 
6226 		on_connected();
6227 		setup_send();
6228 		setup_receive();
6229 	}
6230 
6231 	// --------------------------
6232 	// SEND DATA
6233 	// --------------------------
6234 
6235 	void peer_connection::on_send_data(error_code const& error
6236 		, std::size_t const bytes_transferred)
6237 	{
6238 		TORRENT_ASSERT(is_single_thread());
6239 		m_counters.inc_stats_counter(counters::on_write_counter);
6240 		m_ses.sent_buffer(int(bytes_transferred));
6241 
6242 #if TORRENT_USE_ASSERTS
6243 		TORRENT_ASSERT(m_socket_is_writing);
6244 		m_socket_is_writing = false;
6245 #endif
6246 
6247 		// submit all disk jobs when we've processed all messages
6248 		// in the current message queue
6249 		m_ses.deferred_submit_jobs();
6250 
6251 #ifndef TORRENT_DISABLE_LOGGING
6252 		if (should_log(peer_log_alert::info))
6253 		{
6254 			peer_log(peer_log_alert::info, "ON_SEND_DATA", "bytes: %d %s"
6255 				, int(bytes_transferred), print_error(error).c_str());
6256 		}
6257 #endif
6258 
6259 		INVARIANT_CHECK;
6260 
6261 		COMPLETE_ASYNC("peer_connection::on_send_data");
6262 		// keep ourselves alive until this function exits, in
6263 		// case we disconnect
6264 		std::shared_ptr<peer_connection> me(self());
6265 
6266 		TORRENT_ASSERT(m_channel_state[upload_channel] & peer_info::bw_network);
6267 
6268 		m_send_buffer.pop_front(int(bytes_transferred));
6269 
6270 		time_point const now = clock_type::now();
6271 
6272 		for (auto& block : m_download_queue)
6273 		{
6274 			if (block.send_buffer_offset == pending_block::not_in_buffer)
6275 				continue;
6276 			if (block.send_buffer_offset < int(bytes_transferred))
6277 				block.send_buffer_offset = pending_block::not_in_buffer;
6278 			else
6279 				block.send_buffer_offset -= int(bytes_transferred);
6280 		}
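		// Editor's note: a small hypothetical example of the offset adjustment
		// above. Suppose 1000 bytes were just written to the socket. A block
		// whose request message started at offset 400 in the send buffer has
		// now left the buffer entirely, while one at offset 2500 simply moves
		// 1000 bytes closer to the front:
		//
		//   int offset_a = 400;  // becomes pending_block::not_in_buffer
		//   int offset_b = 2500; // becomes 1500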
6281 
6282 		m_channel_state[upload_channel] &= ~peer_info::bw_network;
6283 
6284 		TORRENT_ASSERT(int(bytes_transferred) <= m_quota[upload_channel]);
6285 		m_quota[upload_channel] -= int(bytes_transferred);
6286 
6287 		trancieve_ip_packet(int(bytes_transferred), is_v6(m_remote));
6288 
6289 		if (m_send_barrier != INT_MAX)
6290 			m_send_barrier -= int(bytes_transferred);
6291 
6292 #ifndef TORRENT_DISABLE_LOGGING
6293 		peer_log(peer_log_alert::outgoing, "WROTE"
6294 			, "%d bytes", int(bytes_transferred));
6295 #endif
6296 
6297 		if (error)
6298 		{
6299 #ifndef TORRENT_DISABLE_LOGGING
6300 			if (should_log(peer_log_alert::info))
6301 			{
6302 				peer_log(peer_log_alert::info, "ERROR"
6303 					, "%s in peer_connection::on_send_data", error.message().c_str());
6304 			}
6305 #endif
6306 			disconnect(error, operation_t::sock_write);
6307 			return;
6308 		}
6309 		if (m_disconnecting)
6310 		{
6311 			// make sure we free up all send buffers that are owned
6312 			// by the disk thread
6313 			m_send_buffer.clear();
6314 			return;
6315 		}
6316 
6317 		TORRENT_ASSERT(!m_connecting);
6318 		TORRENT_ASSERT(bytes_transferred > 0);
6319 
6320 		m_last_sent = now;
6321 
6322 #if TORRENT_USE_ASSERTS
6323 		std::int64_t const cur_payload_ul = m_statistics.last_payload_uploaded();
6324 		std::int64_t const cur_protocol_ul = m_statistics.last_protocol_uploaded();
6325 #endif
6326 		on_sent(error, bytes_transferred);
6327 #if TORRENT_USE_ASSERTS
6328 		TORRENT_ASSERT(m_statistics.last_payload_uploaded() - cur_payload_ul >= 0);
6329 		TORRENT_ASSERT(m_statistics.last_protocol_uploaded() - cur_protocol_ul >= 0);
6330 		std::int64_t stats_diff = m_statistics.last_payload_uploaded() - cur_payload_ul
6331 			+ m_statistics.last_protocol_uploaded() - cur_protocol_ul;
6332 		TORRENT_ASSERT(stats_diff == int(bytes_transferred));
6333 #endif
6334 
6335 		fill_send_buffer();
6336 
6337 		setup_send();
6338 	}
6339 
6340 #if TORRENT_USE_INVARIANT_CHECKS
6341 	struct peer_count_t
6342 	{
6343 		peer_count_t(): num_peers(0), num_peers_with_timeouts(0), num_peers_with_nowant(0), num_not_requested(0) {}
6344 		int num_peers;
6345 		int num_peers_with_timeouts;
6346 		int num_peers_with_nowant;
6347 		int num_not_requested;
6348 //		std::vector<peer_connection const*> peers;
6349 	};
6350 
6351 	void peer_connection::check_invariant() const
6352 	{
6353 		TORRENT_ASSERT(is_single_thread());
6354 		TORRENT_ASSERT(m_in_use == 1337);
6355 		TORRENT_ASSERT(m_queued_time_critical <= int(m_request_queue.size()));
6356 		TORRENT_ASSERT(m_accept_fast.size() == m_accept_fast_piece_cnt.size());
6357 
6358 		m_recv_buffer.check_invariant();
6359 
6360 		for (int i = 0; i < 2; ++i)
6361 		{
6362 			if (m_channel_state[i] & peer_info::bw_limit)
6363 			{
6364 				// if we're waiting for bandwidth, we should be in the
6365 				// bandwidth manager's queue
6366 				TORRENT_ASSERT(m_ses.get_bandwidth_manager(i)->is_queued(this));
6367 			}
6368 		}
6369 
6370 		std::shared_ptr<torrent> t = m_torrent.lock();
6371 
6372 #if TORRENT_USE_INVARIANT_CHECKS \
6373 	&& !defined TORRENT_NO_EXPENSIVE_INVARIANT_CHECK
6374 		if (t && t->has_picker() && !m_disconnecting)
6375 			t->picker().check_peer_invariant(m_have_piece, peer_info_struct());
6376 #endif
6377 
6378 		if (!m_disconnect_started && m_initialized)
6379 		{
6380 			// none of this matters if we're disconnecting anyway
6381 			if (t->is_finished())
6382 				TORRENT_ASSERT(!is_interesting() || m_need_interest_update);
6383 			if (is_seed())
6384 				TORRENT_ASSERT(upload_only());
6385 		}
6386 
6387 		if (m_disconnecting)
6388 		{
6389 			TORRENT_ASSERT(m_download_queue.empty());
6390 			TORRENT_ASSERT(m_request_queue.empty());
6391 			TORRENT_ASSERT(m_disconnect_started);
6392 		}
6393 
6394 		TORRENT_ASSERT(m_outstanding_bytes >= 0);
6395 		if (t && t->valid_metadata() && !m_disconnecting)
6396 		{
6397 			torrent_info const& ti = t->torrent_file();
6398 			// if the piece is fully downloaded, we might have popped it from the
6399 			// download queue already
6400 			int outstanding_bytes = 0;
6401 //			bool in_download_queue = false;
6402 			int const bs = t->block_size();
6403 			piece_block last_block(ti.last_piece()
6404 				, (ti.piece_size(ti.last_piece()) + bs - 1) / bs);
6405 			for (std::vector<pending_block>::const_iterator i = m_download_queue.begin()
6406 				, end(m_download_queue.end()); i != end; ++i)
6407 			{
6408 				TORRENT_ASSERT(i->block.piece_index <= last_block.piece_index);
6409 				TORRENT_ASSERT(i->block.piece_index < last_block.piece_index
6410 					|| i->block.block_index <= last_block.block_index);
6411 				if (m_received_in_piece && i == m_download_queue.begin())
6412 				{
6413 //					in_download_queue = true;
6414 					// this assert is not correct since blocks may have different sizes
6415 					// and may not be returned in the order they were requested
6416 //					TORRENT_ASSERT(t->to_req(i->block).length >= m_received_in_piece);
6417 					outstanding_bytes += t->to_req(i->block).length - m_received_in_piece;
6418 				}
6419 				else
6420 				{
6421 					outstanding_bytes += t->to_req(i->block).length;
6422 				}
6423 			}
6424 			//if (p && p->bytes_downloaded < p->full_block_bytes) TORRENT_ASSERT(in_download_queue);
6425 
6426 			if (m_outstanding_bytes != outstanding_bytes)
6427 			{
6428 				std::fprintf(stderr, "m_outstanding_bytes = %d\noutstanding_bytes = %d\n"
6429 					, m_outstanding_bytes, outstanding_bytes);
6430 			}
6431 
6432 			TORRENT_ASSERT(m_outstanding_bytes == outstanding_bytes);
6433 		}
6434 
6435 		std::set<piece_block> unique;
6436 		std::transform(m_download_queue.begin(), m_download_queue.end()
6437 			, std::inserter(unique, unique.begin()), std::bind(&pending_block::block, _1));
6438 		std::transform(m_request_queue.begin(), m_request_queue.end()
6439 			, std::inserter(unique, unique.begin()), std::bind(&pending_block::block, _1));
6440 		TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size());
6441 		if (m_peer_info)
6442 		{
6443 			TORRENT_ASSERT(m_peer_info->prev_amount_upload == 0);
6444 			TORRENT_ASSERT(m_peer_info->prev_amount_download == 0);
6445 			TORRENT_ASSERT(m_peer_info->connection == this
6446 				|| m_peer_info->connection == nullptr);
6447 
6448 			if (m_peer_info->optimistically_unchoked)
6449 				TORRENT_ASSERT(!is_choked());
6450 		}
6451 
		TORRENT_ASSERT(m_have_piece.count() == m_num_pieces);

		if (!t)
		{
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
			// since this connection doesn't have a torrent reference
			// no torrent should have a reference to this connection either
			TORRENT_ASSERT(!m_ses.any_torrent_has_peer(this));
#endif
			return;
		}

		if (t->ready_for_connections() && m_initialized)
			TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size()));

		// in share mode we don't close redundant connections
		if (m_settings.get_bool(settings_pack::close_redundant_connections)
#ifndef TORRENT_DISABLE_SHARE_MODE
			&& !t->share_mode()
#endif
			)
		{
			bool const ok_to_disconnect =
				can_disconnect(errors::upload_upload_connection)
					|| can_disconnect(errors::uninteresting_upload_peer)
					|| can_disconnect(errors::too_many_requests_when_choked)
					|| can_disconnect(errors::timed_out_no_interest)
					|| can_disconnect(errors::timed_out_no_request)
					|| can_disconnect(errors::timed_out_inactivity);

			// make sure upload only peers are disconnected
			if (t->is_upload_only()
				&& m_upload_only
				&& !m_need_interest_update
				&& t->valid_metadata()
				&& has_metadata()
				&& ok_to_disconnect)
				TORRENT_ASSERT(m_disconnect_started || t->graceful_pause() || t->has_error());

			if (m_upload_only
				&& !m_interesting
				&& !m_need_interest_update
				&& m_bitfield_received
				&& t->are_files_checked()
				&& t->valid_metadata()
				&& has_metadata()
				&& ok_to_disconnect)
				TORRENT_ASSERT(m_disconnect_started);
		}

		if (!m_disconnect_started && m_initialized
			&& m_settings.get_bool(settings_pack::close_redundant_connections))
		{
			// none of this matters if we're disconnecting anyway
			if (t->is_upload_only() && !m_need_interest_update)
				TORRENT_ASSERT(!m_interesting || t->graceful_pause() || t->has_error());
			if (is_seed())
				TORRENT_ASSERT(m_upload_only);
		}

#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
		if (t->has_picker())
		{
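			// cross-check the piece picker's per-block peer counts against the
			// request- and download-queues of every connected peer of this torrent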
			std::map<piece_block, peer_count_t> num_requests;
			for (torrent::const_peer_iterator i = t->begin(); i != t->end(); ++i)
			{
				// make sure this peer is not a dangling pointer
				TORRENT_ASSERT(m_ses.has_peer(*i));
				peer_connection const& p = *(*i);
				for (std::vector<pending_block>::const_iterator j = p.request_queue().begin()
					, end(p.request_queue().end()); j != end; ++j)
				{
					++num_requests[j->block].num_peers;
					++num_requests[j->block].num_peers_with_timeouts;
					++num_requests[j->block].num_peers_with_nowant;
					++num_requests[j->block].num_not_requested;
//					num_requests[j->block].peers.push_back(&p);
				}
				for (std::vector<pending_block>::const_iterator j = p.download_queue().begin()
					, end(p.download_queue().end()); j != end; ++j)
				{
					if (!j->not_wanted && !j->timed_out) ++num_requests[j->block].num_peers;
					if (j->timed_out) ++num_requests[j->block].num_peers_with_timeouts;
					if (j->not_wanted) ++num_requests[j->block].num_peers_with_nowant;
//					num_requests[j->block].peers.push_back(&p);
				}
			}
			for (std::map<piece_block, peer_count_t>::iterator j = num_requests.begin()
				, end(num_requests.end()); j != end; ++j)
			{
				piece_block b = j->first;
				peer_count_t const& pc = j->second;
				int count = pc.num_peers;
				int count_with_timeouts = pc.num_peers_with_timeouts;
				int count_with_nowant = pc.num_peers_with_nowant;
				(void)count_with_timeouts;
				(void)count_with_nowant;
				int picker_count = t->picker().num_peers(b);
				if (!t->picker().is_downloaded(b))
					TORRENT_ASSERT(picker_count == count);
			}
		}
#endif
/*
		if (t->has_picker() && !t->is_aborted())
		{
			for (std::vector<pending_block>::const_iterator i = m_download_queue.begin()
				, end(m_download_queue.end()); i != end; ++i)
			{
				pending_block const& pb = *i;
				if (pb.timed_out || pb.not_wanted) continue;
				TORRENT_ASSERT(t->picker().get_block_state(pb.block) != piece_picker::block_info::state_none);
				TORRENT_ASSERT(complete);
			}
		}
*/
// extremely expensive invariant check
/*
		if (!t->is_seed())
		{
			piece_picker& p = t->picker();
			const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
			const int blocks_per_piece = static_cast<int>(
				t->torrent_file().piece_length() / t->block_size());

			for (std::vector<piece_picker::downloading_piece>::const_iterator i =
				dlq.begin(); i != dlq.end(); ++i)
			{
				for (int j = 0; j < blocks_per_piece; ++j)
				{
					if (std::find(m_request_queue.begin(), m_request_queue.end()
						, piece_block(i->index, j)) != m_request_queue.end()
						||
						std::find(m_download_queue.begin(), m_download_queue.end()
						, piece_block(i->index, j)) != m_download_queue.end())
					{
						TORRENT_ASSERT(i->info[j].peer == m_remote);
					}
					else
					{
						TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
					}
				}
			}
		}
*/
	}
#endif

	void peer_connection::set_holepunch_mode()
	{
		m_holepunch_mode = true;
#ifndef TORRENT_DISABLE_LOGGING
		peer_log(peer_log_alert::info, "HOLEPUNCH_MODE", "[ on ]");
#endif
	}

	void peer_connection::keep_alive()
	{
		TORRENT_ASSERT(is_single_thread());
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
		INVARIANT_CHECK;
#endif

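		// only send a keep-alive once at least half of the peer timeout has
		// elapsed since the last message we sent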
		time_duration const d = aux::time_now() - m_last_sent;
		if (total_seconds(d) < timeout() / 2) return;

		if (m_connecting) return;
		if (in_handshake()) return;

		// if the last send has not completed yet, do not send a keep
		// alive
		if (m_channel_state[upload_channel] & peer_info::bw_network) return;

#ifndef TORRENT_DISABLE_LOGGING
		peer_log(peer_log_alert::outgoing_message, "KEEPALIVE");
#endif

		write_keepalive();
	}

	bool peer_connection::is_seed() const
	{
		TORRENT_ASSERT(is_single_thread());

		// if m_num_pieces == 0, we probably don't have the
		// metadata yet.
		std::shared_ptr<torrent> t = m_torrent.lock();
		return m_num_pieces == m_have_piece.size()
			&& m_num_pieces > 0 && t && t->valid_metadata();
	}

#ifndef TORRENT_DISABLE_SHARE_MODE
	void peer_connection::set_share_mode(bool u)
	{
		TORRENT_ASSERT(is_single_thread());
		// if the peer is a seed, ignore share mode messages
		if (is_seed()) return;

		m_share_mode = u;
	}
#endif

	void peer_connection::set_upload_only(bool u)
	{
		TORRENT_ASSERT(is_single_thread());
		// if the peer is a seed, don't allow setting
		// upload_only to false
		if (m_upload_only && is_seed()) return;

		m_upload_only = u;
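		// re-evaluate whether this connection has become redundant now that the
		// peer's upload-only state changed (e.g. if we are upload-only as well)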
		disconnect_if_redundant();
	}

}