1 /*
2 
3 Copyright (c) 2008, Arvid Norberg
4 All rights reserved.
5 
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9 
10     * Redistributions of source code must retain the above copyright
11       notice, this list of conditions and the following disclaimer.
12     * Redistributions in binary form must reproduce the above copyright
13       notice, this list of conditions and the following disclaimer in
14       the documentation and/or other materials provided with the distribution.
15     * Neither the name of the author nor the names of its
16       contributors may be used to endorse or promote products derived
17       from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30 
31 */
32 
33 #include "libtorrent/peer_id.hpp"
34 #include "libtorrent/io_service.hpp"
35 #include "libtorrent/socket.hpp"
36 #include "libtorrent/address.hpp"
37 #include "libtorrent/error_code.hpp"
38 #include "libtorrent/io.hpp"
39 #include "libtorrent/torrent_info.hpp"
40 #include "libtorrent/create_torrent.hpp"
41 #include "libtorrent/hasher.hpp"
42 #include "libtorrent/socket_io.hpp"
43 #include "libtorrent/file_pool.hpp"
44 #include "libtorrent/string_view.hpp"
45 #include <random>
46 #include <cstring>
47 #include <thread>
48 #include <functional>
49 #include <iostream>
50 #include <atomic>
51 #include <array>
52 #include <chrono>
53 
54 #if BOOST_ASIO_DYN_LINK
55 #include <boost/asio/impl/src.hpp>
56 #endif
57 
58 using namespace lt;
59 using namespace lt::detail; // for write_* and read_*
60 
61 using namespace std::placeholders;
62 
generate_block(span<std::uint32_t> buffer,piece_index_t const piece,int const offset)63 void generate_block(span<std::uint32_t> buffer, piece_index_t const piece
64 	, int const offset)
65 {
66 	std::uint32_t const fill = (static_cast<int>(piece) << 8) | ((offset / 0x4000) & 0xff);
67 	for (auto& w : buffer) w = fill;
68 }
69 
// In order to circumvent the restriction of only one connection per IP
// that most clients implement, all sockets created by this tester are
// bound to unique local IPs in the loopback range
// (127.0.0.1 - 127.255.255.255). This is only enabled if the target is
// also on the loopback.
int local_if_counter{0};
bool local_bind{false};

// when set to true, downloaded blocks are verified to match
// the test torrent's fill pattern
bool verify_downloads{false};

// if this is true, one block in 1000 will be sent corrupt.
// this only applies to dual and upload tests
bool test_corruption{false};

// number of seeds we've spawned. The test is terminated
// when this reaches zero, for dual tests
std::atomic<int> num_seeds{0};

// the kind of test to run. Upload sends data to a
// bittorrent client, download requests data from
// a client and dual uploads and downloads from a client
// at the same time (this is presumably the most realistic
// test)
enum test_mode_t
{
	none,
	upload_test,
	download_test,
	dual_test
};
test_mode_t test_mode{none};

// the number of suggest messages received (total across all peers)
std::atomic<int> num_suggest{0};

