1 // libTorrent - BitTorrent library
2 // Copyright (C) 2005-2011, Jari Sundell
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 //
18 // In addition, as a special exception, the copyright holders give
19 // permission to link the code of portions of this program with the
20 // OpenSSL library under certain conditions as described in each
21 // individual source file, and distribute linked combinations
22 // including the two.
23 //
24 // You must obey the GNU General Public License in all respects for
25 // all of the code used other than OpenSSL.  If you modify file(s)
26 // with this exception, you may extend this exception to your version
27 // of the file(s), but you are not obligated to do so.  If you do not
28 // wish to do so, delete this exception statement from your version.
29 // If you delete this exception statement from all source files in the
30 // program, then also delete it here.
31 //
32 // Contact:  Jari Sundell <jaris@ifi.uio.no>
33 //
34 //           Skomakerveien 33
35 //           3185 Skoppum, NORWAY
36 
37 #include "config.h"
38 #include "globals.h"
39 
40 #include <algorithm>
41 #include <cstdio>
42 #include <rak/functional.h>
43 
44 #include "torrent/exceptions.h"
45 #include "torrent/connection_manager.h"
46 #include "torrent/download_info.h"
47 #include "torrent/object.h"
48 #include "torrent/object_stream.h"
49 #include "torrent/poll.h"
50 #include "torrent/object_static_map.h"
51 #include "torrent/throttle.h"
52 #include "torrent/utils/log.h"
53 #include "tracker/tracker_dht.h"
54 
55 #include "dht_bucket.h"
56 #include "dht_router.h"
57 #include "dht_transaction.h"
58 
59 #include "manager.h"
60 
// Print a printf-style message to the LOG_DHT_SERVER group, tagged with the
// "dht_server" subsystem. At least one variadic argument must be supplied.
// The expansion deliberately carries no trailing semicolon; the call site
// provides it, so the macro can be used wherever a single statement is expected.
#define LT_LOG_THIS(log_fmt, ...)                                       \
  lt_log_print_subsystem(torrent::LOG_DHT_SERVER, "dht_server", log_fmt, __VA_ARGS__)
