1 // rTorrent - BitTorrent client
2 // Copyright (C) 2005-2011, Jari Sundell
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 //
18 // In addition, as a special exception, the copyright holders give
19 // permission to link the code of portions of this program with the
20 // OpenSSL library under certain conditions as described in each
21 // individual source file, and distribute linked combinations
22 // including the two.
23 //
24 // You must obey the GNU General Public License in all respects for
25 // all of the code used other than OpenSSL. If you modify file(s)
26 // with this exception, you may extend this exception to your version
27 // of the file(s), but you are not obligated to do so. If you do not
28 // wish to do so, delete this exception statement from your version.
29 // If you delete this exception statement from all source files in the
30 // program, then also delete it here.
31 //
32 // Contact: Jari Sundell <jaris@ifi.uio.no>
33 //
34 // Skomakerveien 33
35 // 3185 Skoppum, NORWAY
36
37 #include "config.h"
38
39 #include <fstream>
40 #include <sstream>
41 #include <torrent/object.h>
42 #include <torrent/dht_manager.h>
43 #include <torrent/object_stream.h>
44 #include <torrent/rate.h>
45 #include <torrent/utils/log.h>
46
47 #include "rpc/parse_commands.h"
48
49 #include "globals.h"
50
51 #include "control.h"
52 #include "dht_manager.h"
53 #include "download.h"
54 #include "download_store.h"
55 #include "manager.h"
56
// Log a printf-style message for this file: forwards to
// lt_log_print_subsystem with the LOG_DHT_MANAGER group and the
// "dht_manager" subsystem tag.
//
// NOTE(review): the expansion already ends with ';', and __VA_ARGS__ is
// placed after a comma, so at least one variadic argument appears to be
// required -- presumably why call sites without format arguments pass a
// dummy 0. Confirm before changing either.
#define LT_LOG_THIS(log_fmt, ...) \
  lt_log_print_subsystem(torrent::LOG_DHT_MANAGER, "dht_manager", log_fmt, __VA_ARGS__);