// the number of requests made from suggested pieces
std::atomic<int> num_suggested_requests{0};
103 
// Returns the last component of the path `f`.
//
// - an empty path yields an empty string
// - a path without any separator is returned unchanged
// - a single trailing separator is ignored, so "a/b/" yields "b"
//
// On Windows and OS/2 builds a backslash is accepted as a separator too.
std::string leaf_path(std::string f)
{
	if (f.empty()) return "";

	auto const is_separator = [](char const c)
	{
		return c == '/'
#if defined(TORRENT_WINDOWS) || defined(TORRENT_OS2)
			|| c == '\\'
#endif
			;
	};

	char const* const begin = f.c_str();
	char const* last = strrchr(begin, '/');
#if defined(TORRENT_WINDOWS) || defined(TORRENT_OS2)
	char const* const alt = strrchr(begin, '\\');
	if (last == 0 || alt > last) last = alt;
#endif
	// no separator at all, the whole string is the leaf
	if (last == nullptr) return f;

	// the common case: the separator is not the final character, so
	// everything following it is the leaf
	if (last - begin != int(f.size()) - 1)
		return std::string(last + 1);

	// the path ends in a separator. Ignore it and walk backwards to the
	// separator before it (or to the start of the string)
	char const* leaf_begin = last;
	while (leaf_begin > begin && !is_separator(leaf_begin[-1]))
		--leaf_begin;
	return std::string(leaf_begin, last);
}
135 
namespace {
// file-local random number generator, seeded once from `dev` during
// static initialization
std::random_device dev;
std::mt19937 rng{dev()};
}
140 
141 struct peer_conn
142 {
peer_connpeer_conn143 	peer_conn(io_service& ios, int num_pieces, int blocks_pp, tcp::endpoint const& ep
144 		, char const* ih, bool seed_, int churn_, bool corrupt_)
145 		: s(ios)
146 		, read_pos(0)
147 		, state(handshaking)
148 		, choked(true)
149 		, current_piece(-1)
150 		, current_piece_is_allowed(false)
151 		, block(0)
152 		, blocks_per_piece(blocks_pp)
153 		, info_hash(ih)
154 		, outstanding_requests(0)
155 		, seed(seed_)
156 		, fast_extension(false)
157 		, blocks_received(0)
158 		, blocks_sent(0)
159 		, num_pieces(num_pieces)
160 		, start_time(clock_type::now())
161 		, churn(churn_)
162 		, corrupt(corrupt_)
163 		, endpoint(ep)
164 		, restarting(false)
165 	{
166 		corruption_counter = rand() % 1000;
167 		if (seed) ++num_seeds;
168 		pieces.reserve(num_pieces);
169 		start_conn();
170 	}
171 
start_connpeer_conn172 	void start_conn()
173 	{
174 		if (local_bind)
175 		{
176 			error_code ec;
177 			s.open(endpoint.protocol(), ec);
178 			if (ec)
179 			{
180 				close("ERROR OPEN: %s", ec);
181 				return;
182 			}
183 			tcp::endpoint bind_if(address_v4(
184 				(127 << 24)
185 				+ ((local_if_counter / 255) << 16)
186 				+ ((local_if_counter % 255) + 1)), 0);
187 			++local_if_counter;
188 			s.bind(bind_if, ec);
189 			if (ec)
190 			{
191 				close("ERROR BIND: %s", ec);
192 				return;
193 			}
194 		}
195 		restarting = false;
196 		s.async_connect(endpoint, std::bind(&peer_conn::on_connect, this, _1));
197 	}
198 
199 	tcp::socket s;
200 	char write_buf_proto[100];
201 	std::uint32_t write_buffer[17*1024/4];
202 	std::uint32_t buffer[17*1024/4];
203 	int read_pos;
204 	int corruption_counter;
205 
206 	enum state_t
207 	{
208 		handshaking,
209 		sending_request,
210 		receiving_message
211 	};
212 	int state;
213 	std::vector<piece_index_t> pieces;
214 	std::vector<piece_index_t> suggested_pieces;
215 	std::vector<piece_index_t> allowed_fast;
216 	bool choked;
217 	piece_index_t current_piece; // the piece we're currently requesting blocks from
218 	bool current_piece_is_allowed;
219 	int block;
220 	int blocks_per_piece;
221 	char const* info_hash;
222 	int outstanding_requests;
223 	// if this is true, this connection is a seed
224 	bool seed;
225 	bool fast_extension;
226 	int blocks_received;
227 	int blocks_sent;
228 	int num_pieces;
229 	time_point start_time;
230 	time_point end_time;
231 	int churn;
232 	bool corrupt;
233 	tcp::endpoint endpoint;
234 	bool restarting;
235 
on_connectpeer_conn236 	void on_connect(error_code const& ec)
237 	{
238 		if (ec)
239 		{
240 			close("ERROR CONNECT: %s", ec);
241 			return;
242 		}
243 
244 		char handshake[] = "\x13" "BitTorrent protocol\0\0\0\0\0\0\0\x04"
245 			"                    " // space for info-hash
246 			"aaaaaaaaaaaaaaaaaaaa" // peer-id
247 			"\0\0\0\x01\x02"; // interested
248 		char* h = (char*)malloc(sizeof(handshake));
249 		memcpy(h, handshake, sizeof(handshake));
250 		std::memcpy(h + 28, info_hash, 20);
251 		std::generate(h + 48, h + 68, &rand);
252 		// for seeds, don't send the interested message
253 		boost::asio::async_write(s, boost::asio::buffer(h, (sizeof(handshake) - 1) - (seed ? 5 : 0))
254 			, std::bind(&peer_conn::on_handshake, this, h, _1, _2));
255 	}
256 
on_handshakepeer_conn257 	void on_handshake(char* h, error_code const& ec, size_t)
258 	{
259 		free(h);
260 		if (ec)
261 		{
262 			close("ERROR SEND HANDSHAKE: %s", ec);
263 			return;
264 		}
265 
266 		// read handshake
267 		boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 68)
268 			, std::bind(&peer_conn::on_handshake2, this, _1, _2));
269 	}
270 
on_handshake2peer_conn271 	void on_handshake2(error_code const& ec, size_t)
272 	{
273 		if (ec)
274 		{
275 			close("ERROR READ HANDSHAKE: %s", ec);
276 			return;
277 		}
278 
279 		// buffer is the full 68 byte handshake
280 		// look at the extension bits
281 
282 		fast_extension = (((char*)buffer)[27] & 4) != 0;
283 
284 		if (seed)
285 		{
286 			write_have_all();
287 		}
288 		else
289 		{
290 			work_download();
291 		}
292 	}
293 
write_have_allpeer_conn294 	void write_have_all()
295 	{
296 		if (fast_extension)
297 		{
298 			char* ptr = write_buf_proto;
299 			// have_all
300 			write_uint32(1, ptr);
301 			write_uint8(0xe, ptr);
302 			// unchoke
303 			write_uint32(1, ptr);
304 			write_uint8(1, ptr);
305 			boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, ptr - write_buf_proto)
306 				, std::bind(&peer_conn::on_have_all_sent, this, _1, _2));
307 		}
308 		else
309 		{
310 			// bitfield
311 			int len = (num_pieces + 7) / 8;
312 			char* ptr = (char*)buffer;
313 			write_uint32(len + 1, ptr);
314 			write_uint8(5, ptr);
315 			memset(ptr, 255, len);
316 			ptr += len;
317 			// unchoke
318 			write_uint32(1, ptr);
319 			write_uint8(1, ptr);
320 			boost::asio::async_write(s, boost::asio::buffer((char*)buffer, len + 10)
321 				, std::bind(&peer_conn::on_have_all_sent, this, _1, _2));
322 		}
323 	}
324 
on_have_all_sentpeer_conn325 	void on_have_all_sent(error_code const& ec, size_t)
326 	{
327 		if (ec)
328 		{
329 			close("ERROR SEND HAVE ALL: %s", ec);
330 			return;
331 		}
332 
333 		// read message
334 		boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
335 			, std::bind(&peer_conn::on_msg_length, this, _1, _2));
336 	}
337 
write_requestpeer_conn338 	bool write_request()
339 	{
340 		// if we're choked (and there are no allowed-fast pieces left)
341 		if (choked && allowed_fast.empty() && !current_piece_is_allowed) return false;
342 
343 		// if there are no pieces left to request
344 		if (pieces.empty() && suggested_pieces.empty()
345 			&& current_piece == piece_index_t(-1))
346 		{
347 			return false;
348 		}
349 
350 		if (current_piece == piece_index_t(-1))
351 		{
352 			// pick a new piece
353 			if (choked && allowed_fast.size() > 0)
354 			{
355 				current_piece = allowed_fast.front();
356 				allowed_fast.erase(allowed_fast.begin());
357 				current_piece_is_allowed = true;
358 			}
359 			else if (suggested_pieces.size() > 0)
360 			{
361 				current_piece = suggested_pieces.front();
362 				suggested_pieces.erase(suggested_pieces.begin());
363 				++num_suggested_requests;
364 				current_piece_is_allowed = false;
365 			}
366 			else if (pieces.size() > 0)
367 			{
368 				current_piece = pieces.front();
369 				pieces.erase(pieces.begin());
370 				current_piece_is_allowed = false;
371 			}
372 			else
373 			{
374 				TORRENT_ASSERT_FAIL();
375 			}
376 		}
377 		char msg[] = "\0\0\0\xd\x06"
378 			"    " // piece
379 			"    " // offset
380 			"    "; // length
381 		char* m = (char*)malloc(sizeof(msg));
382 		memcpy(m, msg, sizeof(msg));
383 		char* ptr = m + 5;
384 		write_uint32(static_cast<int>(current_piece), ptr);
385 		write_uint32(block * 16 * 1024, ptr);
386 		write_uint32(16 * 1024, ptr);
387 		boost::asio::async_write(s, boost::asio::buffer(m, sizeof(msg) - 1)
388 			, std::bind(&peer_conn::on_req_sent, this, m, _1, _2));
389 
390 		++outstanding_requests;
391 		++block;
392 		if (block == blocks_per_piece)
393 		{
394 			block = 0;
395 			current_piece = piece_index_t(-1);
396 			current_piece_is_allowed = false;
397 		}
398 		return true;
399 	}
400 
on_req_sentpeer_conn401 	void on_req_sent(char* m, error_code const& ec, size_t)
402 	{
403 		free(m);
404 		if (ec)
405 		{
406 			close("ERROR SEND REQUEST: %s", ec);
407 			return;
408 		}
409 
410 		work_download();
411 	}
412 
closepeer_conn413 	void close(char const* fmt, error_code const& ec)
414 	{
415 		end_time = clock_type::now();
416 		char tmp[1024];
417 		std::snprintf(tmp, sizeof(tmp), fmt, ec.message().c_str());
418 		int time = int(total_milliseconds(end_time - start_time));
419 		if (time == 0) time = 1;
420 		float up = (std::int64_t(blocks_sent) * 0x4000) / time / 1000.f;
421 		float down = (std::int64_t(blocks_received) * 0x4000) / time / 1000.f;
422 		error_code e;
423 
424 		char ep_str[200];
425 		address const& addr = s.local_endpoint(e).address();
426 		if (addr.is_v6())
427 			std::snprintf(ep_str, sizeof(ep_str), "[%s]:%d", addr.to_string(e).c_str()
428 				, s.local_endpoint(e).port());
429 		else
430 			std::snprintf(ep_str, sizeof(ep_str), "%s:%d", addr.to_string(e).c_str()
431 				, s.local_endpoint(e).port());
432 		std::printf("%s ep: %s sent: %d received: %d duration: %d ms up: %.1fMB/s down: %.1fMB/s\n"
433 			, tmp, ep_str, blocks_sent, blocks_received, time, up, down);
434 		if (seed) --num_seeds;
435 	}
436 
work_downloadpeer_conn437 	void work_download()
438 	{
439 		if (pieces.empty()
440 			&& suggested_pieces.empty()
441 			&& current_piece == piece_index_t(-1)
442 			&& outstanding_requests == 0
443 			&& blocks_received >= num_pieces * blocks_per_piece)
444 		{
445 			close("COMPLETED DOWNLOAD", error_code());
446 			return;
447 		}
448 
449 		// send requests
450 		if (outstanding_requests < 40)
451 		{
452 			if (write_request()) return;
453 		}
454 
455 		// read message
456 		boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
457 			, std::bind(&peer_conn::on_msg_length, this, _1, _2));
458 	}
459 
on_msg_lengthpeer_conn460 	void on_msg_length(error_code const& ec, size_t)
461 	{
462 		if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor)
463 			&& restarting)
464 		{
465 			start_conn();
466 			return;
467 		}
468 
469 		if (ec)
470 		{
471 			close("ERROR RECEIVE MESSAGE PREFIX: %s", ec);
472 			return;
473 		}
474 		char* ptr = (char*)buffer;
475 		unsigned int length = read_uint32(ptr);
476 		if (length > sizeof(buffer))
477 		{
478 			std::fprintf(stderr, "len: %d\n", length);
479 			close("ERROR RECEIVE MESSAGE PREFIX: packet too big", error_code());
480 			return;
481 		}
482 		boost::asio::async_read(s, boost::asio::buffer((char*)buffer, length)
483 			, std::bind(&peer_conn::on_message, this, _1, _2));
484 	}
485 
on_messagepeer_conn486 	void on_message(error_code const& ec, size_t bytes_transferred)
487 	{
488 		if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor)
489 			&& restarting)
490 		{
491 			start_conn();
492 			return;
493 		}
494 
495 		if (ec)
496 		{
497 			close("ERROR RECEIVE MESSAGE: %s", ec);
498 			return;
499 		}
500 		char* ptr = (char*)buffer;
501 		int msg = read_uint8(ptr);
502 
503 		if (test_mode == dual_test && num_seeds == 0)
504 		{
505 			TORRENT_ASSERT(!seed);
506 			close("NO MORE SEEDS, test done", error_code());
507 			return;
508 		}
509 
510 		//std::printf("msg: %d len: %d\n", msg, int(bytes_transferred));
511 
512 		if (seed)
513 		{
514 			if (msg == 6)
515 			{
516 				if (bytes_transferred != 13)
517 				{
518 					close("REQUEST packet has invalid size", error_code());
519 					return;
520 				}
521 				piece_index_t const piece = piece_index_t(detail::read_int32(ptr));
522 				int const start = detail::read_int32(ptr);
523 				int const length = detail::read_int32(ptr);
524 				write_piece(piece, start, length);
525 			}
526 			else if (msg == 3) // not-interested
527 			{
528 				close("DONE", error_code());
529 				return;
530 			}
531 			else
532 			{
533 				// read another message
534 				boost::asio::async_read(s, boost::asio::buffer(buffer, 4)
535 					, std::bind(&peer_conn::on_msg_length, this, _1, _2));
536 			}
537 		}
538 		else
539 		{
540 			if (msg == 0xe) // have_all
541 			{
542 				// build a list of all pieces and request them all!
543 				pieces.resize(num_pieces);
544 				for (piece_index_t i(0); i < piece_index_t(int(pieces.size())); ++i)
545 					pieces[static_cast<int>(i)] = i;
546 				std::shuffle(pieces.begin(), pieces.end(), rng);
547 			}
548 			else if (msg == 4) // have
549 			{
550 				piece_index_t const piece(detail::read_int32(ptr));
551 				if (pieces.empty()) pieces.push_back(piece);
552 				else pieces.insert(pieces.begin() + (rand() % pieces.size()), piece);
553 			}
554 			else if (msg == 5) // bitfield
555 			{
556 				pieces.reserve(num_pieces);
557 				piece_index_t piece(0);
558 				for (int i = 0; i < int(bytes_transferred); ++i)
559 				{
560 					int mask = 0x80;
561 					for (int k = 0; k < 8; ++k)
562 					{
563 						if (piece > piece_index_t(num_pieces)) break;
564 						if (*ptr & mask) pieces.push_back(piece);
565 						mask >>= 1;
566 						++piece;
567 					}
568 					++ptr;
569 				}
570 				std::shuffle(pieces.begin(), pieces.end(), rng);
571 			}
572 			else if (msg == 7) // piece
573 			{
574 				if (verify_downloads)
575 				{
576 					piece_index_t const piece(read_uint32(ptr));
577 					int start = read_uint32(ptr);
578 					int size = int(bytes_transferred) - 9;
579 					verify_piece(piece, start, ptr, size);
580 				}
581 				++blocks_received;
582 				--outstanding_requests;
583 				piece_index_t const piece = piece_index_t(detail::read_int32(ptr));
584 				int start = detail::read_int32(ptr);
585 
586 				if (churn && (blocks_received % churn) == 0) {
587 					outstanding_requests = 0;
588 					restarting = true;
589 					s.close();
590 					return;
591 				}
592 				if (int((start + bytes_transferred) / 0x4000) == blocks_per_piece)
593 				{
594 					write_have(piece);
595 					return;
596 				}
597 			}
598 			else if (msg == 13) // suggest
599 			{
600 				piece_index_t const piece(detail::read_int32(ptr));
601 				auto i = std::find(pieces.begin(), pieces.end(), piece);
602 				if (i != pieces.end())
603 				{
604 					pieces.erase(i);
605 					suggested_pieces.push_back(piece);
606 					++num_suggest;
607 				}
608 			}
609 			else if (msg == 16) // reject request
610 			{
611 				piece_index_t const piece(detail::read_int32(ptr));
612 				int start = detail::read_int32(ptr);
613 				int length = detail::read_int32(ptr);
614 
615 				// put it back!
616 				if (current_piece != piece)
617 				{
618 					if (pieces.empty() || pieces.back() != piece)
619 						pieces.push_back(piece);
620 				}
621 				else
622 				{
623 					block = std::min(start / 0x4000, block);
624 					if (block == 0)
625 					{
626 						pieces.push_back(current_piece);
627 						current_piece = piece_index_t(-1);
628 						current_piece_is_allowed = false;
629 					}
630 				}
631 				--outstanding_requests;
632 				std::fprintf(stderr, "REJECT: [ piece: %d start: %d length: %d ]\n"
633 					, static_cast<int>(piece), start, length);
634 			}
635 			else if (msg == 0) // choke
636 			{
637 				choked = true;
638 			}
639 			else if (msg == 1) // unchoke
640 			{
641 				choked = false;
642 			}
643 			else if (msg == 17) // allowed_fast
644 			{
645 				piece_index_t const piece = piece_index_t(detail::read_int32(ptr));
646 				auto i = std::find(pieces.begin(), pieces.end(), piece);
647 				if (i != pieces.end())
648 				{
649 					pieces.erase(i);
650 					allowed_fast.push_back(piece);
651 				}
652 			}
653 			work_download();
654 		}
655 	}
656 
verify_piecepeer_conn657 	bool verify_piece(piece_index_t const piece, int start, char const* ptr, int size)
658 	{
659 		std::uint32_t* buf = (std::uint32_t*)ptr;
660 		std::uint32_t const fill = (static_cast<int>(piece) << 8) | ((start / 0x4000) & 0xff);
661 		for (int i = 0; i < size / 4; ++i)
662 		{
663 			if (buf[i] != fill)
664 			{
665 				std::fprintf(stderr, "received invalid block. piece %d block %d\n"
666 					, static_cast<int>(piece), start / 0x4000);
667 				exit(1);
668 			}
669 		}
670 		return true;
671 	}
672 
write_piecepeer_conn673 	void write_piece(piece_index_t const piece, int start, int length)
674 	{
675 		generate_block({write_buffer, length / 4}
676 			, piece, start);
677 
678 		if (corrupt)
679 		{
680 			--corruption_counter;
681 			if (corruption_counter == 0)
682 			{
683 				corruption_counter = 1000;
684 				std::memset(write_buffer, 0, 10);
685 			}
686 		}
687 		char* ptr = write_buf_proto;
688 		write_uint32(9 + length, ptr);
689 		assert(length == 0x4000);
690 		write_uint8(7, ptr);
691 		write_uint32(static_cast<int>(piece), ptr);
692 		write_uint32(start, ptr);
693 		std::array<boost::asio::const_buffer, 2> vec;
694 		vec[0] = boost::asio::buffer(write_buf_proto, ptr - write_buf_proto);
695 		vec[1] = boost::asio::buffer(write_buffer, length);
696 		boost::asio::async_write(s, vec, std::bind(&peer_conn::on_have_all_sent, this, _1, _2));
697 		++blocks_sent;
698 		if (churn && (blocks_sent % churn) == 0 && seed) {
699 			outstanding_requests = 0;
700 			restarting = true;
701 			s.close();
702 		}
703 	}
704 
write_havepeer_conn705 	void write_have(piece_index_t const piece)
706 	{
707 		char* ptr = write_buf_proto;
708 		write_uint32(5, ptr);
709 		write_uint8(4, ptr);
710 		write_uint32(static_cast<int>(piece), ptr);
711 		boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, 9), std::bind(&peer_conn::on_have_all_sent, this, _1, _2));
712 	}
713 };
714 
// Prints the command line help text to stderr and terminates the process
// with exit code 1. This function does not return.
void print_usage()
{
	std::fprintf(stderr, "usage: connection_tester command [options]\n\n"
		"command is one of:\n"
		"  gen-torrent          generate a test torrent\n"
		"    options for this command:\n"
		"    -s <size>          the size of the torrent in megabytes\n"
		"    -n <num-files>     the number of files in the test torrent\n"
		"    -a                 introduce a lot of pad-files\n"
		"                       (pad files are not supported for gen-data or upload)\n"
		"    -t <file>          the file to save the .torrent file to\n"
		"    -T <name>          the name of the torrent (and directory\n"
		"                       its files are saved in)\n\n"
		"  gen-data             generate the data file(s) for the test torrent\n"
		"    options for this command:\n"
		"    -t <file>          the torrent file that was previously generated\n"
		"    -P <path>          the path to where the data should be stored\n\n"
		"  gen-test-torrents    generate many test torrents (cannot be used for up/down tests)\n"
		"    options for this command:\n"
		"    -N <num-torrents>  number of torrents to generate\n"
		"    -n <num-files>     number of files in each torrent\n"
		"    -t <name>          base name of torrent files (index is appended)\n"
		"    -T <URL>           add the specified tracker URL to each torrent\n"
		"                       this option may appear multiple times\n\n"
		"  upload               start an uploader test\n"
		"  download             start a downloader test\n"
		"  dual                 start a download and upload test\n"
		"    options for these commands:\n"
		"    -c <num-conns>     the number of connections to make to the target\n"
		"    -d <dst>           the IP address of the target\n"
		"    -p <dst-port>      the port the target listens on\n"
		"    -t <torrent-file>  the torrent file previously generated by gen-torrent\n"
		"    -C                 send corrupt pieces sometimes (applies to upload and dual)\n"
		"    -r <reconnects>    churn - number of reconnects per second\n\n"
		"examples:\n\n"
		"connection_tester gen-torrent -s 1024 -n 4 -t test.torrent\n"
		"connection_tester upload -c 200 -d 127.0.0.1 -p 6881 -t test.torrent\n"
		"connection_tester download -c 200 -d 127.0.0.1 -p 6881 -t test.torrent\n"
		"connection_tester dual -c 200 -d 127.0.0.1 -p 6881 -t test.torrent\n");
	exit(1);
}
756 
hasher_thread(lt::create_torrent * t,piece_index_t const start_piece,piece_index_t const end_piece,int piece_size,bool print)757 void hasher_thread(lt::create_torrent* t, piece_index_t const start_piece
758 	, piece_index_t const end_piece, int piece_size, bool print)
759 {
760 	if (print) std::fprintf(stderr, "\n");
761 	std::uint32_t piece[0x4000 / 4];
762 	for (piece_index_t i = start_piece; i < end_piece; ++i)
763 	{
764 		hasher ph;
765 		for (int j = 0; j < piece_size; j += 0x4000)
766 		{
767 			generate_block(piece, i, j);
768 			ph.update(reinterpret_cast<char*>(piece), 0x4000);
769 		}
770 		t->set_hash(i, ph.final());
771 		int const range = static_cast<int>(end_piece) - static_cast<int>(start_piece);
772 		if (print && (static_cast<int>(i) & 1))
773 		{
774 			int const delta_piece = static_cast<int>(i) - static_cast<int>(start_piece);
775 			std::fprintf(stderr, "\r%.1f %% ", float(delta_piece * 100) / float(range));
776 		}
777 	}
778 	if (print) std::fprintf(stderr, "\n");
779 }
780 
781 // size is in megabytes
generate_torrent(std::vector<char> & buf,int num_pieces,int num_files,char const * torrent_name,bool with_padding)782 void generate_torrent(std::vector<char>& buf, int num_pieces, int num_files
783 	, char const* torrent_name, bool with_padding)
784 {
785 	file_storage fs;
786 	// 1 MiB piece size
787 	const int piece_size = 1024 * 1024;
788 	const std::int64_t total_size = std::int64_t(piece_size) * num_pieces;
789 
790 	std::int64_t s = total_size;
791 	int file_index = 0;
792 	std::int64_t file_size = total_size / num_files;
793 	while (s > 0)
794 	{
795 		char b[100];
796 		std::snprintf(b, sizeof(b), "%s/stress_test%d", torrent_name, file_index);
797 		++file_index;
798 		fs.add_file(b, std::min(s, file_size));
799 		s -= file_size;
800 		file_size += 200;
801 	}
802 
803 	lt::create_torrent t(fs, piece_size, with_padding ? 100 : -1);
804 
805 	num_pieces = t.num_pieces();
806 
807 	int const num_threads = std::thread::hardware_concurrency()
808 		? std::thread::hardware_concurrency() : 4;
809 	std::printf("hashing in %d threads\n", num_threads);
810 
811 	std::vector<std::thread> threads;
812 	threads.reserve(num_threads);
813 	for (int i = 0; i < num_threads; ++i)
814 	{
815 		threads.emplace_back(&hasher_thread, &t
816 			, piece_index_t(i * num_pieces / num_threads)
817 			, piece_index_t((i + 1) * num_pieces / num_threads)
818 			, piece_size
819 			, i == 0);
820 	}
821 
822 	for (auto& i : threads)
823 		i.join();
824 
825 	std::back_insert_iterator<std::vector<char>> out(buf);
826 	bencode(out, t.generate());
827 }
828 
generate_data(char const * path,torrent_info const & ti)829 void generate_data(char const* path, torrent_info const& ti)
830 {
831 	file_storage const& fs = ti.files();
832 
833 	file_pool fp;
834 
835 	aux::vector<download_priority_t, file_index_t> priorities;
836 	sha1_hash info_hash;
837 	storage_params params{
838 		fs,
839 		nullptr,
840 		path,
841 		storage_mode_sparse,
842 		priorities,
843 		info_hash
844 	};
845 
846 	std::unique_ptr<storage_interface> st(default_storage_constructor(params, fp));
847 
848 	{
849 		storage_error error;
850 		st->initialize(error);
851 	}
852 
853 	std::uint32_t piece[0x4000 / 4];
854 	for (piece_index_t i(0); i < piece_index_t(ti.num_pieces()); ++i)
855 	{
856 		for (int j = 0; j < ti.piece_size(i); j += 0x4000)
857 		{
858 			generate_block(piece, i, j);
859 			int const left_in_piece = ti.piece_size(i) - j;
860 			iovec_t const b = { reinterpret_cast<char*>(piece)
861 				, std::min(left_in_piece, 0x4000)};
862 			storage_error error;
863 			st->writev(b, i, j, open_mode::write_only, error);
864 			if (error)
865 				std::fprintf(stderr, "storage error: %s\n", error.ec.message().c_str());
866 		}
867 		if (static_cast<int>(i) & 1)
868 		{
869 			std::fprintf(stderr, "\r%.1f %% ", float(static_cast<int>(i) * 100) / float(ti.num_pieces()));
870 		}
871 	}
872 }
873 
io_thread(io_service * ios)874 void io_thread(io_service* ios)
875 {
876 	error_code ec;
877 	ios->run(ec);
878 	if (ec) std::fprintf(stderr, "ERROR: %s\n", ec.message().c_str());
879 }
880 
main(int argc,char * argv[])881 int main(int argc, char* argv[])
882 {
883 	if (argc <= 1) print_usage();
884 
885 	char const* command = argv[1];
886 	int size = 1000;
887 	int num_files = 10;
888 	int num_torrents = 1;
889 	char const* torrent_file = "benchmark.torrent";
890 	char const* data_path = ".";
891 	int num_connections = 50;
892 	char const* destination_ip = "127.0.0.1";
893 	int destination_port = 6881;
894 	int churn = 0;
895 	bool gen_pad_files = false;
896 	std::vector<std::string> trackers;
897 
898 	argv += 2;
899 	argc -= 2;
900 
901 	while (argc > 0)
902 	{
903 		char const* optname = argv[0];
904 		++argv;
905 		--argc;
906 
907 		if (optname[0] != '-' || strlen(optname) != 2)
908 		{
909 			std::fprintf(stderr, "unknown option: %s\n", optname);
910 			continue;
911 		}
912 
913 		// options with no arguments
914 		switch (optname[1])
915 		{
916 			case 'C': test_corruption = true; continue;
917 			case 'a': gen_pad_files = true; continue;
918 		}
919 
920 		if (argc == 0)
921 		{
922 			std::fprintf(stderr, "missing argument for option: %s\n", optname);
923 			break;
924 		}
925 
926 		char const* optarg = argv[0];
927 		++argv;
928 		--argc;
929 
930 		switch (optname[1])
931 		{
932 			case 's': size = atoi(optarg); break;
933 			case 'n': num_files = atoi(optarg); break;
934 			case 'N': num_torrents = atoi(optarg); break;
935 			case 't': torrent_file = optarg; break;
936 			case 'T': trackers.push_back(optarg); break;
937 			case 'P': data_path = optarg; break;
938 			case 'c': num_connections = atoi(optarg); break;
939 			case 'p': destination_port = atoi(optarg); break;
940 			case 'd': destination_ip = optarg; break;
941 			case 'r': churn = atoi(optarg); break;
942 			default: std::fprintf(stderr, "unknown option: %s\n", optname);
943 		}
944 	}
945 
946 	if (command == "gen-torrent"_sv)
947 	{
948 		std::vector<char> tmp;
949 		std::string name = leaf_path(torrent_file);
950 		name = name.substr(0, name.find_last_of('.'));
951 		std::printf("generating torrent: %s\n", name.c_str());
952 		generate_torrent(tmp, size ? size : 1024, num_files ? num_files : 1
953 			, name.c_str(), gen_pad_files);
954 
955 		FILE* output = stdout;
956 		if ("-"_sv != torrent_file)
957 		{
958 			if( (output = std::fopen(torrent_file, "wb+")) == nullptr)
959 			{
960 				std::fprintf(stderr, "Could not open file '%s' for writing: %s\n"
961 					, torrent_file, std::strerror(errno));
962 				exit(2);
963 			}
964 		}
965 		std::fprintf(stderr, "writing file to: %s\n", torrent_file);
966 		fwrite(&tmp[0], 1, tmp.size(), output);
967 		if (output != stdout)
968 			std::fclose(output);
969 
970 		return 0;
971 	}
972 	else if (command == "gen-data"_sv)
973 	{
974 		error_code ec;
975 		torrent_info ti(torrent_file, ec);
976 		if (ec)
977 		{
978 			std::fprintf(stderr, "ERROR LOADING .TORRENT: %s\n", ec.message().c_str());
979 			return 1;
980 		}
981 		generate_data(data_path, ti);
982 		return 0;
983 	}
984 	else if (command == "gen-test-torrents"_sv)
985 	{
986 		std::vector<char> buf;
987 		for (int i = 0; i < num_torrents; ++i)
988 		{
989 			char torrent_name[100];
990 			std::snprintf(torrent_name, sizeof(torrent_name), "%s-%d.torrent", torrent_file, i);
991 
992 			file_storage fs;
993 			for (int j = 0; j < num_files; ++j)
994 			{
995 				char file_name[100];
996 				std::snprintf(file_name, sizeof(file_name), "%s-%d/file-%d", torrent_file, i, j);
997 				fs.add_file(file_name, std::int64_t(j + i + 1) * 251);
998 			}
999 			// 1 MiB piece size
1000 			const int piece_size = 1024 * 1024;
1001 			lt::create_torrent t(fs, piece_size);
1002 			sha1_hash zero(nullptr);
1003 			for (auto const k : fs.piece_range())
1004 				t.set_hash(k, zero);
1005 
1006 			int tier = 0;
1007 			for (auto const& tr : trackers)
1008 				t.add_tracker(tr, tier++);
1009 
1010 			buf.clear();
1011 			std::back_insert_iterator<std::vector<char>> out(buf);
1012 			bencode(out, t.generate());
1013 			FILE* f = std::fopen(torrent_name, "w+");
1014 			if (f == nullptr)
1015 			{
1016 				std::fprintf(stderr, "Could not open file '%s' for writing: %s\n"
1017 					, torrent_name, std::strerror(errno));
1018 				return 1;
1019 			}
1020 			size_t ret = fwrite(buf.data(), 1, buf.size(), f);
1021 			if (ret != buf.size())
1022 			{
1023 				std::fprintf(stderr, "write returned: %d (expected %d)\n", int(ret), int(buf.size()));
1024 				std::fclose(f);
1025 				return 1;
1026 			}
1027 			std::printf("wrote %s\n", torrent_name);
1028 			std::fclose(f);
1029 		}
1030 		return 0;
1031 	}
1032 	else if (command == "upload"_sv)
1033 	{
1034 		test_mode = upload_test;
1035 	}
1036 	else if (command == "download"_sv)
1037 	{
1038 		test_mode = download_test;
1039 	}
1040 	else if (command == "dual"_sv)
1041 	{
1042 		test_mode = dual_test;
1043 	}
1044 	else
1045 	{
1046 		std::fprintf(stderr, "unknown command: %s\n\n", command);
1047 		print_usage();
1048 	}
1049 
1050 	error_code ec;
1051 	address_v4 addr = address_v4::from_string(destination_ip, ec);
1052 	if (ec)
1053 	{
1054 		std::fprintf(stderr, "ERROR RESOLVING %s: %s\n", destination_ip, ec.message().c_str());
1055 		return 1;
1056 	}
1057 	tcp::endpoint ep(addr, std::uint16_t(destination_port));
1058 
1059 #if !defined __APPLE__
	// apparently darwin doesn't seem to let you bind to
	// loopback on any IP other than 127.0.0.1
1062 	std::uint32_t const ip = addr.to_ulong();
1063 	if ((ip & 0xff000000) == 0x7f000000)
1064 	{
1065 		local_bind = true;
1066 	}
1067 #endif
1068 
1069 	torrent_info ti(torrent_file, ec);
1070 	if (ec)
1071 	{
1072 		std::fprintf(stderr, "ERROR LOADING .TORRENT: %s\n", ec.message().c_str());
1073 		return 1;
1074 	}
1075 
1076 	std::vector<peer_conn*> conns;
1077 	conns.reserve(num_connections);
1078 	int const num_threads = 2;
1079 	io_service ios[num_threads];
1080 	for (int i = 0; i < num_connections; ++i)
1081 	{
1082 		bool corrupt = test_corruption && (i & 1) == 0;
1083 		bool seed = false;
1084 		if (test_mode == upload_test) seed = true;
1085 		else if (test_mode == dual_test) seed = (i & 1);
1086 		conns.push_back(new peer_conn(ios[i % num_threads], ti.num_pieces(), ti.piece_length() / 16 / 1024
1087 			, ep, (char const*)&ti.info_hash()[0], seed, churn, corrupt));
1088 		std::this_thread::sleep_for(std::chrono::milliseconds(1));
1089 		ios[i % num_threads].poll_one(ec);
1090 		if (ec)
1091 		{
1092 			std::fprintf(stderr, "ERROR: %s\n", ec.message().c_str());
1093 			break;
1094 		}
1095 	}
1096 
1097 	std::thread t1(&io_thread, &ios[0]);
1098 	std::thread t2(&io_thread, &ios[1]);
1099 
1100 	t1.join();
1101 	t2.join();
1102 
1103 	float up = 0.f;
1104 	float down = 0.f;
1105 	std::uint64_t total_sent = 0;
1106 	std::uint64_t total_received = 0;
1107 
1108 	for (std::vector<peer_conn*>::iterator i = conns.begin()
1109 		, end(conns.end()); i != end; ++i)
1110 	{
1111 		peer_conn* p = *i;
1112 		int time = int(total_milliseconds(p->end_time - p->start_time));
1113 		if (time == 0) time = 1;
1114 		total_sent += p->blocks_sent;
1115 		up += (std::int64_t(p->blocks_sent) * 0x4000) / time / 1000.f;
1116 		down += (std::int64_t(p->blocks_received) * 0x4000) / time / 1000.f;
1117 		delete p;
1118 	}
1119 
1120 	std::printf("=========================\n"
1121 		"suggests: %d suggested-requests: %d\n"
1122 		"total sent: %.1f %% received: %.1f %%\n"
1123 		"rate sent: %.1f MB/s received: %.1f MB/s\n"
1124 		, int(num_suggest), int(num_suggested_requests)
1125 		, total_sent * 0x4000 * 100.f / float(ti.total_size())
1126 		, total_received * 0x4000 * 100.f / float(ti.total_size())
1127 		, up, down);
1128 
1129 	return 0;
1130 }
1131