63 
64 namespace torrent {
65 
66 const char* DhtServer::queries[] = {
67   "ping",
operatorhashstring_ptr_hash68   "find_node",
69   "get_peers",
70   "announce_peer",
71 };
72 
73 // List of all possible keys we need/support in a DHT message.
74 // Unsupported keys we receive are dropped (ignored) while decoding.
75 // See torrent/object_static_map.h for how this works.
76 template <>
77 const DhtMessage::key_list_type DhtMessage::base_type::keys = {
78   { key_a_id,       "a::id*S" },
79   { key_a_infoHash, "a::info_hash*S" },
80   { key_a_port,     "a::port", },
81   { key_a_target,   "a::target*S" },
82   { key_a_token,    "a::token*S" },
83 
84   { key_e_0,        "e[]*" },
operatorhashstring_hash85   { key_e_1,        "e[]*" },
86 
87   { key_q,          "q*S" },
88 
89   { key_r_id,       "r::id*S" },
90   { key_r_nodes,    "r::nodes*S" },
91   { key_r_token,    "r::token*S" },
92   { key_r_values,   "r::values*L" },
93 
94   { key_t,          "t*S" },
95   { key_v,          "v*" },
96   { key_y,          "y*S" },
97 };
98 
99 // Error in DHT protocol, avoids std::string ctor from communication_error
100 class dht_error : public network_error {
101 public:
102   dht_error(int code, const char* message) : m_message(message), m_code(code) {}
operatorhashstring_ptr_equal103 
104   virtual int          code() const throw()   { return m_code; }
105   virtual const char*  what() const throw()   { return m_message; }
106 
107 private:
108   const char*  m_message;
109   int          m_code;
110 };
111 
112 DhtServer::DhtServer(DhtRouter* router) :
113   m_router(router),
114 
accessor_wrapperaccessor_wrapper115   m_uploadNode(60),
116   m_downloadNode(60),
117 
118   m_uploadThrottle(manager->upload_throttle()->throttle_list()),
119   m_downloadThrottle(manager->download_throttle()->throttle_list()),
120 
121   m_networkUp(false) {
122 
123   get_fd().clear();
124   reset_statistics();
125 
126   // Reserve a socket for the DHT server, even though we don't
127   // actually open it until the server is started, which may not
128   // happen until the first non-private torrent is started.
129   manager->connection_manager()->inc_socket_count();
130 }
131 
132 DhtServer::~DhtServer() {
133   stop();
accessor_wrapperaccessor_wrapper134 
135   std::for_each(m_highQueue.begin(), m_highQueue.end(), rak::call_delete<DhtTransactionPacket>());
idaccessor_wrapper136   std::for_each(m_lowQueue.begin(), m_lowQueue.end(), rak::call_delete<DhtTransactionPacket>());
trackeraccessor_wrapper137 
138   manager->connection_manager()->dec_socket_count();
139 }
140 
141 void
142 DhtServer::start(int port) {
143   try {
144     if (!get_fd().open_datagram() || !get_fd().set_nonblock())
145       throw resource_error("Could not allocate datagram socket.");
146 
147     if (!get_fd().set_reuse_address(true))
148       throw resource_error("Could not set listening port to reuse address.");
149 
150     rak::socket_address sa = *m_router->address();
151 
152     if (sa.family() == rak::socket_address::af_unspec)
153       sa.sa_inet6()->clear();
154 
155     sa.set_port(port);
156 
157     LT_LOG_THIS("starting (address:%s)", sa.pretty_address_str().c_str());
158 
159     // Figure out how to bind to both inet and inet6.
160     if (!get_fd().bind(sa))
161       throw resource_error("Could not bind datagram socket.");
162 
163   } catch (torrent::base_error& e) {
164     get_fd().close();
165     get_fd().clear();
166     throw;
167   }
168 
169   m_taskTimeout.slot() = std::bind(&DhtServer::receive_timeout, this);
170 
171   m_uploadNode.set_list_iterator(m_uploadThrottle->end());
172   m_uploadNode.slot_activate() = std::bind(&SocketBase::receive_throttle_up_activate, static_cast<SocketBase*>(this));
173 
174   m_downloadNode.set_list_iterator(m_downloadThrottle->end());
175   m_downloadThrottle->insert(&m_downloadNode);
176 
177   manager->poll()->open(this);
178   manager->poll()->insert_read(this);
179   manager->poll()->insert_error(this);
180 }
181 
182 void
183 DhtServer::stop() {
184   if (!is_active())
185     return;
186 
187   LT_LOG_THIS("stopping", 0);
188 
189   clear_transactions();
190 
191   priority_queue_erase(&taskScheduler, &m_taskTimeout);
192 
add_node(DhtNode * n)193   m_uploadThrottle->erase(&m_uploadNode);
194   m_downloadThrottle->erase(&m_downloadNode);
195 
196   manager->poll()->remove_read(this);
197   manager->poll()->remove_write(this);
198   manager->poll()->remove_error(this);
199   manager->poll()->close(this);
200 
201   get_fd().close();
202   get_fd().clear();
203 
204   m_networkUp = false;
205 }
206 
207 void
208 DhtServer::reset_statistics() {
209   m_queriesReceived = 0;
210   m_queriesSent = 0;
211   m_repliesReceived = 0;
212   m_errorsReceived = 0;
213   m_errorsCaught = 0;
214 
215   m_uploadNode.rate()->set_total(0);
216   m_downloadNode.rate()->set_total(0);
217 }
218 
219 // Ping a node whose ID we know.
220 void
221 DhtServer::ping(const HashString& id, const rak::socket_address* sa) {
222   // No point pinging a node that we're already contacting otherwise.
223   transaction_itr itr = m_transactions.lower_bound(DhtTransaction::key(sa, 0));
224   if (itr == m_transactions.end() || !DhtTransaction::key_match(itr->first, sa))
225     add_transaction(new DhtTransactionPing(id, sa), packet_prio_low);
226 }
227 
228 // Contact nodes in given bucket and ask for their nodes closest to target.
229 void
230 DhtServer::find_node(const DhtBucket& contacts, const HashString& target) {
231   DhtSearch* search = new DhtSearch(target, contacts);
232 
233   DhtSearch::const_accessor n;
234   while ((n = search->get_contact()) != search->end())
235     add_transaction(new DhtTransactionFindNode(n), packet_prio_low);
236 
237   // This shouldn't happen, it means we had no contactable nodes at all.
238   if (!search->start())
239     delete search;
240 }
241 
242 void
243 DhtServer::announce(const DhtBucket& contacts, const HashString& infoHash, TrackerDht* tracker) {
244   DhtAnnounce* announce = new DhtAnnounce(infoHash, tracker, contacts);
245 
246   DhtSearch::const_accessor n;
247   while ((n = announce->get_contact()) != announce->end())
248     add_transaction(new DhtTransactionFindNode(n), packet_prio_high);
249 
250   // This can only happen if all nodes we know are bad.
251   if (!announce->start())
252     delete announce;
253   else
254     announce->update_status();
255 }
256 
257 void
258 DhtServer::cancel_announce(DownloadInfo* info, const TrackerDht* tracker) {
259   transaction_itr itr = m_transactions.begin();
260 
261   while (itr != m_transactions.end()) {
262     if (itr->second->is_search() && itr->second->as_search()->search()->is_announce()) {
263       DhtAnnounce* announce = static_cast<DhtAnnounce*>(itr->second->as_search()->search());
264 
265       if ((info == NULL || announce->target() == info->hash()) && (tracker == NULL || announce->tracker() == tracker)) {
266         drop_packet(itr->second->packet());
267         delete itr->second;
268         m_transactions.erase(itr++);
269         continue;
270       }
271     }
272 
273     ++itr;
274   }
275 }
276 
277 void
278 DhtServer::update() {
279   // Reset this every 15 minutes. It'll get set back to true if we receive
280   // any valid packets. This allows detecting when the entire network goes
281   // down, and prevents all nodes from getting removed as unresponsive.
282   m_networkUp = false;
283 }
284 
285 void
286 DhtServer::process_query(const HashString& id, const rak::socket_address* sa, const DhtMessage& msg) {
287   m_queriesReceived++;
288   m_networkUp = true;
289 
290   raw_string query = msg[key_q].as_raw_string();
291 
292   // Construct reply.
293   DhtMessage reply;
294 
295   if (query == raw_string::from_c_str("find_node"))
296     create_find_node_response(msg, reply);
297 
298   else if (query == raw_string::from_c_str("get_peers"))
299     create_get_peers_response(msg, sa, reply);
300 
301   else if (query == raw_string::from_c_str("announce_peer"))
302     create_announce_peer_response(msg, sa, reply);
303 
304   else if (query != raw_string::from_c_str("ping"))
305     throw dht_error(dht_error_bad_method, "Unknown query type.");
306 
307   m_router->node_queried(id, sa);
308   create_response(msg, sa, reply);
309 }
310 
311 void
312 DhtServer::create_find_node_response(const DhtMessage& req, DhtMessage& reply) {
313   raw_string target = req[key_a_target].as_raw_string();
314 
315   if (target.size() < HashString::size_data)
316     throw dht_error(dht_error_protocol, "target string too short");
317 
318   reply[key_r_nodes] = m_router->get_closest_nodes(*HashString::cast_from(target.data()));
319 
320   if (reply[key_r_nodes].as_raw_string().empty())
321     throw dht_error(dht_error_generic, "No nodes");
322 }
323 
324 void
325 DhtServer::create_get_peers_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
326   reply[key_r_token] = m_router->make_token(sa, reply.data_end);
327   reply.data_end += reply[key_r_token].as_raw_string().size();
328 
329   raw_string info_hash_str = req[key_a_infoHash].as_raw_string();
330 
331   if (info_hash_str.size() < HashString::size_data)
332     throw dht_error(dht_error_protocol, "info hash too short");
333 
334   const HashString* info_hash = HashString::cast_from(info_hash_str.data());
335 
336   DhtTracker* tracker = m_router->get_tracker(*info_hash, false);
337 
338   // If we're not tracking or have no peers, send closest nodes.
339   if (!tracker || tracker->empty()) {
340     raw_string nodes = m_router->get_closest_nodes(*info_hash);
341 
342     if (nodes.empty())
343       throw dht_error(dht_error_generic, "No peers nor nodes");
344 
345     reply[key_r_nodes] = nodes;
346 
347   } else {
348     reply[key_r_values] = tracker->get_peers();
349   }
350 }
351 
352 void
353 DhtServer::create_announce_peer_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
354   raw_string info_hash = req[key_a_infoHash].as_raw_string();
355 
356   if (info_hash.size() < HashString::size_data)
357     throw dht_error(dht_error_protocol, "info hash too short");
358 
359   if (!m_router->token_valid(req[key_a_token].as_raw_string(), sa))
360     throw dht_error(dht_error_protocol, "Token invalid.");
361 
362   DhtTracker* tracker = m_router->get_tracker(*HashString::cast_from(info_hash.data()), true);
363   tracker->add_peer(sa->sa_inet()->address_n(), req[key_a_port].as_value());
364 }
365 
366 void
367 DhtServer::process_response(const HashString& id, const rak::socket_address* sa, const DhtMessage& response) {
368   int transactionId = (unsigned char)response[key_t].as_raw_string().data()[0];
369   transaction_itr itr = m_transactions.find(DhtTransaction::key(sa, transactionId));
370 
371   // Response to a transaction we don't have in our table. At this point it's
372   // impossible to tell whether it used to be a valid transaction but timed out
373   // the node did not return the ID we sent it, or it returned it with a
374   // different address than we sent it o. Best we can do is ignore the reply,
375   // since the protocol doesn't call for returning errors in responses.
376   if (itr == m_transactions.end())
377     return;
378 
379   m_repliesReceived++;
380   m_networkUp = true;
381 
382   // Make sure transaction is erased even if an exception is thrown.
383   try {
384     DhtTransaction* transaction = itr->second;
385 #ifdef USE_EXTRA_DEBUG
386     if (DhtTransaction::key(sa, transactionId) != transaction->key(transactionId))
387       throw internal_error("DhtServer::process_response key mismatch.");
388 #endif
389 
390     // If we contact a node but its ID is not the one we expect, ignore the reply
391     // to prevent interference from rogue nodes.
392     if ((id != transaction->id() && transaction->id() != m_router->zero_id))
393       return;
394 
395     switch (transaction->type()) {
396       case DhtTransaction::DHT_FIND_NODE:
397         parse_find_node_reply(transaction->as_find_node(), response[key_r_nodes].as_raw_string());
398         break;
399 
400       case DhtTransaction::DHT_GET_PEERS:
401         parse_get_peers_reply(transaction->as_get_peers(), response);
402         break;
403 
404       // Nothing to do for DHT_PING and DHT_ANNOUNCE_PEER
405       default:
406         break;
407     }
408 
409     // Mark node responsive only if all processing was successful, without errors.
410     m_router->node_replied(id, sa);
411 
412   } catch (std::exception& e) {
413     drop_packet(itr->second->packet());
414     delete itr->second;
415     m_transactions.erase(itr);
416 
417     m_errorsCaught++;
418     throw;
419   }
420 
421   drop_packet(itr->second->packet());
422   delete itr->second;
423   m_transactions.erase(itr);
424 }
425 
426 void
427 DhtServer::process_error(const rak::socket_address* sa, const DhtMessage& error) {
428   int transactionId = (unsigned char)error[key_t].as_raw_string().data()[0];
429   transaction_itr itr = m_transactions.find(DhtTransaction::key(sa, transactionId));
430 
431   if (itr == m_transactions.end())
432     return;
433 
434   m_repliesReceived++;
435   m_errorsReceived++;
436   m_networkUp = true;
437 
438   // Don't mark node as good (because it replied) or bad (because it returned an error).
439   // If it consistently returns errors for valid queries it's probably broken.  But a
440   // few error messages are acceptable. So we do nothing and pretend the query never happened.
441 
442   drop_packet(itr->second->packet());
443   delete itr->second;
444   m_transactions.erase(itr);
445 }
446 
447 void
448 DhtServer::parse_find_node_reply(DhtTransactionSearch* transaction, raw_string nodes) {
449   transaction->complete(true);
450 
451   if (sizeof(const compact_node_info) != 26)
452     throw internal_error("DhtServer::parse_find_node_reply(...) bad struct size.");
453 
454   node_info_list list;
455   std::copy(reinterpret_cast<const compact_node_info*>(nodes.data()),
456             reinterpret_cast<const compact_node_info*>(nodes.data() + nodes.size() - nodes.size() % sizeof(compact_node_info)),
457             std::back_inserter(list));
458 
459   for (node_info_list::iterator itr = list.begin(); itr != list.end(); ++itr) {
460     if (itr->id() != m_router->id()) {
461       rak::socket_address sa = itr->address();
462       transaction->search()->add_contact(itr->id(), &sa);
463     }
464   }
465 
466   find_node_next(transaction);
467 }
468 
469 void
470 DhtServer::parse_get_peers_reply(DhtTransactionGetPeers* transaction, const DhtMessage& response) {
471   DhtAnnounce* announce = static_cast<DhtAnnounce*>(transaction->as_search()->search());
472 
473   transaction->complete(true);
474 
475   if (response[key_r_values].is_raw_list())
476     announce->receive_peers(response[key_r_values].as_raw_list());
477 
478   if (response[key_r_token].is_raw_string())
479     add_transaction(new DhtTransactionAnnouncePeer(transaction->id(),
480                                                    transaction->address(),
481                                                    announce->target(),
482                                                    response[key_r_token].as_raw_string()),
483                     packet_prio_low);
484 
485   announce->update_status();
486 }
487 
488 void
489 DhtServer::find_node_next(DhtTransactionSearch* transaction) {
490   int priority = packet_prio_low;
491   if (transaction->search()->is_announce())
492     priority = packet_prio_high;
493 
494   DhtSearch::const_accessor node;
495   while ((node = transaction->search()->get_contact()) != transaction->search()->end())
496     add_transaction(new DhtTransactionFindNode(node), priority);
497 
498   if (!transaction->search()->is_announce())
499     return;
500 
501   DhtAnnounce* announce = static_cast<DhtAnnounce*>(transaction->search());
502   if (announce->complete()) {
503     // We have found the 8 closest nodes to the info hash. Retrieve peers
504     // from them and announce to them.
505     for (node = announce->start_announce(); node != announce->end(); ++node)
506       add_transaction(new DhtTransactionGetPeers(node), packet_prio_high);
507   }
508 
509   announce->update_status();
510 }
511 
512 void
513 DhtServer::add_packet(DhtTransactionPacket* packet, int priority) {
514   switch (priority) {
515     // High priority packets are for important queries, and quite small.
516     // They're added to front of high priority queue and thus will be the
517     // next packets sent.
518     case packet_prio_high:
519       m_highQueue.push_front(packet);
520       break;
521 
522     // Low priority query packets are added to the back of the high priority
523     // queue and will be sent when all high priority packets have been transmitted.
524     case packet_prio_low:
525       m_highQueue.push_back(packet);
526       break;
527 
528     // Reply packets will be processed after all of our own packets have been send.
529     case packet_prio_reply:
530       m_lowQueue.push_back(packet);
531       break;
532 
533     default:
534       throw internal_error("DhtServer::add_packet called with invalid priority.");
535   }
536 }
537 
538 void
539 DhtServer::drop_packet(DhtTransactionPacket* packet) {
540     m_highQueue.erase(std::remove(m_highQueue.begin(), m_highQueue.end(), packet), m_highQueue.end());
541     m_lowQueue.erase(std::remove(m_lowQueue.begin(), m_lowQueue.end(), packet), m_lowQueue.end());
542 }
543 
544 void
545 DhtServer::create_query(transaction_itr itr, int tID, const rak::socket_address* sa, int priority) {
546   if (itr->second->id() == m_router->id())
547     throw internal_error("DhtServer::create_query trying to send to itself.");
548 
549   DhtMessage query;
550 
551   // Transaction ID is a bencode string.
552   query[key_t] = raw_bencode(query.data_end, 3);
553   *query.data_end++ = '1';
554   *query.data_end++ = ':';
555   *query.data_end++ = tID;
556 
557   DhtTransaction* transaction = itr->second;
558   query[key_q] = raw_string::from_c_str(queries[transaction->type()]);
559   query[key_y] = raw_bencode::from_c_str("1:q");
560   query[key_v] = raw_bencode("4:" PEER_VERSION, 6);
561   query[key_a_id] = m_router->id_raw_string();
562 
563   switch (transaction->type()) {
564     case DhtTransaction::DHT_PING:
565       // nothing to do
566       break;
567 
568     case DhtTransaction::DHT_FIND_NODE:
569       query[key_a_target] = transaction->as_find_node()->search()->target_raw_string();
570       break;
571 
572     case DhtTransaction::DHT_GET_PEERS:
573       query[key_a_infoHash] = transaction->as_get_peers()->search()->target_raw_string();
574       break;
575 
576     case DhtTransaction::DHT_ANNOUNCE_PEER:
577       query[key_a_infoHash] = transaction->as_announce_peer()->info_hash_raw_string();
578       query[key_a_token] = transaction->as_announce_peer()->token();
579       query[key_a_port] = manager->connection_manager()->listen_port();
580       break;
581   }
582 
583   DhtTransactionPacket* packet = new DhtTransactionPacket(transaction->address(), query, tID, transaction);
584   transaction->set_packet(packet);
585   add_packet(packet, priority);
586 
587   m_queriesSent++;
588 }
589 
590 void
591 DhtServer::create_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) {
592   reply[key_r_id] = m_router->id_raw_string();
593   reply[key_t] = req[key_t];
594   reply[key_y] = raw_bencode::from_c_str("1:r");
595   reply[key_v] = raw_bencode("4:" PEER_VERSION, 6);
596 
597   add_packet(new DhtTransactionPacket(sa, reply), packet_prio_reply);
598 }
599 
600 void
601 DhtServer::create_error(const DhtMessage& req, const rak::socket_address* sa, int num, const char* msg) {
602   DhtMessage error;
603 
604   if (req[key_t].is_raw_string() && req[key_t].as_raw_string().size() < 67)
605     error[key_t] = req[key_t];
606 
607   error[key_y] = raw_bencode::from_c_str("1:e");
608   error[key_v] = raw_bencode("4:" PEER_VERSION, 6);
609   error[key_e_0] = num;
610   error[key_e_1] = raw_string::from_c_str(msg);
611 
612   add_packet(new DhtTransactionPacket(sa, error), packet_prio_reply);
613 }
614 
615 int
616 DhtServer::add_transaction(DhtTransaction* transaction, int priority) {
617   // Try random transaction ID. This is to make it less likely that we reuse
618   // a transaction ID from an earlier transaction which timed out and we forgot
619   // about it, so that if the node replies after the timeout it's less likely
620   // that we match the reply to the wrong transaction.
621   //
622   // If there's an existing transaction with the random ID we search for the next
623   // unused one. Since normally only one or two transactions will be active per
624   // node, a collision is extremely unlikely, and a linear search for the first
625   // open one is the most efficient.
626   unsigned int rnd = (uint8_t)random();
627   unsigned int id = rnd;
628 
629   transaction_itr insertItr = m_transactions.lower_bound(transaction->key(rnd));
630 
631   // If key matches, keep trying successive IDs.
632   while (insertItr != m_transactions.end() && insertItr->first == transaction->key(id)) {
633     ++insertItr;
634     id = (uint8_t)(id + 1);
635 
636     // Give up after trying all possible IDs. This should never happen.
637     if (id == rnd) {
638       delete transaction;
639       return -1;
640     }
641 
642     // Transaction ID wrapped around, reset iterator.
643     if (id == 0)
644       insertItr = m_transactions.lower_bound(transaction->key(id));
645   }
646 
647   // We know where to insert it, so pass that as hint.
648   insertItr = m_transactions.insert(insertItr, std::make_pair(transaction->key(id), transaction));
649 
650   create_query(insertItr, id, transaction->address(), priority);
651 
652   start_write();
653 
654   return id;
655 }
656 
657 // Transaction received no reply and timed out. Mark node as bad and remove
658 // transaction (except if it was only the quick timeout).
659 DhtServer::transaction_itr
660 DhtServer::failed_transaction(transaction_itr itr, bool quick) {
661   DhtTransaction* transaction = itr->second;
662 
663   // If it was a known node, remember that it didn't reply, unless the transaction
664   // is only stalled (had quick timeout, but not full timeout).  Also if the
665   // transaction still has an associated packet, the packet never got sent due to
666   // throttling, so don't blame the remote node for not replying.
667   // Finally, if we haven't received anything whatsoever so far, assume the entire
668   // network is down and so we can't blame the node either.
669   if (!quick && m_networkUp && transaction->packet() == NULL && transaction->id() != m_router->zero_id)
670     m_router->node_inactive(transaction->id(), transaction->address());
671 
672   if (transaction->type() == DhtTransaction::DHT_FIND_NODE) {
673     if (quick)
674       transaction->as_find_node()->set_stalled();
675     else
676       transaction->as_find_node()->complete(false);
677 
678     try {
679       find_node_next(transaction->as_find_node());
680 
681     } catch (std::exception& e) {
682       if (!quick) {
683         drop_packet(transaction->packet());
684         delete itr->second;
685         m_transactions.erase(itr);
686       }
687 
688       throw;
689     }
690   }
691 
692   if (quick) {
693     return ++itr;         // don't actually delete the transaction until the final timeout
694 
695   } else {
696     drop_packet(transaction->packet());
697     delete itr->second;
698     m_transactions.erase(itr++);
699     return itr;
700   }
701 }
702 
703 void
704 DhtServer::clear_transactions() {
705   for (transaction_map::iterator itr = m_transactions.begin(), last = m_transactions.end(); itr != last; itr++) {
706     drop_packet(itr->second->packet());
707     delete itr->second;
708   }
709 
710   m_transactions.clear();
711 }
712 
713 void
714 DhtServer::event_read() {
715   uint32_t total = 0;
716 
717   while (true) {
718     Object request;
719     rak::socket_address sa;
720     int type = '?';
721     DhtMessage message;
722     const HashString* nodeId = NULL;
723 
724     try {
725       char buffer[2048];
726       int32_t read = read_datagram(buffer, sizeof(buffer), &sa);
727 
728       if (read < 0)
729         break;
730 
731       // We can currently only process mapped-IPv4 addresses, not real IPv6.
732       // Translate them to an af_inet socket_address.
733       if (sa.family() == rak::socket_address::af_inet6)
734         sa = sa.sa_inet6()->normalize_address();
735 
736       if (sa.family() != rak::socket_address::af_inet)
737         continue;
738 
739       total += read;
740 
741       // If it's not a valid bencode dictionary at all, it's probably not a DHT
742       // packet at all, so we don't throw an error to prevent bounce loops.
743       try {
744         static_map_read_bencode(buffer, buffer + read, message);
745       } catch (bencode_error& e) {
746         continue;
747       }
748 
749       if (!message[key_t].is_raw_string())
750         throw dht_error(dht_error_protocol, "No transaction ID");
751 
752       // Restrict the length of Transaction IDs. We echo them in our replies.
753       if(message[key_t].as_raw_string().size() > 20) {
754 		  throw dht_error(dht_error_protocol, "Transaction ID length too long");
755       }
756 
757       if (!message[key_y].is_raw_string())
758         throw dht_error(dht_error_protocol, "No message type");
759 
760       if (message[key_y].as_raw_string().size() != 1)
761         throw dht_error(dht_error_bad_method, "Unsupported message type");
762 
763       type = message[key_y].as_raw_string().data()[0];
764 
765       // Queries and replies have node ID in different dictionaries.
766       if (type == 'r' || type == 'q') {
767         if (!message[type == 'q' ? key_a_id : key_r_id].is_raw_string())
768           throw dht_error(dht_error_protocol, "Invalid `id' value");
769 
770         raw_string nodeIdStr = message[type == 'q' ? key_a_id : key_r_id].as_raw_string();
771 
772         if (nodeIdStr.size() < HashString::size_data)
773           throw dht_error(dht_error_protocol, "`id' value too short");
774 
775         nodeId = HashString::cast_from(nodeIdStr.data());
776       }
777 
778       // Sanity check the returned transaction ID.
779       if ((type == 'r' || type == 'e') &&
780           (!message[key_t].is_raw_string() || message[key_t].as_raw_string().size() != 1))
781         throw dht_error(dht_error_protocol, "Invalid transaction ID type/length.");
782 
783       // Stupid broken implementations.
784       if (nodeId != NULL && *nodeId == m_router->id())
785         throw dht_error(dht_error_protocol, "Send your own ID, not mine");
786 
787       switch (type) {
788         case 'q':
789           process_query(*nodeId, &sa, message);
790           break;
791 
792         case 'r':
793           process_response(*nodeId, &sa, message);
794           break;
795 
796         case 'e':
797           process_error(&sa, message);
798           break;
799 
800         default:
801           throw dht_error(dht_error_bad_method, "Unknown message type.");
802       }
803 
804     // If node was querying us, reply with error packet, otherwise mark the node as "query failed",
805     // so that if it repeatedly sends malformed replies we will drop it instead of propagating it
806     // to other nodes.
807     } catch (bencode_error& e) {
808       if ((type == 'r' || type == 'e') && nodeId != NULL) {
809         m_router->node_inactive(*nodeId, &sa);
810       } else {
811         snprintf(message.data_end, message.data + message.data_size - message.data_end - 1, "Malformed packet: %s", e.what());
812         message.data[message.data_size - 1] = '\0';
813         create_error(message, &sa, dht_error_protocol, message.data_end);
814       }
815 
816     } catch (dht_error& e) {
817       if ((type == 'r' || type == 'e') && nodeId != NULL)
818         m_router->node_inactive(*nodeId, &sa);
819       else
820         create_error(message, &sa, e.code(), e.what());
821 
822     } catch (network_error& e) {
823 
824     }
825   }
826 
827   m_downloadThrottle->node_used_unthrottled(total);
828   m_downloadNode.rate()->insert(total);
829 
830   start_write();
831 }
832 
833 bool
834 DhtServer::process_queue(packet_queue& queue, uint32_t* quota) {
835   uint32_t used = 0;
836 
837   while (!queue.empty()) {
838     DhtTransactionPacket* packet = queue.front();
839     DhtTransaction::key_type transactionKey = 0;
840     if(packet->has_transaction())
841       transactionKey = packet->transaction()->key(packet->id());
842 
843     // Make sure its transaction hasn't timed out yet, if it has/had one
844     // and don't bother sending non-transaction packets (replies) after
845     // more than 15 seconds in the queue.
846     if (packet->has_failed() || packet->age() > 15) {
847       delete packet;
848       queue.pop_front();
849       continue;
850     }
851 
852     if (packet->length() > *quota) {
853       m_uploadThrottle->node_used(&m_uploadNode, used);
854       return false;
855     }
856 
857     queue.pop_front();
858 
859     try {
860       int written = write_datagram(packet->c_str(), packet->length(), packet->address());
861 
862       if (written == -1)
863         throw network_error();
864 
865       used += written;
866       *quota -= written;
867 
868       if ((unsigned int)written != packet->length())
869         throw network_error();
870 
871     } catch (network_error& e) {
872       // Couldn't write packet, maybe something wrong with node address or routing, so mark node as bad.
873       if (packet->has_transaction()) {
874         transaction_itr itr = m_transactions.find(transactionKey);
875         if (itr == m_transactions.end())
876           throw internal_error("DhtServer::process_queue could not find transaction.");
877 
878         failed_transaction(itr, false);
879       }
880     }
881 
882     if (packet->has_transaction()) {
883       // here transaction can be already deleted by failed_transaction.
884       transaction_itr itr = m_transactions.find(transactionKey);
885       if (itr != m_transactions.end())
886         packet->transaction()->set_packet(NULL);
887     }
888 
889     delete packet;
890   }
891 
892   m_uploadThrottle->node_used(&m_uploadNode, used);
893   return true;
894 }
895 
896 void
897 DhtServer::event_write() {
898   if (m_highQueue.empty() && m_lowQueue.empty())
899     throw internal_error("DhtServer::event_write called but both write queues are empty.");
900 
901   if (!m_uploadThrottle->is_throttled(&m_uploadNode))
902     throw internal_error("DhtServer::event_write called while not in throttle list.");
903 
904   uint32_t quota = m_uploadThrottle->node_quota(&m_uploadNode);
905 
906   if (quota == 0 || !process_queue(m_highQueue, &quota) || !process_queue(m_lowQueue, &quota)) {
907     manager->poll()->remove_write(this);
908     m_uploadThrottle->node_deactivate(&m_uploadNode);
909 
910   } else if (m_highQueue.empty() && m_lowQueue.empty()) {
911     manager->poll()->remove_write(this);
912     m_uploadThrottle->erase(&m_uploadNode);
913   }
914 }
915 
916 void
917 DhtServer::event_error() {
918 }
919 
920 void
921 DhtServer::start_write() {
922   if ((!m_highQueue.empty() || !m_lowQueue.empty()) && !m_uploadThrottle->is_throttled(&m_uploadNode)) {
923     m_uploadThrottle->insert(&m_uploadNode);
924     manager->poll()->insert_write(this);
925   }
926 
927   if (!m_taskTimeout.is_queued() && !m_transactions.empty())
928     priority_queue_insert(&taskScheduler, &m_taskTimeout, (cachedTime + rak::timer::from_seconds(5)).round_seconds());
929 }
930 
931 void
932 DhtServer::receive_timeout() {
933   transaction_itr itr = m_transactions.begin();
934   while (itr != m_transactions.end()) {
935     if (itr->second->has_quick_timeout() && itr->second->quick_timeout() < cachedTime.seconds()) {
936       itr = failed_transaction(itr, true);
937 
938     } else if (itr->second->timeout() < cachedTime.seconds()) {
939       itr = failed_transaction(itr, false);
940 
941     } else {
942       ++itr;
943     }
944   }
945 
946   start_write();
947 }
948 
949 }
950