1 // rTorrent - BitTorrent client
2 // Copyright (C) 2005-2011, Jari Sundell
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 //
18 // In addition, as a special exception, the copyright holders give
19 // permission to link the code of portions of this program with the
20 // OpenSSL library under certain conditions as described in each
21 // individual source file, and distribute linked combinations
22 // including the two.
23 //
24 // You must obey the GNU General Public License in all respects for
25 // all of the code used other than OpenSSL.  If you modify file(s)
26 // with this exception, you may extend this exception to your version
27 // of the file(s), but you are not obligated to do so.  If you do not
28 // wish to do so, delete this exception statement from your version.
29 // If you delete this exception statement from all source files in the
30 // program, then also delete it here.
31 //
32 // Contact:  Jari Sundell <jaris@ifi.uio.no>
33 //
34 //           Skomakerveien 33
35 //           3185 Skoppum, NORWAY
36 
37 #include "config.h"
38 
39 #include <fstream>
40 #include <sstream>
41 #include <torrent/object.h>
42 #include <torrent/dht_manager.h>
43 #include <torrent/object_stream.h>
44 #include <torrent/rate.h>
45 #include <torrent/utils/log.h>
46 
47 #include "rpc/parse_commands.h"
48 
49 #include "globals.h"
50 
51 #include "control.h"
52 #include "dht_manager.h"
53 #include "download.h"
54 #include "download_store.h"
55 #include "manager.h"
56 
57 #define LT_LOG_THIS(log_fmt, ...)                                       \
58   lt_log_print_subsystem(torrent::LOG_DHT_MANAGER, "dht_manager", log_fmt, __VA_ARGS__);
59 
60 namespace core {
61 
62 const char* DhtManager::dht_settings[dht_settings_num] = { "disable", "off", "auto", "on" };
63 
~DhtManager()64 DhtManager::~DhtManager() {
65   priority_queue_erase(&taskScheduler, &m_updateTimeout);
66   priority_queue_erase(&taskScheduler, &m_stopTimeout);
67 }
68 
69 void
load_dht_cache()70 DhtManager::load_dht_cache() {
71   if (m_start == dht_disable || !control->core()->download_store()->is_enabled()) {
72     LT_LOG_THIS("ignoring cache file", 0);
73     return;
74   }
75 
76   std::string cache_filename = control->core()->download_store()->path() + "rtorrent.dht_cache";
77   std::fstream cache_stream(cache_filename.c_str(), std::ios::in | std::ios::binary);
78 
79   torrent::Object cache = torrent::Object::create_map();
80 
81   if (cache_stream.is_open()) {
82     cache_stream >> cache;
83 
84     // If the cache file is corrupted we will just discard it with an
85     // error message.
86     if (cache_stream.fail()) {
87       LT_LOG_THIS("cache file corrupted, discarding (path:%s)", cache_filename.c_str());
88       cache = torrent::Object::create_map();
89     } else {
90       LT_LOG_THIS("cache file read (path:%s)", cache_filename.c_str());
91     }
92 
93   } else {
94     LT_LOG_THIS("could not open cache file (path:%s)", cache_filename.c_str());
95   }
96 
97   torrent::dht_manager()->initialize(cache);
98 
99   if (m_start == dht_on)
100     start_dht();
101 }
102 
103 void
start_dht()104 DhtManager::start_dht() {
105   priority_queue_erase(&taskScheduler, &m_stopTimeout);
106 
107   if (!torrent::dht_manager()->is_valid()) {
108     LT_LOG_THIS("server start skipped, manager is uninitialized", 0);
109     return;
110   }
111 
112   if (torrent::dht_manager()->is_active()) {
113     LT_LOG_THIS("server start skipped, already active", 0);
114     return;
115   }
116 
117   torrent::ThrottlePair throttles = control->core()->get_throttle(m_throttleName);
118   torrent::dht_manager()->set_upload_throttle(throttles.first);
119   torrent::dht_manager()->set_download_throttle(throttles.second);
120 
121   int port = rpc::call_command_value("dht.port");
122 
123   if (port <= 0)
124     return;
125 
126   if (!torrent::dht_manager()->start(port)) {
127     m_start = dht_off;
128     return;
129   }
130 
131   torrent::dht_manager()->reset_statistics();
132 
133   m_updateTimeout.slot() = std::bind(&DhtManager::update, this);
134   priority_queue_insert(&taskScheduler, &m_updateTimeout, (cachedTime + rak::timer::from_seconds(60)).round_seconds());
135 
136   m_dhtPrevCycle = 0;
137   m_dhtPrevQueriesSent = 0;
138   m_dhtPrevRepliesReceived = 0;
139   m_dhtPrevQueriesReceived = 0;
140   m_dhtPrevBytesUp = 0;
141   m_dhtPrevBytesDown = 0;
142 }
143 
144 void
stop_dht()145 DhtManager::stop_dht() {
146   priority_queue_erase(&taskScheduler, &m_updateTimeout);
147   priority_queue_erase(&taskScheduler, &m_stopTimeout);
148 
149   if (torrent::dht_manager()->is_active()) {
150     LT_LOG_THIS("stopping server", 0);
151 
152     log_statistics(true);
153     torrent::dht_manager()->stop();
154   }
155 }
156 
157 void
save_dht_cache()158 DhtManager::save_dht_cache() {
159   if (!control->core()->download_store()->is_enabled() || !torrent::dht_manager()->is_valid())
160     return;
161 
162   std::string filename = control->core()->download_store()->path() + "rtorrent.dht_cache";
163   std::string filename_tmp = filename + ".new";
164   std::fstream cache_file(filename_tmp.c_str(), std::ios::out | std::ios::trunc);
165 
166   if (!cache_file.is_open())
167     return;
168 
169   torrent::Object cache = torrent::Object::create_map();
170   cache_file << *torrent::dht_manager()->store_cache(&cache);
171 
172   if (!cache_file.good())
173     return;
174 
175   cache_file.close();
176 
177   ::rename(filename_tmp.c_str(), filename.c_str());
178 }
179 
180 void
set_mode(const std::string & arg)181 DhtManager::set_mode(const std::string& arg) {
182   int i;
183   for (i = 0; i < dht_settings_num; i++) {
184     if (arg == dht_settings[i]) {
185       m_start = i;
186       break;
187     }
188   }
189 
190   if (i == dht_settings_num)
191     throw torrent::input_error("Invalid argument.");
192 
193   if (m_start == dht_off)
194     stop_dht();
195   else if (m_start == dht_on)
196     start_dht();
197 }
198 
199 void
update()200 DhtManager::update() {
201   if (!torrent::dht_manager()->is_active())
202     throw torrent::internal_error("DhtManager::update called with DHT inactive.");
203 
204   if (m_start == dht_auto && !m_stopTimeout.is_queued()) {
205     DownloadList::const_iterator itr, end;
206 
207     for (itr = control->core()->download_list()->begin(), end = control->core()->download_list()->end(); itr != end; ++itr)
208       if ((*itr)->download()->info()->is_active() && !(*itr)->download()->info()->is_private())
209         break;
210 
211     if (itr == end) {
212       m_stopTimeout.slot() = std::bind(&DhtManager::stop_dht, this);
213       priority_queue_insert(&taskScheduler, &m_stopTimeout, (cachedTime + rak::timer::from_seconds(15 * 60)).round_seconds());
214     }
215   }
216 
217   // While bootstrapping (log_statistics returns true), check every minute if it completed, otherwise update every 15 minutes.
218   if (log_statistics(false))
219     priority_queue_insert(&taskScheduler, &m_updateTimeout, (cachedTime + rak::timer::from_seconds(60)).round_seconds());
220   else
221     priority_queue_insert(&taskScheduler, &m_updateTimeout, (cachedTime + rak::timer::from_seconds(15 * 60)).round_seconds());
222 }
223 
224 bool
log_statistics(bool force)225 DhtManager::log_statistics(bool force) {
226   torrent::DhtManager::statistics_type stats = torrent::dht_manager()->get_statistics();
227 
228   // Check for firewall problems.
229 
230   if (stats.cycle > 2 && stats.queries_sent - m_dhtPrevQueriesSent > 100 && stats.queries_received == m_dhtPrevQueriesReceived) {
231     // We should have had clients ping us at least but have received
232     // nothing, that means the UDP port is probably unreachable.
233     if (torrent::dht_manager()->can_receive_queries())
234       LT_LOG_THIS("listening port appears to be unreachable, no queries received", 0);
235 
236     torrent::dht_manager()->set_can_receive(false);
237   }
238 
239   if (stats.queries_sent - m_dhtPrevQueriesSent > stats.num_nodes * 2 + 20 && stats.replies_received == m_dhtPrevRepliesReceived) {
240     // No replies to over 20 queries plus two per node we have. Probably firewalled.
241     if (!m_warned)
242       LT_LOG_THIS("listening port appears to be firewalled, no replies received", 0);
243 
244     m_warned = true;
245     return false;
246   }
247 
248   m_warned = false;
249 
250   if (stats.queries_received > m_dhtPrevQueriesReceived)
251     torrent::dht_manager()->set_can_receive(true);
252 
253   // Nothing to log while bootstrapping, but check again every minute.
254   if (stats.cycle <= 1) {
255     m_dhtPrevCycle = stats.cycle;
256     return true;
257   }
258 
259   // If bootstrap completed between now and the previous check, notify user.
260   if (m_dhtPrevCycle == 1) {
261     char buffer[128];
262     snprintf(buffer, sizeof(buffer), "DHT bootstrap complete, have %d nodes in %d buckets.", stats.num_nodes, stats.num_buckets);
263     control->core()->push_log_complete(buffer);
264     m_dhtPrevCycle = stats.cycle;
265     return false;
266   };
267 
268   // Standard DHT statistics on first real cycle, and every 8th cycle
269   // afterwards (i.e. every 2 hours), or when forced.
270   if ((force && stats.cycle != m_dhtPrevCycle) || stats.cycle == 3 || stats.cycle > m_dhtPrevCycle + 7) {
271     char buffer[256];
272     snprintf(buffer, sizeof(buffer),
273              "DHT statistics: %d queries in, %d queries out, %d replies received, %lld bytes read, %lld bytes sent, "
274              "%d known nodes in %d buckets, %d peers (highest: %d) tracked in %d torrents.",
275              stats.queries_received - m_dhtPrevQueriesReceived,
276              stats.queries_sent - m_dhtPrevQueriesSent,
277              stats.replies_received - m_dhtPrevRepliesReceived,
278              (long long unsigned int)(stats.down_rate.total() - m_dhtPrevBytesDown),
279              (long long unsigned int)(stats.up_rate.total() - m_dhtPrevBytesUp),
280              stats.num_nodes,
281              stats.num_buckets,
282              stats.num_peers,
283              stats.max_peers,
284              stats.num_trackers);
285 
286     control->core()->push_log_complete(buffer);
287 
288     m_dhtPrevCycle = stats.cycle;
289     m_dhtPrevQueriesSent = stats.queries_sent;
290     m_dhtPrevRepliesReceived = stats.replies_received;
291     m_dhtPrevQueriesReceived = stats.queries_received;
292     m_dhtPrevBytesUp = stats.up_rate.total();
293     m_dhtPrevBytesDown = stats.down_rate.total();
294   }
295 
296  return false;
297 }
298 
299 torrent::Object
dht_statistics()300 DhtManager::dht_statistics() {
301   torrent::Object dhtStats = torrent::Object::create_map();
302 
303   dhtStats.insert_key("dht",              dht_settings[m_start]);
304   dhtStats.insert_key("active",           torrent::dht_manager()->is_active());
305   dhtStats.insert_key("throttle",         m_throttleName);
306 
307   if (torrent::dht_manager()->is_active()) {
308     torrent::DhtManager::statistics_type stats = torrent::dht_manager()->get_statistics();
309 
310     dhtStats.insert_key("cycle",            stats.cycle);
311     dhtStats.insert_key("queries_received", stats.queries_received);
312     dhtStats.insert_key("queries_sent",     stats.queries_sent);
313     dhtStats.insert_key("replies_received", stats.replies_received);
314     dhtStats.insert_key("errors_received",  stats.errors_received);
315     dhtStats.insert_key("errors_caught",    stats.errors_caught);
316     dhtStats.insert_key("bytes_read",       stats.down_rate.total());
317     dhtStats.insert_key("bytes_written",    stats.up_rate.total());
318     dhtStats.insert_key("nodes",            stats.num_nodes);
319     dhtStats.insert_key("buckets",          stats.num_buckets);
320     dhtStats.insert_key("peers",            stats.num_peers);
321     dhtStats.insert_key("peers_max",        stats.max_peers);
322     dhtStats.insert_key("torrents",         stats.num_trackers);
323   }
324 
325   return dhtStats;
326 }
327 
328 void
set_throttle_name(const std::string & throttleName)329 DhtManager::set_throttle_name(const std::string& throttleName) {
330   if (torrent::dht_manager()->is_active())
331     throw torrent::input_error("Cannot set DHT throttle while active.");
332 
333   m_throttleName = throttleName;
334 }
335 
336 }
337