59
60 namespace core {
61
// Textual names of the DHT modes accepted by set_mode(). The array index of
// the matching name is the value stored in m_start.
const char* DhtManager::dht_settings[dht_settings_num] = { "disable", "off", "auto", "on" };
63
~DhtManager()64 DhtManager::~DhtManager() {
65 priority_queue_erase(&taskScheduler, &m_updateTimeout);
66 priority_queue_erase(&taskScheduler, &m_stopTimeout);
67 }
68
69 void
load_dht_cache()70 DhtManager::load_dht_cache() {
71 if (m_start == dht_disable || !control->core()->download_store()->is_enabled()) {
72 LT_LOG_THIS("ignoring cache file", 0);
73 return;
74 }
75
76 std::string cache_filename = control->core()->download_store()->path() + "rtorrent.dht_cache";
77 std::fstream cache_stream(cache_filename.c_str(), std::ios::in | std::ios::binary);
78
79 torrent::Object cache = torrent::Object::create_map();
80
81 if (cache_stream.is_open()) {
82 cache_stream >> cache;
83
84 // If the cache file is corrupted we will just discard it with an
85 // error message.
86 if (cache_stream.fail()) {
87 LT_LOG_THIS("cache file corrupted, discarding (path:%s)", cache_filename.c_str());
88 cache = torrent::Object::create_map();
89 } else {
90 LT_LOG_THIS("cache file read (path:%s)", cache_filename.c_str());
91 }
92
93 } else {
94 LT_LOG_THIS("could not open cache file (path:%s)", cache_filename.c_str());
95 }
96
97 torrent::dht_manager()->initialize(cache);
98
99 if (m_start == dht_on)
100 start_dht();
101 }
102
103 void
start_dht()104 DhtManager::start_dht() {
105 priority_queue_erase(&taskScheduler, &m_stopTimeout);
106
107 if (!torrent::dht_manager()->is_valid()) {
108 LT_LOG_THIS("server start skipped, manager is uninitialized", 0);
109 return;
110 }
111
112 if (torrent::dht_manager()->is_active()) {
113 LT_LOG_THIS("server start skipped, already active", 0);
114 return;
115 }
116
117 torrent::ThrottlePair throttles = control->core()->get_throttle(m_throttleName);
118 torrent::dht_manager()->set_upload_throttle(throttles.first);
119 torrent::dht_manager()->set_download_throttle(throttles.second);
120
121 int port = rpc::call_command_value("dht.port");
122
123 if (port <= 0)
124 return;
125
126 if (!torrent::dht_manager()->start(port)) {
127 m_start = dht_off;
128 return;
129 }
130
131 torrent::dht_manager()->reset_statistics();
132
133 m_updateTimeout.slot() = std::bind(&DhtManager::update, this);
134 priority_queue_insert(&taskScheduler, &m_updateTimeout, (cachedTime + rak::timer::from_seconds(60)).round_seconds());
135
136 m_dhtPrevCycle = 0;
137 m_dhtPrevQueriesSent = 0;
138 m_dhtPrevRepliesReceived = 0;
139 m_dhtPrevQueriesReceived = 0;
140 m_dhtPrevBytesUp = 0;
141 m_dhtPrevBytesDown = 0;
142 }
143
144 void
stop_dht()145 DhtManager::stop_dht() {
146 priority_queue_erase(&taskScheduler, &m_updateTimeout);
147 priority_queue_erase(&taskScheduler, &m_stopTimeout);
148
149 if (torrent::dht_manager()->is_active()) {
150 LT_LOG_THIS("stopping server", 0);
151
152 log_statistics(true);
153 torrent::dht_manager()->stop();
154 }
155 }
156
157 void
save_dht_cache()158 DhtManager::save_dht_cache() {
159 if (!control->core()->download_store()->is_enabled() || !torrent::dht_manager()->is_valid())
160 return;
161
162 std::string filename = control->core()->download_store()->path() + "rtorrent.dht_cache";
163 std::string filename_tmp = filename + ".new";
164 std::fstream cache_file(filename_tmp.c_str(), std::ios::out | std::ios::trunc);
165
166 if (!cache_file.is_open())
167 return;
168
169 torrent::Object cache = torrent::Object::create_map();
170 cache_file << *torrent::dht_manager()->store_cache(&cache);
171
172 if (!cache_file.good())
173 return;
174
175 cache_file.close();
176
177 ::rename(filename_tmp.c_str(), filename.c_str());
178 }
179
180 void
set_mode(const std::string & arg)181 DhtManager::set_mode(const std::string& arg) {
182 int i;
183 for (i = 0; i < dht_settings_num; i++) {
184 if (arg == dht_settings[i]) {
185 m_start = i;
186 break;
187 }
188 }
189
190 if (i == dht_settings_num)
191 throw torrent::input_error("Invalid argument.");
192
193 if (m_start == dht_off)
194 stop_dht();
195 else if (m_start == dht_on)
196 start_dht();
197 }
198
199 void
update()200 DhtManager::update() {
201 if (!torrent::dht_manager()->is_active())
202 throw torrent::internal_error("DhtManager::update called with DHT inactive.");
203
204 if (m_start == dht_auto && !m_stopTimeout.is_queued()) {
205 DownloadList::const_iterator itr, end;
206
207 for (itr = control->core()->download_list()->begin(), end = control->core()->download_list()->end(); itr != end; ++itr)
208 if ((*itr)->download()->info()->is_active() && !(*itr)->download()->info()->is_private())
209 break;
210
211 if (itr == end) {
212 m_stopTimeout.slot() = std::bind(&DhtManager::stop_dht, this);
213 priority_queue_insert(&taskScheduler, &m_stopTimeout, (cachedTime + rak::timer::from_seconds(15 * 60)).round_seconds());
214 }
215 }
216
217 // While bootstrapping (log_statistics returns true), check every minute if it completed, otherwise update every 15 minutes.
218 if (log_statistics(false))
219 priority_queue_insert(&taskScheduler, &m_updateTimeout, (cachedTime + rak::timer::from_seconds(60)).round_seconds());
220 else
221 priority_queue_insert(&taskScheduler, &m_updateTimeout, (cachedTime + rak::timer::from_seconds(15 * 60)).round_seconds());
222 }
223
224 bool
log_statistics(bool force)225 DhtManager::log_statistics(bool force) {
226 torrent::DhtManager::statistics_type stats = torrent::dht_manager()->get_statistics();
227
228 // Check for firewall problems.
229
230 if (stats.cycle > 2 && stats.queries_sent - m_dhtPrevQueriesSent > 100 && stats.queries_received == m_dhtPrevQueriesReceived) {
231 // We should have had clients ping us at least but have received
232 // nothing, that means the UDP port is probably unreachable.
233 if (torrent::dht_manager()->can_receive_queries())
234 LT_LOG_THIS("listening port appears to be unreachable, no queries received", 0);
235
236 torrent::dht_manager()->set_can_receive(false);
237 }
238
239 if (stats.queries_sent - m_dhtPrevQueriesSent > stats.num_nodes * 2 + 20 && stats.replies_received == m_dhtPrevRepliesReceived) {
240 // No replies to over 20 queries plus two per node we have. Probably firewalled.
241 if (!m_warned)
242 LT_LOG_THIS("listening port appears to be firewalled, no replies received", 0);
243
244 m_warned = true;
245 return false;
246 }
247
248 m_warned = false;
249
250 if (stats.queries_received > m_dhtPrevQueriesReceived)
251 torrent::dht_manager()->set_can_receive(true);
252
253 // Nothing to log while bootstrapping, but check again every minute.
254 if (stats.cycle <= 1) {
255 m_dhtPrevCycle = stats.cycle;
256 return true;
257 }
258
259 // If bootstrap completed between now and the previous check, notify user.
260 if (m_dhtPrevCycle == 1) {
261 char buffer[128];
262 snprintf(buffer, sizeof(buffer), "DHT bootstrap complete, have %d nodes in %d buckets.", stats.num_nodes, stats.num_buckets);
263 control->core()->push_log_complete(buffer);
264 m_dhtPrevCycle = stats.cycle;
265 return false;
266 };
267
268 // Standard DHT statistics on first real cycle, and every 8th cycle
269 // afterwards (i.e. every 2 hours), or when forced.
270 if ((force && stats.cycle != m_dhtPrevCycle) || stats.cycle == 3 || stats.cycle > m_dhtPrevCycle + 7) {
271 char buffer[256];
272 snprintf(buffer, sizeof(buffer),
273 "DHT statistics: %d queries in, %d queries out, %d replies received, %lld bytes read, %lld bytes sent, "
274 "%d known nodes in %d buckets, %d peers (highest: %d) tracked in %d torrents.",
275 stats.queries_received - m_dhtPrevQueriesReceived,
276 stats.queries_sent - m_dhtPrevQueriesSent,
277 stats.replies_received - m_dhtPrevRepliesReceived,
278 (long long unsigned int)(stats.down_rate.total() - m_dhtPrevBytesDown),
279 (long long unsigned int)(stats.up_rate.total() - m_dhtPrevBytesUp),
280 stats.num_nodes,
281 stats.num_buckets,
282 stats.num_peers,
283 stats.max_peers,
284 stats.num_trackers);
285
286 control->core()->push_log_complete(buffer);
287
288 m_dhtPrevCycle = stats.cycle;
289 m_dhtPrevQueriesSent = stats.queries_sent;
290 m_dhtPrevRepliesReceived = stats.replies_received;
291 m_dhtPrevQueriesReceived = stats.queries_received;
292 m_dhtPrevBytesUp = stats.up_rate.total();
293 m_dhtPrevBytesDown = stats.down_rate.total();
294 }
295
296 return false;
297 }
298
299 torrent::Object
dht_statistics()300 DhtManager::dht_statistics() {
301 torrent::Object dhtStats = torrent::Object::create_map();
302
303 dhtStats.insert_key("dht", dht_settings[m_start]);
304 dhtStats.insert_key("active", torrent::dht_manager()->is_active());
305 dhtStats.insert_key("throttle", m_throttleName);
306
307 if (torrent::dht_manager()->is_active()) {
308 torrent::DhtManager::statistics_type stats = torrent::dht_manager()->get_statistics();
309
310 dhtStats.insert_key("cycle", stats.cycle);
311 dhtStats.insert_key("queries_received", stats.queries_received);
312 dhtStats.insert_key("queries_sent", stats.queries_sent);
313 dhtStats.insert_key("replies_received", stats.replies_received);
314 dhtStats.insert_key("errors_received", stats.errors_received);
315 dhtStats.insert_key("errors_caught", stats.errors_caught);
316 dhtStats.insert_key("bytes_read", stats.down_rate.total());
317 dhtStats.insert_key("bytes_written", stats.up_rate.total());
318 dhtStats.insert_key("nodes", stats.num_nodes);
319 dhtStats.insert_key("buckets", stats.num_buckets);
320 dhtStats.insert_key("peers", stats.num_peers);
321 dhtStats.insert_key("peers_max", stats.max_peers);
322 dhtStats.insert_key("torrents", stats.num_trackers);
323 }
324
325 return dhtStats;
326 }
327
328 void
set_throttle_name(const std::string & throttleName)329 DhtManager::set_throttle_name(const std::string& throttleName) {
330 if (torrent::dht_manager()->is_active())
331 throw torrent::input_error("Cannot set DHT throttle while active.");
332
333 m_throttleName = throttleName;
334 }
335
336 }
337