1 /*
2 
3 Copyright (c) 2015, Steven Siloti
4 All rights reserved.
5 
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9 
10     * Redistributions of source code must retain the above copyright
11       notice, this list of conditions and the following disclaimer.
12     * Redistributions in binary form must reproduce the above copyright
13       notice, this list of conditions and the following disclaimer in
14       the documentation and/or other materials provided with the distribution.
15     * Neither the name of the author nor the names of its
16       contributors may be used to endorse or promote products derived
17       from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30 
31 */
32 
33 #include "test.hpp"
34 
35 #include "simulator/simulator.hpp"
36 
37 #include "libtorrent/aux_/listen_socket_handle.hpp"
38 #include "libtorrent/aux_/session_impl.hpp"
39 #include "libtorrent/udp_socket.hpp"
40 #include "libtorrent/kademlia/dht_tracker.hpp"
41 #include "libtorrent/kademlia/dht_state.hpp"
42 #include "libtorrent/performance_counters.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/kademlia/dht_settings.hpp"
45 #include "libtorrent/span.hpp"
46 #include "libtorrent/kademlia/dht_observer.hpp"
47 
48 #include <functional>
49 #include <cstdarg>
50 #include <cmath>
51 
52 using namespace lt;
53 using namespace sim;
54 using namespace std::placeholders;
55 
56 #if !defined TORRENT_DISABLE_DHT
57 
58 struct obs : dht::dht_observer
59 {
set_external_addressobs60 	void set_external_address(lt::aux::listen_socket_handle const&, address const& /* addr */
61 		, address const& /* source */) override
62 	{}
get_listen_portobs63 	int get_listen_port(lt::aux::transport, lt::aux::listen_socket_handle const& s) override
64 	{ return s.get()->udp_external_port(); }
get_peersobs65 	void get_peers(sha1_hash const&) override {}
outgoing_get_peersobs66 	void outgoing_get_peers(sha1_hash const& /* target */
67 		, sha1_hash const& /* sent_target */, udp::endpoint const& /* ep */) override {}
announceobs68 	void announce(sha1_hash const& /* ih */
69 		, address const& /* addr */, int /* port */) override {}
on_dht_requestobs70 	bool on_dht_request(string_view /* query */
71 		, dht::msg const& /* request */, entry& /* response */) override
72 	{ return false; }
73 
74 #ifndef TORRENT_DISABLE_LOGGING
should_logobs75 	bool should_log(module_t) const override { return true; }
logobs76 	void log(dht_logger::module_t, char const* fmt, ...) override
77 	{
78 		va_list v;
79 		va_start(v, fmt);
80 		vprintf(fmt, v);
81 		va_end(v);
82 		puts("\n");
83 	}
log_packetobs84 	void log_packet(message_direction_t /* dir */
85 		, span<char const> /* pkt */
86 		, udp::endpoint const& /* node */) override {}
87 #endif
88 };
89 
send_packet(lt::udp_socket & sock,lt::aux::listen_socket_handle const &,udp::endpoint const & ep,span<char const> p,error_code & ec,udp_send_flags_t const flags)90 void send_packet(lt::udp_socket& sock, lt::aux::listen_socket_handle const&, udp::endpoint const& ep
91 	, span<char const> p, error_code& ec, udp_send_flags_t const flags)
92 {
93 	sock.send(ep, p, ec, flags);
94 }
95 
96 #endif // #if !defined TORRENT_DISABLE_DHT
97 
TORRENT_TEST(dht_rate_limit)98 TORRENT_TEST(dht_rate_limit)
99 {
100 #if !defined TORRENT_DISABLE_DHT
101 
102 	default_config cfg;
103 	simulation sim(cfg);
104 	asio::io_service dht_ios(sim, address_v4::from_string("40.30.20.10"));
105 
106 	// receiver (the DHT under test)
107 	lt::udp_socket sock(dht_ios, lt::aux::listen_socket_handle{});
108 	obs o;
109 	auto ls = std::make_shared<lt::aux::listen_socket_t>();
110 	ls->external_address.cast_vote(address_v4::from_string("40.30.20.10")
111 		, lt::aux::session_interface::source_dht, lt::address());
112 	ls->local_endpoint = tcp::endpoint(address_v4::from_string("40.30.20.10"), 8888);
113 	error_code ec;
114 	sock.bind(udp::endpoint(address_v4::from_string("40.30.20.10"), 8888), ec);
115 	dht::settings dhtsett;
116 	dhtsett.block_ratelimit = 100000; // disable the DOS blocker
117 	dhtsett.ignore_dark_internet = false;
118 	dhtsett.upload_rate_limit = 400;
119 	float const target_upload_rate = 400;
120 	int const num_packets = 2000;
121 
122 	counters cnt;
123 	dht::dht_state state;
124 	std::unique_ptr<lt::dht::dht_storage_interface> dht_storage(dht::dht_default_storage_constructor(dhtsett));
125 	auto dht = std::make_shared<lt::dht::dht_tracker>(
126 		&o, dht_ios, std::bind(&send_packet, std::ref(sock), _1, _2, _3, _4, _5)
127 		, dhtsett, cnt, *dht_storage, std::move(state));
128 	dht->new_socket(ls);
129 
130 	bool stop = false;
131 	std::function<void(error_code const&, size_t)> on_read
132 		= [&](error_code const& ec, size_t const /* bytes */)
133 	{
134 		if (ec) return;
135 		udp_socket::packet p;
136 		error_code err;
137 		int const num = int(sock.read(lt::span<udp_socket::packet>(&p, 1), err));
138 		if (num) dht->incoming_packet(ls, p.from, p.data);
139 		if (stop || err) return;
140 		sock.async_read(on_read);
141 	};
142 	sock.async_read(on_read);
143 
144 	// sender
145 	int num_packets_sent = 0;
146 	asio::io_service sender_ios(sim, address_v4::from_string("10.20.30.40"));
147 	udp::socket sender_sock(sender_ios);
148 	sender_sock.open(udp::v4());
149 	sender_sock.bind(udp::endpoint(address_v4(), 4444));
150 	sender_sock.non_blocking(true);
151 	asio::high_resolution_timer timer(sender_ios);
152 	std::function<void(error_code const&)> sender_tick = [&](error_code const&)
153 	{
154 		if (num_packets_sent == num_packets)
155 		{
156 			// we're done. shut down (a second from now, to let the dust settle)
157 			timer.expires_from_now(chrono::seconds(1));
158 			timer.async_wait([&](error_code const&)
159 			{
160 				dht->stop();
161 				stop = true;
162 				sender_sock.close();
163 				sock.close();
164 			});
165 			return;
166 		}
167 
168 		char const packet[] = "d1:ad2:id20:ababababababababababe1:y1:q1:q4:pinge";
169 		sender_sock.send_to(asio::const_buffers_1(packet, sizeof(packet)-1)
170 			, udp::endpoint(address_v4::from_string("40.30.20.10"), 8888));
171 		++num_packets_sent;
172 
173 		timer.expires_from_now(chrono::milliseconds(10));
174 		timer.async_wait(sender_tick);
175 	};
176 	timer.expires_from_now(chrono::milliseconds(10));
177 	timer.async_wait(sender_tick);
178 
179 	udp::endpoint from;
180 	int num_bytes_received = 0;
181 	int num_packets_received = 0;
182 	char buffer[1500];
183 	std::function<void(error_code const&, std::size_t)> on_receive
184 		= [&](error_code const& ec, std::size_t const bytes)
185 	{
186 		if (ec) return;
187 
188 		num_bytes_received += int(bytes);
189 		++num_packets_received;
190 
191 		sender_sock.async_receive_from(asio::mutable_buffers_1(buffer, sizeof(buffer))
192 			, from, on_receive);
193 	};
194 	sender_sock.async_receive_from(asio::mutable_buffers_1(buffer, sizeof(buffer))
195 		, from, on_receive);
196 
197 	// run simulation
198 	lt::clock_type::time_point start = lt::clock_type::now();
199 	sim.run();
200 	lt::clock_type::time_point end = lt::clock_type::now();
201 
202 	// subtract one target_upload_rate here, since we initialize the quota to one
203 	// full second worth of bandwidth
204 	float const average_upload_rate = (num_bytes_received - target_upload_rate)
205 		/ (duration_cast<chrono::milliseconds>(end - start).count() * 0.001f);
206 
207 	std::printf("send %d packets. received %d packets (%d bytes). average rate: %f (target: %f)\n"
208 		, num_packets_sent, num_packets_received, num_bytes_received
209 		, average_upload_rate, target_upload_rate);
210 
211 	// the actual upload rate should be within 5% of the target
212 	TEST_CHECK(std::abs(average_upload_rate - target_upload_rate) < target_upload_rate * 0.05);
213 
214 	TEST_EQUAL(cnt[counters::dht_messages_in], num_packets);
215 
216 	// the number of dropped packets + the number of received pings, should equal
217 	// exactly the number of packets we sent
218 	TEST_EQUAL(cnt[counters::dht_messages_in_dropped]
219 		+ cnt[counters::dht_ping_in], num_packets);
220 
221 #endif // #if !defined TORRENT_DISABLE_DHT
222 }
223 
224 // TODO: put test here to take advantage of existing code, refactor
// Starts a DHT node, removes its only listen socket after 2 seconds and stops
// the node after 3 seconds. Apart from checking that the socket could be
// bound, the test makes no assertions of its own: it passes when the
// simulation runs to completion.
TORRENT_TEST(dht_delete_socket)
{
#ifndef TORRENT_DISABLE_DHT

	sim::default_config cfg;
	sim::simulation sim(cfg);
	sim::asio::io_service dht_ios(sim, lt::address_v4::from_string("40.30.20.10"));

	lt::udp_socket sock(dht_ios, lt::aux::listen_socket_handle{});
	error_code ec;
	sock.bind(udp::endpoint(address_v4::from_string("40.30.20.10"), 8888), ec);
	// don't silently continue with an unbound socket
	TEST_CHECK(!ec);

	obs o;
	auto ls = std::make_shared<lt::aux::listen_socket_t>();
	ls->external_address.cast_vote(address_v4::from_string("40.30.20.10")
		, lt::aux::session_interface::source_dht, lt::address());
	ls->local_endpoint = tcp::endpoint(address_v4::from_string("40.30.20.10"), 8888);
	dht::settings dhtsett;
	counters cnt;
	dht::dht_state state;
	std::unique_ptr<lt::dht::dht_storage_interface> dht_storage(dht::dht_default_storage_constructor(dhtsett));
	auto dht = std::make_shared<lt::dht::dht_tracker>(
		&o, dht_ios, std::bind(&send_packet, std::ref(sock), _1, _2, _3, _4, _5)
		, dhtsett, cnt, *dht_storage, std::move(state));

	// the bootstrap-complete callback is a no-op
	dht->start([](std::vector<std::pair<dht::node_entry, std::string>> const&){});
	dht->new_socket(ls);

	// schedule the removal of the socket at exactly 2 seconds.
	// This simulates the fact that the internally scheduled call
	// to connection_timeout will be executed right after leaving
	// the cancellable state
	asio::high_resolution_timer t1(dht_ios);
	t1.expires_from_now(chrono::seconds(2));
	t1.async_wait([&](error_code const&)
		{
			dht->delete_socket(ls);
		});

	// stop the DHT, one second after the socket was removed
	asio::high_resolution_timer t2(dht_ios);
	t2.expires_from_now(chrono::seconds(3));
	t2.async_wait([&](error_code const&) { dht->stop(); });

	sim.run();

#endif // TORRENT_DISABLE_DHT
}
273