1 /*
2
3 Copyright (c) 2015, Steven Siloti
4 All rights reserved.
5
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30
31 */
32
33 #include "test.hpp"
34
35 #include "simulator/simulator.hpp"
36
37 #include "libtorrent/aux_/listen_socket_handle.hpp"
38 #include "libtorrent/aux_/session_impl.hpp"
39 #include "libtorrent/udp_socket.hpp"
40 #include "libtorrent/kademlia/dht_tracker.hpp"
41 #include "libtorrent/kademlia/dht_state.hpp"
42 #include "libtorrent/performance_counters.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/kademlia/dht_settings.hpp"
45 #include "libtorrent/span.hpp"
46 #include "libtorrent/kademlia/dht_observer.hpp"
47
48 #include <functional>
49 #include <cstdarg>
50 #include <cmath>
51
52 using namespace lt;
53 using namespace sim;
54 using namespace std::placeholders;
55
56 #if !defined TORRENT_DISABLE_DHT
57
58 struct obs : dht::dht_observer
59 {
set_external_addressobs60 void set_external_address(lt::aux::listen_socket_handle const&, address const& /* addr */
61 , address const& /* source */) override
62 {}
get_listen_portobs63 int get_listen_port(lt::aux::transport, lt::aux::listen_socket_handle const& s) override
64 { return s.get()->udp_external_port(); }
get_peersobs65 void get_peers(sha1_hash const&) override {}
outgoing_get_peersobs66 void outgoing_get_peers(sha1_hash const& /* target */
67 , sha1_hash const& /* sent_target */, udp::endpoint const& /* ep */) override {}
announceobs68 void announce(sha1_hash const& /* ih */
69 , address const& /* addr */, int /* port */) override {}
on_dht_requestobs70 bool on_dht_request(string_view /* query */
71 , dht::msg const& /* request */, entry& /* response */) override
72 { return false; }
73
74 #ifndef TORRENT_DISABLE_LOGGING
should_logobs75 bool should_log(module_t) const override { return true; }
logobs76 void log(dht_logger::module_t, char const* fmt, ...) override
77 {
78 va_list v;
79 va_start(v, fmt);
80 vprintf(fmt, v);
81 va_end(v);
82 puts("\n");
83 }
log_packetobs84 void log_packet(message_direction_t /* dir */
85 , span<char const> /* pkt */
86 , udp::endpoint const& /* node */) override {}
87 #endif
88 };
89
send_packet(lt::udp_socket & sock,lt::aux::listen_socket_handle const &,udp::endpoint const & ep,span<char const> p,error_code & ec,udp_send_flags_t const flags)90 void send_packet(lt::udp_socket& sock, lt::aux::listen_socket_handle const&, udp::endpoint const& ep
91 , span<char const> p, error_code& ec, udp_send_flags_t const flags)
92 {
93 sock.send(ep, p, ec, flags);
94 }
95
96 #endif // #if !defined TORRENT_DISABLE_DHT
97
TORRENT_TEST(dht_rate_limit)98 TORRENT_TEST(dht_rate_limit)
99 {
100 #if !defined TORRENT_DISABLE_DHT
101
102 default_config cfg;
103 simulation sim(cfg);
104 asio::io_service dht_ios(sim, address_v4::from_string("40.30.20.10"));
105
106 // receiver (the DHT under test)
107 lt::udp_socket sock(dht_ios, lt::aux::listen_socket_handle{});
108 obs o;
109 auto ls = std::make_shared<lt::aux::listen_socket_t>();
110 ls->external_address.cast_vote(address_v4::from_string("40.30.20.10")
111 , lt::aux::session_interface::source_dht, lt::address());
112 ls->local_endpoint = tcp::endpoint(address_v4::from_string("40.30.20.10"), 8888);
113 error_code ec;
114 sock.bind(udp::endpoint(address_v4::from_string("40.30.20.10"), 8888), ec);
115 dht::settings dhtsett;
116 dhtsett.block_ratelimit = 100000; // disable the DOS blocker
117 dhtsett.ignore_dark_internet = false;
118 dhtsett.upload_rate_limit = 400;
119 float const target_upload_rate = 400;
120 int const num_packets = 2000;
121
122 counters cnt;
123 dht::dht_state state;
124 std::unique_ptr<lt::dht::dht_storage_interface> dht_storage(dht::dht_default_storage_constructor(dhtsett));
125 auto dht = std::make_shared<lt::dht::dht_tracker>(
126 &o, dht_ios, std::bind(&send_packet, std::ref(sock), _1, _2, _3, _4, _5)
127 , dhtsett, cnt, *dht_storage, std::move(state));
128 dht->new_socket(ls);
129
130 bool stop = false;
131 std::function<void(error_code const&, size_t)> on_read
132 = [&](error_code const& ec, size_t const /* bytes */)
133 {
134 if (ec) return;
135 udp_socket::packet p;
136 error_code err;
137 int const num = int(sock.read(lt::span<udp_socket::packet>(&p, 1), err));
138 if (num) dht->incoming_packet(ls, p.from, p.data);
139 if (stop || err) return;
140 sock.async_read(on_read);
141 };
142 sock.async_read(on_read);
143
144 // sender
145 int num_packets_sent = 0;
146 asio::io_service sender_ios(sim, address_v4::from_string("10.20.30.40"));
147 udp::socket sender_sock(sender_ios);
148 sender_sock.open(udp::v4());
149 sender_sock.bind(udp::endpoint(address_v4(), 4444));
150 sender_sock.non_blocking(true);
151 asio::high_resolution_timer timer(sender_ios);
152 std::function<void(error_code const&)> sender_tick = [&](error_code const&)
153 {
154 if (num_packets_sent == num_packets)
155 {
156 // we're done. shut down (a second from now, to let the dust settle)
157 timer.expires_from_now(chrono::seconds(1));
158 timer.async_wait([&](error_code const&)
159 {
160 dht->stop();
161 stop = true;
162 sender_sock.close();
163 sock.close();
164 });
165 return;
166 }
167
168 char const packet[] = "d1:ad2:id20:ababababababababababe1:y1:q1:q4:pinge";
169 sender_sock.send_to(asio::const_buffers_1(packet, sizeof(packet)-1)
170 , udp::endpoint(address_v4::from_string("40.30.20.10"), 8888));
171 ++num_packets_sent;
172
173 timer.expires_from_now(chrono::milliseconds(10));
174 timer.async_wait(sender_tick);
175 };
176 timer.expires_from_now(chrono::milliseconds(10));
177 timer.async_wait(sender_tick);
178
179 udp::endpoint from;
180 int num_bytes_received = 0;
181 int num_packets_received = 0;
182 char buffer[1500];
183 std::function<void(error_code const&, std::size_t)> on_receive
184 = [&](error_code const& ec, std::size_t const bytes)
185 {
186 if (ec) return;
187
188 num_bytes_received += int(bytes);
189 ++num_packets_received;
190
191 sender_sock.async_receive_from(asio::mutable_buffers_1(buffer, sizeof(buffer))
192 , from, on_receive);
193 };
194 sender_sock.async_receive_from(asio::mutable_buffers_1(buffer, sizeof(buffer))
195 , from, on_receive);
196
197 // run simulation
198 lt::clock_type::time_point start = lt::clock_type::now();
199 sim.run();
200 lt::clock_type::time_point end = lt::clock_type::now();
201
202 // subtract one target_upload_rate here, since we initialize the quota to one
203 // full second worth of bandwidth
204 float const average_upload_rate = (num_bytes_received - target_upload_rate)
205 / (duration_cast<chrono::milliseconds>(end - start).count() * 0.001f);
206
207 std::printf("send %d packets. received %d packets (%d bytes). average rate: %f (target: %f)\n"
208 , num_packets_sent, num_packets_received, num_bytes_received
209 , average_upload_rate, target_upload_rate);
210
211 // the actual upload rate should be within 5% of the target
212 TEST_CHECK(std::abs(average_upload_rate - target_upload_rate) < target_upload_rate * 0.05);
213
214 TEST_EQUAL(cnt[counters::dht_messages_in], num_packets);
215
216 // the number of dropped packets + the number of received pings, should equal
217 // exactly the number of packets we sent
218 TEST_EQUAL(cnt[counters::dht_messages_in_dropped]
219 + cnt[counters::dht_ping_in], num_packets);
220
221 #endif // #if !defined TORRENT_DISABLE_DHT
222 }
223
224 // TODO: put test here to take advantage of existing code, refactor
TORRENT_TEST(dht_delete_socket)225 TORRENT_TEST(dht_delete_socket)
226 {
227 #ifndef TORRENT_DISABLE_DHT
228
229 sim::default_config cfg;
230 sim::simulation sim(cfg);
231 sim::asio::io_service dht_ios(sim, lt::address_v4::from_string("40.30.20.10"));
232
233 lt::udp_socket sock(dht_ios, lt::aux::listen_socket_handle{});
234 error_code ec;
235 sock.bind(udp::endpoint(address_v4::from_string("40.30.20.10"), 8888), ec);
236
237 obs o;
238 auto ls = std::make_shared<lt::aux::listen_socket_t>();
239 ls->external_address.cast_vote(address_v4::from_string("40.30.20.10")
240 , lt::aux::session_interface::source_dht, lt::address());
241 ls->local_endpoint = tcp::endpoint(address_v4::from_string("40.30.20.10"), 8888);
242 dht::settings dhtsett;
243 counters cnt;
244 dht::dht_state state;
245 std::unique_ptr<lt::dht::dht_storage_interface> dht_storage(dht::dht_default_storage_constructor(dhtsett));
246 auto dht = std::make_shared<lt::dht::dht_tracker>(
247 &o, dht_ios, std::bind(&send_packet, std::ref(sock), _1, _2, _3, _4, _5)
248 , dhtsett, cnt, *dht_storage, std::move(state));
249
250 dht->start([](std::vector<std::pair<dht::node_entry, std::string>> const&){});
251 dht->new_socket(ls);
252
253 // schedule the removal of the socket at exactly 2 second,
254 // this simulates the fact that the internal scheduled call
255 // to connection_timeout will be executed right after leaving
256 // the state of cancellable
257 asio::high_resolution_timer t1(dht_ios);
258 t1.expires_from_now(chrono::seconds(2));
259 t1.async_wait([&](error_code const&)
260 {
261 dht->delete_socket(ls);
262 });
263
264 // stop the DHT
265 asio::high_resolution_timer t2(dht_ios);
266 t2.expires_from_now(chrono::seconds(3));
267 t2.async_wait([&](error_code const&) { dht->stop(); });
268
269 sim.run();
270
271 #endif // TORRENT_DISABLE_DHT
272 }
273