1 /*
2
3 Copyright (c) 2008, Arvid Norberg
4 All rights reserved.
5
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30
31 */
32
33 #include "libtorrent/peer_id.hpp"
34 #include "libtorrent/io_service.hpp"
35 #include "libtorrent/socket.hpp"
36 #include "libtorrent/address.hpp"
37 #include "libtorrent/error_code.hpp"
38 #include "libtorrent/io.hpp"
39 #include "libtorrent/torrent_info.hpp"
40 #include "libtorrent/create_torrent.hpp"
41 #include "libtorrent/hasher.hpp"
42 #include "libtorrent/socket_io.hpp"
43 #include "libtorrent/file_pool.hpp"
44 #include "libtorrent/string_view.hpp"
45 #include <random>
46 #include <cstring>
47 #include <thread>
48 #include <functional>
49 #include <iostream>
50 #include <atomic>
51 #include <array>
52 #include <chrono>
53
54 #if BOOST_ASIO_DYN_LINK
55 #include <boost/asio/impl/src.hpp>
56 #endif
57
58 using namespace lt;
59 using namespace lt::detail; // for write_* and read_*
60
61 using namespace std::placeholders;
62
generate_block(span<std::uint32_t> buffer,piece_index_t const piece,int const offset)63 void generate_block(span<std::uint32_t> buffer, piece_index_t const piece
64 , int const offset)
65 {
66 std::uint32_t const fill = (static_cast<int>(piece) << 8) | ((offset / 0x4000) & 0xff);
67 for (auto& w : buffer) w = fill;
68 }
69
70 // in order to circumvent the restricton of only
71 // one connection per IP that most clients implement
72 // all sockets created by this tester are bound to
73 // uniqe local IPs in the range (127.0.0.1 - 127.255.255.255)
74 // it's only enabled if the target is also on the loopback
75 int local_if_counter = 0;
76 bool local_bind = false;
77
78 // when set to true, blocks downloaded are verified to match
79 // the test torrents
80 bool verify_downloads = false;
81
82 // if this is true, one block in 1000 will be sent corrupt.
83 // this only applies to dual and upload tests
84 bool test_corruption = false;
85
86 // number of seeds we've spawned. The test is terminated
87 // when this reaches zero, for dual tests
88 std::atomic<int> num_seeds(0);
89
90 // the kind of test to run. Upload sends data to a
91 // bittorrent client, download requests data from
92 // a client and dual uploads and downloads from a client
93 // at the same time (this is presumably the most realistic
94 // test)
95 enum test_mode_t{ none, upload_test, download_test, dual_test };
96 test_mode_t test_mode = none;
97
98 // the number of suggest messages received (total across all peers)
99 std::atomic<int> num_suggest(0);
100
101 // the number of requests made from suggested pieces
102 std::atomic<int> num_suggested_requests(0);
103
leaf_path(std::string f)104 std::string leaf_path(std::string f)
105 {
106 if (f.empty()) return "";
107 char const* first = f.c_str();
108 char const* sep = strrchr(first, '/');
109 #if defined(TORRENT_WINDOWS) || defined(TORRENT_OS2)
110 char const* altsep = strrchr(first, '\\');
111 if (sep == 0 || altsep > sep) sep = altsep;
112 #endif
113 if (sep == nullptr) return f;
114
115 if (sep - first == int(f.size()) - 1)
116 {
117 // if the last character is a / (or \)
118 // ignore it
119 int len = 0;
120 while (sep > first)
121 {
122 --sep;
123 if (*sep == '/'
124 #if defined(TORRENT_WINDOWS) || defined(TORRENT_OS2)
125 || *sep == '\\'
126 #endif
127 )
128 return std::string(sep + 1, len);
129 ++len;
130 }
131 return std::string(first, len);
132 }
133 return std::string(sep + 1);
134 }
135
136 namespace {
137 std::random_device dev;
138 std::mt19937 rng(dev());
139 }
140
141 struct peer_conn
142 {
peer_connpeer_conn143 peer_conn(io_service& ios, int num_pieces, int blocks_pp, tcp::endpoint const& ep
144 , char const* ih, bool seed_, int churn_, bool corrupt_)
145 : s(ios)
146 , read_pos(0)
147 , state(handshaking)
148 , choked(true)
149 , current_piece(-1)
150 , current_piece_is_allowed(false)
151 , block(0)
152 , blocks_per_piece(blocks_pp)
153 , info_hash(ih)
154 , outstanding_requests(0)
155 , seed(seed_)
156 , fast_extension(false)
157 , blocks_received(0)
158 , blocks_sent(0)
159 , num_pieces(num_pieces)
160 , start_time(clock_type::now())
161 , churn(churn_)
162 , corrupt(corrupt_)
163 , endpoint(ep)
164 , restarting(false)
165 {
166 corruption_counter = rand() % 1000;
167 if (seed) ++num_seeds;
168 pieces.reserve(num_pieces);
169 start_conn();
170 }
171
start_connpeer_conn172 void start_conn()
173 {
174 if (local_bind)
175 {
176 error_code ec;
177 s.open(endpoint.protocol(), ec);
178 if (ec)
179 {
180 close("ERROR OPEN: %s", ec);
181 return;
182 }
183 tcp::endpoint bind_if(address_v4(
184 (127 << 24)
185 + ((local_if_counter / 255) << 16)
186 + ((local_if_counter % 255) + 1)), 0);
187 ++local_if_counter;
188 s.bind(bind_if, ec);
189 if (ec)
190 {
191 close("ERROR BIND: %s", ec);
192 return;
193 }
194 }
195 restarting = false;
196 s.async_connect(endpoint, std::bind(&peer_conn::on_connect, this, _1));
197 }
198
199 tcp::socket s;
200 char write_buf_proto[100];
201 std::uint32_t write_buffer[17*1024/4];
202 std::uint32_t buffer[17*1024/4];
203 int read_pos;
204 int corruption_counter;
205
206 enum state_t
207 {
208 handshaking,
209 sending_request,
210 receiving_message
211 };
212 int state;
213 std::vector<piece_index_t> pieces;
214 std::vector<piece_index_t> suggested_pieces;
215 std::vector<piece_index_t> allowed_fast;
216 bool choked;
217 piece_index_t current_piece; // the piece we're currently requesting blocks from
218 bool current_piece_is_allowed;
219 int block;
220 int blocks_per_piece;
221 char const* info_hash;
222 int outstanding_requests;
223 // if this is true, this connection is a seed
224 bool seed;
225 bool fast_extension;
226 int blocks_received;
227 int blocks_sent;
228 int num_pieces;
229 time_point start_time;
230 time_point end_time;
231 int churn;
232 bool corrupt;
233 tcp::endpoint endpoint;
234 bool restarting;
235
on_connectpeer_conn236 void on_connect(error_code const& ec)
237 {
238 if (ec)
239 {
240 close("ERROR CONNECT: %s", ec);
241 return;
242 }
243
244 char handshake[] = "\x13" "BitTorrent protocol\0\0\0\0\0\0\0\x04"
245 " " // space for info-hash
246 "aaaaaaaaaaaaaaaaaaaa" // peer-id
247 "\0\0\0\x01\x02"; // interested
248 char* h = (char*)malloc(sizeof(handshake));
249 memcpy(h, handshake, sizeof(handshake));
250 std::memcpy(h + 28, info_hash, 20);
251 std::generate(h + 48, h + 68, &rand);
252 // for seeds, don't send the interested message
253 boost::asio::async_write(s, boost::asio::buffer(h, (sizeof(handshake) - 1) - (seed ? 5 : 0))
254 , std::bind(&peer_conn::on_handshake, this, h, _1, _2));
255 }
256
on_handshakepeer_conn257 void on_handshake(char* h, error_code const& ec, size_t)
258 {
259 free(h);
260 if (ec)
261 {
262 close("ERROR SEND HANDSHAKE: %s", ec);
263 return;
264 }
265
266 // read handshake
267 boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 68)
268 , std::bind(&peer_conn::on_handshake2, this, _1, _2));
269 }
270
on_handshake2peer_conn271 void on_handshake2(error_code const& ec, size_t)
272 {
273 if (ec)
274 {
275 close("ERROR READ HANDSHAKE: %s", ec);
276 return;
277 }
278
279 // buffer is the full 68 byte handshake
280 // look at the extension bits
281
282 fast_extension = (((char*)buffer)[27] & 4) != 0;
283
284 if (seed)
285 {
286 write_have_all();
287 }
288 else
289 {
290 work_download();
291 }
292 }
293
write_have_allpeer_conn294 void write_have_all()
295 {
296 if (fast_extension)
297 {
298 char* ptr = write_buf_proto;
299 // have_all
300 write_uint32(1, ptr);
301 write_uint8(0xe, ptr);
302 // unchoke
303 write_uint32(1, ptr);
304 write_uint8(1, ptr);
305 boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, ptr - write_buf_proto)
306 , std::bind(&peer_conn::on_have_all_sent, this, _1, _2));
307 }
308 else
309 {
310 // bitfield
311 int len = (num_pieces + 7) / 8;
312 char* ptr = (char*)buffer;
313 write_uint32(len + 1, ptr);
314 write_uint8(5, ptr);
315 memset(ptr, 255, len);
316 ptr += len;
317 // unchoke
318 write_uint32(1, ptr);
319 write_uint8(1, ptr);
320 boost::asio::async_write(s, boost::asio::buffer((char*)buffer, len + 10)
321 , std::bind(&peer_conn::on_have_all_sent, this, _1, _2));
322 }
323 }
324
on_have_all_sentpeer_conn325 void on_have_all_sent(error_code const& ec, size_t)
326 {
327 if (ec)
328 {
329 close("ERROR SEND HAVE ALL: %s", ec);
330 return;
331 }
332
333 // read message
334 boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
335 , std::bind(&peer_conn::on_msg_length, this, _1, _2));
336 }
337
write_requestpeer_conn338 bool write_request()
339 {
340 // if we're choked (and there are no allowed-fast pieces left)
341 if (choked && allowed_fast.empty() && !current_piece_is_allowed) return false;
342
343 // if there are no pieces left to request
344 if (pieces.empty() && suggested_pieces.empty()
345 && current_piece == piece_index_t(-1))
346 {
347 return false;
348 }
349
350 if (current_piece == piece_index_t(-1))
351 {
352 // pick a new piece
353 if (choked && allowed_fast.size() > 0)
354 {
355 current_piece = allowed_fast.front();
356 allowed_fast.erase(allowed_fast.begin());
357 current_piece_is_allowed = true;
358 }
359 else if (suggested_pieces.size() > 0)
360 {
361 current_piece = suggested_pieces.front();
362 suggested_pieces.erase(suggested_pieces.begin());
363 ++num_suggested_requests;
364 current_piece_is_allowed = false;
365 }
366 else if (pieces.size() > 0)
367 {
368 current_piece = pieces.front();
369 pieces.erase(pieces.begin());
370 current_piece_is_allowed = false;
371 }
372 else
373 {
374 TORRENT_ASSERT_FAIL();
375 }
376 }
377 char msg[] = "\0\0\0\xd\x06"
378 " " // piece
379 " " // offset
380 " "; // length
381 char* m = (char*)malloc(sizeof(msg));
382 memcpy(m, msg, sizeof(msg));
383 char* ptr = m + 5;
384 write_uint32(static_cast<int>(current_piece), ptr);
385 write_uint32(block * 16 * 1024, ptr);
386 write_uint32(16 * 1024, ptr);
387 boost::asio::async_write(s, boost::asio::buffer(m, sizeof(msg) - 1)
388 , std::bind(&peer_conn::on_req_sent, this, m, _1, _2));
389
390 ++outstanding_requests;
391 ++block;
392 if (block == blocks_per_piece)
393 {
394 block = 0;
395 current_piece = piece_index_t(-1);
396 current_piece_is_allowed = false;
397 }
398 return true;
399 }
400
on_req_sentpeer_conn401 void on_req_sent(char* m, error_code const& ec, size_t)
402 {
403 free(m);
404 if (ec)
405 {
406 close("ERROR SEND REQUEST: %s", ec);
407 return;
408 }
409
410 work_download();
411 }
412
closepeer_conn413 void close(char const* fmt, error_code const& ec)
414 {
415 end_time = clock_type::now();
416 char tmp[1024];
417 std::snprintf(tmp, sizeof(tmp), fmt, ec.message().c_str());
418 int time = int(total_milliseconds(end_time - start_time));
419 if (time == 0) time = 1;
420 float up = (std::int64_t(blocks_sent) * 0x4000) / time / 1000.f;
421 float down = (std::int64_t(blocks_received) * 0x4000) / time / 1000.f;
422 error_code e;
423
424 char ep_str[200];
425 address const& addr = s.local_endpoint(e).address();
426 if (addr.is_v6())
427 std::snprintf(ep_str, sizeof(ep_str), "[%s]:%d", addr.to_string(e).c_str()
428 , s.local_endpoint(e).port());
429 else
430 std::snprintf(ep_str, sizeof(ep_str), "%s:%d", addr.to_string(e).c_str()
431 , s.local_endpoint(e).port());
432 std::printf("%s ep: %s sent: %d received: %d duration: %d ms up: %.1fMB/s down: %.1fMB/s\n"
433 , tmp, ep_str, blocks_sent, blocks_received, time, up, down);
434 if (seed) --num_seeds;
435 }
436
work_downloadpeer_conn437 void work_download()
438 {
439 if (pieces.empty()
440 && suggested_pieces.empty()
441 && current_piece == piece_index_t(-1)
442 && outstanding_requests == 0
443 && blocks_received >= num_pieces * blocks_per_piece)
444 {
445 close("COMPLETED DOWNLOAD", error_code());
446 return;
447 }
448
449 // send requests
450 if (outstanding_requests < 40)
451 {
452 if (write_request()) return;
453 }
454
455 // read message
456 boost::asio::async_read(s, boost::asio::buffer((char*)buffer, 4)
457 , std::bind(&peer_conn::on_msg_length, this, _1, _2));
458 }
459
on_msg_lengthpeer_conn460 void on_msg_length(error_code const& ec, size_t)
461 {
462 if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor)
463 && restarting)
464 {
465 start_conn();
466 return;
467 }
468
469 if (ec)
470 {
471 close("ERROR RECEIVE MESSAGE PREFIX: %s", ec);
472 return;
473 }
474 char* ptr = (char*)buffer;
475 unsigned int length = read_uint32(ptr);
476 if (length > sizeof(buffer))
477 {
478 std::fprintf(stderr, "len: %d\n", length);
479 close("ERROR RECEIVE MESSAGE PREFIX: packet too big", error_code());
480 return;
481 }
482 boost::asio::async_read(s, boost::asio::buffer((char*)buffer, length)
483 , std::bind(&peer_conn::on_message, this, _1, _2));
484 }
485
on_messagepeer_conn486 void on_message(error_code const& ec, size_t bytes_transferred)
487 {
488 if ((ec == boost::asio::error::operation_aborted || ec == boost::asio::error::bad_descriptor)
489 && restarting)
490 {
491 start_conn();
492 return;
493 }
494
495 if (ec)
496 {
497 close("ERROR RECEIVE MESSAGE: %s", ec);
498 return;
499 }
500 char* ptr = (char*)buffer;
501 int msg = read_uint8(ptr);
502
503 if (test_mode == dual_test && num_seeds == 0)
504 {
505 TORRENT_ASSERT(!seed);
506 close("NO MORE SEEDS, test done", error_code());
507 return;
508 }
509
510 //std::printf("msg: %d len: %d\n", msg, int(bytes_transferred));
511
512 if (seed)
513 {
514 if (msg == 6)
515 {
516 if (bytes_transferred != 13)
517 {
518 close("REQUEST packet has invalid size", error_code());
519 return;
520 }
521 piece_index_t const piece = piece_index_t(detail::read_int32(ptr));
522 int const start = detail::read_int32(ptr);
523 int const length = detail::read_int32(ptr);
524 write_piece(piece, start, length);
525 }
526 else if (msg == 3) // not-interested
527 {
528 close("DONE", error_code());
529 return;
530 }
531 else
532 {
533 // read another message
534 boost::asio::async_read(s, boost::asio::buffer(buffer, 4)
535 , std::bind(&peer_conn::on_msg_length, this, _1, _2));
536 }
537 }
538 else
539 {
540 if (msg == 0xe) // have_all
541 {
542 // build a list of all pieces and request them all!
543 pieces.resize(num_pieces);
544 for (piece_index_t i(0); i < piece_index_t(int(pieces.size())); ++i)
545 pieces[static_cast<int>(i)] = i;
546 std::shuffle(pieces.begin(), pieces.end(), rng);
547 }
548 else if (msg == 4) // have
549 {
550 piece_index_t const piece(detail::read_int32(ptr));
551 if (pieces.empty()) pieces.push_back(piece);
552 else pieces.insert(pieces.begin() + (rand() % pieces.size()), piece);
553 }
554 else if (msg == 5) // bitfield
555 {
556 pieces.reserve(num_pieces);
557 piece_index_t piece(0);
558 for (int i = 0; i < int(bytes_transferred); ++i)
559 {
560 int mask = 0x80;
561 for (int k = 0; k < 8; ++k)
562 {
563 if (piece > piece_index_t(num_pieces)) break;
564 if (*ptr & mask) pieces.push_back(piece);
565 mask >>= 1;
566 ++piece;
567 }
568 ++ptr;
569 }
570 std::shuffle(pieces.begin(), pieces.end(), rng);
571 }
572 else if (msg == 7) // piece
573 {
574 if (verify_downloads)
575 {
576 piece_index_t const piece(read_uint32(ptr));
577 int start = read_uint32(ptr);
578 int size = int(bytes_transferred) - 9;
579 verify_piece(piece, start, ptr, size);
580 }
581 ++blocks_received;
582 --outstanding_requests;
583 piece_index_t const piece = piece_index_t(detail::read_int32(ptr));
584 int start = detail::read_int32(ptr);
585
586 if (churn && (blocks_received % churn) == 0) {
587 outstanding_requests = 0;
588 restarting = true;
589 s.close();
590 return;
591 }
592 if (int((start + bytes_transferred) / 0x4000) == blocks_per_piece)
593 {
594 write_have(piece);
595 return;
596 }
597 }
598 else if (msg == 13) // suggest
599 {
600 piece_index_t const piece(detail::read_int32(ptr));
601 auto i = std::find(pieces.begin(), pieces.end(), piece);
602 if (i != pieces.end())
603 {
604 pieces.erase(i);
605 suggested_pieces.push_back(piece);
606 ++num_suggest;
607 }
608 }
609 else if (msg == 16) // reject request
610 {
611 piece_index_t const piece(detail::read_int32(ptr));
612 int start = detail::read_int32(ptr);
613 int length = detail::read_int32(ptr);
614
615 // put it back!
616 if (current_piece != piece)
617 {
618 if (pieces.empty() || pieces.back() != piece)
619 pieces.push_back(piece);
620 }
621 else
622 {
623 block = std::min(start / 0x4000, block);
624 if (block == 0)
625 {
626 pieces.push_back(current_piece);
627 current_piece = piece_index_t(-1);
628 current_piece_is_allowed = false;
629 }
630 }
631 --outstanding_requests;
632 std::fprintf(stderr, "REJECT: [ piece: %d start: %d length: %d ]\n"
633 , static_cast<int>(piece), start, length);
634 }
635 else if (msg == 0) // choke
636 {
637 choked = true;
638 }
639 else if (msg == 1) // unchoke
640 {
641 choked = false;
642 }
643 else if (msg == 17) // allowed_fast
644 {
645 piece_index_t const piece = piece_index_t(detail::read_int32(ptr));
646 auto i = std::find(pieces.begin(), pieces.end(), piece);
647 if (i != pieces.end())
648 {
649 pieces.erase(i);
650 allowed_fast.push_back(piece);
651 }
652 }
653 work_download();
654 }
655 }
656
verify_piecepeer_conn657 bool verify_piece(piece_index_t const piece, int start, char const* ptr, int size)
658 {
659 std::uint32_t* buf = (std::uint32_t*)ptr;
660 std::uint32_t const fill = (static_cast<int>(piece) << 8) | ((start / 0x4000) & 0xff);
661 for (int i = 0; i < size / 4; ++i)
662 {
663 if (buf[i] != fill)
664 {
665 std::fprintf(stderr, "received invalid block. piece %d block %d\n"
666 , static_cast<int>(piece), start / 0x4000);
667 exit(1);
668 }
669 }
670 return true;
671 }
672
write_piecepeer_conn673 void write_piece(piece_index_t const piece, int start, int length)
674 {
675 generate_block({write_buffer, length / 4}
676 , piece, start);
677
678 if (corrupt)
679 {
680 --corruption_counter;
681 if (corruption_counter == 0)
682 {
683 corruption_counter = 1000;
684 std::memset(write_buffer, 0, 10);
685 }
686 }
687 char* ptr = write_buf_proto;
688 write_uint32(9 + length, ptr);
689 assert(length == 0x4000);
690 write_uint8(7, ptr);
691 write_uint32(static_cast<int>(piece), ptr);
692 write_uint32(start, ptr);
693 std::array<boost::asio::const_buffer, 2> vec;
694 vec[0] = boost::asio::buffer(write_buf_proto, ptr - write_buf_proto);
695 vec[1] = boost::asio::buffer(write_buffer, length);
696 boost::asio::async_write(s, vec, std::bind(&peer_conn::on_have_all_sent, this, _1, _2));
697 ++blocks_sent;
698 if (churn && (blocks_sent % churn) == 0 && seed) {
699 outstanding_requests = 0;
700 restarting = true;
701 s.close();
702 }
703 }
704
write_havepeer_conn705 void write_have(piece_index_t const piece)
706 {
707 char* ptr = write_buf_proto;
708 write_uint32(5, ptr);
709 write_uint8(4, ptr);
710 write_uint32(static_cast<int>(piece), ptr);
711 boost::asio::async_write(s, boost::asio::buffer(write_buf_proto, 9), std::bind(&peer_conn::on_have_all_sent, this, _1, _2));
712 }
713 };
714
print_usage()715 void print_usage()
716 {
717 std::fprintf(stderr, "usage: connection_tester command [options]\n\n"
718 "command is one of:\n"
719 " gen-torrent generate a test torrent\n"
720 " options for this command:\n"
721 " -s <size> the size of the torrent in megabytes\n"
722 " -n <num-files> the number of files in the test torrent\n"
723 " -a introduce a lot of pad-files\n"
724 " (pad files are not supported for gen-data or upload)\n"
725 " -t <file> the file to save the .torrent file to\n"
726 " -T <name> the name of the torrent (and directory\n"
727 " its files are saved in)\n\n"
728 " gen-data generate the data file(s) for the test torrent\n"
729 " options for this command:\n"
730 " -t <file> the torrent file that was previously generated\n"
731 " -P <path> the path to where the data should be stored\n\n"
732 " gen-test-torrents generate many test torrents (cannot be used for up/down tests)\n"
733 " options for this command:\n"
734 " -N <num-torrents> number of torrents to generate\n"
735 " -n <num-files> number of files in each torrent\n"
736 " -t <name> base name of torrent files (index is appended)\n\n"
737 " -T <URL> add the specified tracker URL to each torrent\n"
738 " this option may appear multiple times\n\n"
739 " upload start an uploader test\n"
740 " download start a downloader test\n"
741 " dual start a download and upload test\n"
742 " options for these commands:\n"
743 " -c <num-conns> the number of connections to make to the target\n"
744 " -d <dst> the IP address of the target\n"
745 " -p <dst-port> the port the target listens on\n"
746 " -t <torrent-file> the torrent file previously generated by gen-torrent\n"
747 " -C send corrupt pieces sometimes (applies to upload and dual)\n"
748 " -r <reconnects> churn - number of reconnects per second\n\n"
749 "examples:\n\n"
750 "connection_tester gen-torrent -s 1024 -n 4 -t test.torrent\n"
751 "connection_tester upload -c 200 -d 127.0.0.1 -p 6881 -t test.torrent\n"
752 "connection_tester download -c 200 -d 127.0.0.1 -p 6881 -t test.torrent\n"
753 "connection_tester dual -c 200 -d 127.0.0.1 -p 6881 -t test.torrent\n");
754 exit(1);
755 }
756
hasher_thread(lt::create_torrent * t,piece_index_t const start_piece,piece_index_t const end_piece,int piece_size,bool print)757 void hasher_thread(lt::create_torrent* t, piece_index_t const start_piece
758 , piece_index_t const end_piece, int piece_size, bool print)
759 {
760 if (print) std::fprintf(stderr, "\n");
761 std::uint32_t piece[0x4000 / 4];
762 for (piece_index_t i = start_piece; i < end_piece; ++i)
763 {
764 hasher ph;
765 for (int j = 0; j < piece_size; j += 0x4000)
766 {
767 generate_block(piece, i, j);
768 ph.update(reinterpret_cast<char*>(piece), 0x4000);
769 }
770 t->set_hash(i, ph.final());
771 int const range = static_cast<int>(end_piece) - static_cast<int>(start_piece);
772 if (print && (static_cast<int>(i) & 1))
773 {
774 int const delta_piece = static_cast<int>(i) - static_cast<int>(start_piece);
775 std::fprintf(stderr, "\r%.1f %% ", float(delta_piece * 100) / float(range));
776 }
777 }
778 if (print) std::fprintf(stderr, "\n");
779 }
780
781 // size is in megabytes
generate_torrent(std::vector<char> & buf,int num_pieces,int num_files,char const * torrent_name,bool with_padding)782 void generate_torrent(std::vector<char>& buf, int num_pieces, int num_files
783 , char const* torrent_name, bool with_padding)
784 {
785 file_storage fs;
786 // 1 MiB piece size
787 const int piece_size = 1024 * 1024;
788 const std::int64_t total_size = std::int64_t(piece_size) * num_pieces;
789
790 std::int64_t s = total_size;
791 int file_index = 0;
792 std::int64_t file_size = total_size / num_files;
793 while (s > 0)
794 {
795 char b[100];
796 std::snprintf(b, sizeof(b), "%s/stress_test%d", torrent_name, file_index);
797 ++file_index;
798 fs.add_file(b, std::min(s, file_size));
799 s -= file_size;
800 file_size += 200;
801 }
802
803 lt::create_torrent t(fs, piece_size, with_padding ? 100 : -1);
804
805 num_pieces = t.num_pieces();
806
807 int const num_threads = std::thread::hardware_concurrency()
808 ? std::thread::hardware_concurrency() : 4;
809 std::printf("hashing in %d threads\n", num_threads);
810
811 std::vector<std::thread> threads;
812 threads.reserve(num_threads);
813 for (int i = 0; i < num_threads; ++i)
814 {
815 threads.emplace_back(&hasher_thread, &t
816 , piece_index_t(i * num_pieces / num_threads)
817 , piece_index_t((i + 1) * num_pieces / num_threads)
818 , piece_size
819 , i == 0);
820 }
821
822 for (auto& i : threads)
823 i.join();
824
825 std::back_insert_iterator<std::vector<char>> out(buf);
826 bencode(out, t.generate());
827 }
828
generate_data(char const * path,torrent_info const & ti)829 void generate_data(char const* path, torrent_info const& ti)
830 {
831 file_storage const& fs = ti.files();
832
833 file_pool fp;
834
835 aux::vector<download_priority_t, file_index_t> priorities;
836 sha1_hash info_hash;
837 storage_params params{
838 fs,
839 nullptr,
840 path,
841 storage_mode_sparse,
842 priorities,
843 info_hash
844 };
845
846 std::unique_ptr<storage_interface> st(default_storage_constructor(params, fp));
847
848 {
849 storage_error error;
850 st->initialize(error);
851 }
852
853 std::uint32_t piece[0x4000 / 4];
854 for (piece_index_t i(0); i < piece_index_t(ti.num_pieces()); ++i)
855 {
856 for (int j = 0; j < ti.piece_size(i); j += 0x4000)
857 {
858 generate_block(piece, i, j);
859 int const left_in_piece = ti.piece_size(i) - j;
860 iovec_t const b = { reinterpret_cast<char*>(piece)
861 , std::min(left_in_piece, 0x4000)};
862 storage_error error;
863 st->writev(b, i, j, open_mode::write_only, error);
864 if (error)
865 std::fprintf(stderr, "storage error: %s\n", error.ec.message().c_str());
866 }
867 if (static_cast<int>(i) & 1)
868 {
869 std::fprintf(stderr, "\r%.1f %% ", float(static_cast<int>(i) * 100) / float(ti.num_pieces()));
870 }
871 }
872 }
873
io_thread(io_service * ios)874 void io_thread(io_service* ios)
875 {
876 error_code ec;
877 ios->run(ec);
878 if (ec) std::fprintf(stderr, "ERROR: %s\n", ec.message().c_str());
879 }
880
main(int argc,char * argv[])881 int main(int argc, char* argv[])
882 {
883 if (argc <= 1) print_usage();
884
885 char const* command = argv[1];
886 int size = 1000;
887 int num_files = 10;
888 int num_torrents = 1;
889 char const* torrent_file = "benchmark.torrent";
890 char const* data_path = ".";
891 int num_connections = 50;
892 char const* destination_ip = "127.0.0.1";
893 int destination_port = 6881;
894 int churn = 0;
895 bool gen_pad_files = false;
896 std::vector<std::string> trackers;
897
898 argv += 2;
899 argc -= 2;
900
901 while (argc > 0)
902 {
903 char const* optname = argv[0];
904 ++argv;
905 --argc;
906
907 if (optname[0] != '-' || strlen(optname) != 2)
908 {
909 std::fprintf(stderr, "unknown option: %s\n", optname);
910 continue;
911 }
912
913 // options with no arguments
914 switch (optname[1])
915 {
916 case 'C': test_corruption = true; continue;
917 case 'a': gen_pad_files = true; continue;
918 }
919
920 if (argc == 0)
921 {
922 std::fprintf(stderr, "missing argument for option: %s\n", optname);
923 break;
924 }
925
926 char const* optarg = argv[0];
927 ++argv;
928 --argc;
929
930 switch (optname[1])
931 {
932 case 's': size = atoi(optarg); break;
933 case 'n': num_files = atoi(optarg); break;
934 case 'N': num_torrents = atoi(optarg); break;
935 case 't': torrent_file = optarg; break;
936 case 'T': trackers.push_back(optarg); break;
937 case 'P': data_path = optarg; break;
938 case 'c': num_connections = atoi(optarg); break;
939 case 'p': destination_port = atoi(optarg); break;
940 case 'd': destination_ip = optarg; break;
941 case 'r': churn = atoi(optarg); break;
942 default: std::fprintf(stderr, "unknown option: %s\n", optname);
943 }
944 }
945
946 if (command == "gen-torrent"_sv)
947 {
948 std::vector<char> tmp;
949 std::string name = leaf_path(torrent_file);
950 name = name.substr(0, name.find_last_of('.'));
951 std::printf("generating torrent: %s\n", name.c_str());
952 generate_torrent(tmp, size ? size : 1024, num_files ? num_files : 1
953 , name.c_str(), gen_pad_files);
954
955 FILE* output = stdout;
956 if ("-"_sv != torrent_file)
957 {
958 if( (output = std::fopen(torrent_file, "wb+")) == nullptr)
959 {
960 std::fprintf(stderr, "Could not open file '%s' for writing: %s\n"
961 , torrent_file, std::strerror(errno));
962 exit(2);
963 }
964 }
965 std::fprintf(stderr, "writing file to: %s\n", torrent_file);
966 fwrite(&tmp[0], 1, tmp.size(), output);
967 if (output != stdout)
968 std::fclose(output);
969
970 return 0;
971 }
972 else if (command == "gen-data"_sv)
973 {
974 error_code ec;
975 torrent_info ti(torrent_file, ec);
976 if (ec)
977 {
978 std::fprintf(stderr, "ERROR LOADING .TORRENT: %s\n", ec.message().c_str());
979 return 1;
980 }
981 generate_data(data_path, ti);
982 return 0;
983 }
984 else if (command == "gen-test-torrents"_sv)
985 {
986 std::vector<char> buf;
987 for (int i = 0; i < num_torrents; ++i)
988 {
989 char torrent_name[100];
990 std::snprintf(torrent_name, sizeof(torrent_name), "%s-%d.torrent", torrent_file, i);
991
992 file_storage fs;
993 for (int j = 0; j < num_files; ++j)
994 {
995 char file_name[100];
996 std::snprintf(file_name, sizeof(file_name), "%s-%d/file-%d", torrent_file, i, j);
997 fs.add_file(file_name, std::int64_t(j + i + 1) * 251);
998 }
999 // 1 MiB piece size
1000 const int piece_size = 1024 * 1024;
1001 lt::create_torrent t(fs, piece_size);
1002 sha1_hash zero(nullptr);
1003 for (auto const k : fs.piece_range())
1004 t.set_hash(k, zero);
1005
1006 int tier = 0;
1007 for (auto const& tr : trackers)
1008 t.add_tracker(tr, tier++);
1009
1010 buf.clear();
1011 std::back_insert_iterator<std::vector<char>> out(buf);
1012 bencode(out, t.generate());
1013 FILE* f = std::fopen(torrent_name, "w+");
1014 if (f == nullptr)
1015 {
1016 std::fprintf(stderr, "Could not open file '%s' for writing: %s\n"
1017 , torrent_name, std::strerror(errno));
1018 return 1;
1019 }
1020 size_t ret = fwrite(buf.data(), 1, buf.size(), f);
1021 if (ret != buf.size())
1022 {
1023 std::fprintf(stderr, "write returned: %d (expected %d)\n", int(ret), int(buf.size()));
1024 std::fclose(f);
1025 return 1;
1026 }
1027 std::printf("wrote %s\n", torrent_name);
1028 std::fclose(f);
1029 }
1030 return 0;
1031 }
1032 else if (command == "upload"_sv)
1033 {
1034 test_mode = upload_test;
1035 }
1036 else if (command == "download"_sv)
1037 {
1038 test_mode = download_test;
1039 }
1040 else if (command == "dual"_sv)
1041 {
1042 test_mode = dual_test;
1043 }
1044 else
1045 {
1046 std::fprintf(stderr, "unknown command: %s\n\n", command);
1047 print_usage();
1048 }
1049
1050 error_code ec;
1051 address_v4 addr = address_v4::from_string(destination_ip, ec);
1052 if (ec)
1053 {
1054 std::fprintf(stderr, "ERROR RESOLVING %s: %s\n", destination_ip, ec.message().c_str());
1055 return 1;
1056 }
1057 tcp::endpoint ep(addr, std::uint16_t(destination_port));
1058
1059 #if !defined __APPLE__
1060 // apparently darwin doesn't seems to let you bind to
1061 // loopback on any other IP than 127.0.0.1
1062 std::uint32_t const ip = addr.to_ulong();
1063 if ((ip & 0xff000000) == 0x7f000000)
1064 {
1065 local_bind = true;
1066 }
1067 #endif
1068
1069 torrent_info ti(torrent_file, ec);
1070 if (ec)
1071 {
1072 std::fprintf(stderr, "ERROR LOADING .TORRENT: %s\n", ec.message().c_str());
1073 return 1;
1074 }
1075
1076 std::vector<peer_conn*> conns;
1077 conns.reserve(num_connections);
1078 int const num_threads = 2;
1079 io_service ios[num_threads];
1080 for (int i = 0; i < num_connections; ++i)
1081 {
1082 bool corrupt = test_corruption && (i & 1) == 0;
1083 bool seed = false;
1084 if (test_mode == upload_test) seed = true;
1085 else if (test_mode == dual_test) seed = (i & 1);
1086 conns.push_back(new peer_conn(ios[i % num_threads], ti.num_pieces(), ti.piece_length() / 16 / 1024
1087 , ep, (char const*)&ti.info_hash()[0], seed, churn, corrupt));
1088 std::this_thread::sleep_for(std::chrono::milliseconds(1));
1089 ios[i % num_threads].poll_one(ec);
1090 if (ec)
1091 {
1092 std::fprintf(stderr, "ERROR: %s\n", ec.message().c_str());
1093 break;
1094 }
1095 }
1096
1097 std::thread t1(&io_thread, &ios[0]);
1098 std::thread t2(&io_thread, &ios[1]);
1099
1100 t1.join();
1101 t2.join();
1102
1103 float up = 0.f;
1104 float down = 0.f;
1105 std::uint64_t total_sent = 0;
1106 std::uint64_t total_received = 0;
1107
1108 for (std::vector<peer_conn*>::iterator i = conns.begin()
1109 , end(conns.end()); i != end; ++i)
1110 {
1111 peer_conn* p = *i;
1112 int time = int(total_milliseconds(p->end_time - p->start_time));
1113 if (time == 0) time = 1;
1114 total_sent += p->blocks_sent;
1115 up += (std::int64_t(p->blocks_sent) * 0x4000) / time / 1000.f;
1116 down += (std::int64_t(p->blocks_received) * 0x4000) / time / 1000.f;
1117 delete p;
1118 }
1119
1120 std::printf("=========================\n"
1121 "suggests: %d suggested-requests: %d\n"
1122 "total sent: %.1f %% received: %.1f %%\n"
1123 "rate sent: %.1f MB/s received: %.1f MB/s\n"
1124 , int(num_suggest), int(num_suggested_requests)
1125 , total_sent * 0x4000 * 100.f / float(ti.total_size())
1126 , total_received * 0x4000 * 100.f / float(ti.total_size())
1127 , up, down);
1128
1129 return 0;
1130 }
1131