1 // libTorrent - BitTorrent library 2 // Copyright (C) 2005-2011, Jari Sundell 3 // 4 // This program is free software; you can redistribute it and/or modify 5 // it under the terms of the GNU General Public License as published by 6 // the Free Software Foundation; either version 2 of the License, or 7 // (at your option) any later version. 8 // 9 // This program is distributed in the hope that it will be useful, 10 // but WITHOUT ANY WARRANTY; without even the implied warranty of 11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 // GNU General Public License for more details. 13 // 14 // You should have received a copy of the GNU General Public License 15 // along with this program; if not, write to the Free Software 16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 17 // 18 // In addition, as a special exception, the copyright holders give 19 // permission to link the code of portions of this program with the 20 // OpenSSL library under certain conditions as described in each 21 // individual source file, and distribute linked combinations 22 // including the two. 23 // 24 // You must obey the GNU General Public License in all respects for 25 // all of the code used other than OpenSSL. If you modify file(s) 26 // with this exception, you may extend this exception to your version 27 // of the file(s), but you are not obligated to do so. If you do not 28 // wish to do so, delete this exception statement from your version. 29 // If you delete this exception statement from all source files in the 30 // program, then also delete it here. 31 // 32 // Contact: Jari Sundell <jaris@ifi.uio.no> 33 // 34 // Skomakerveien 33 35 // 3185 Skoppum, NORWAY 36 37 #include "config.h" 38 #include "globals.h" 39 40 #include <algorithm> 41 #include <cstdio> 42 #include <rak/functional.h> 43 44 #include "torrent/exceptions.h" 45 #include "torrent/connection_manager.h" 46 #include "torrent/download_info.h" 47 #include "torrent/object.h" 48 #include "torrent/object_stream.h" 49 #include "torrent/poll.h" 50 #include "torrent/object_static_map.h" 51 #include "torrent/throttle.h" 52 #include "torrent/utils/log.h" 53 #include "tracker/tracker_dht.h" 54 55 #include "dht_bucket.h" 56 #include "dht_router.h" 57 #include "dht_transaction.h" 58 59 #include "manager.h" 60 61 #define LT_LOG_THIS(log_fmt, ...) \ 62 lt_log_print_subsystem(torrent::LOG_DHT_SERVER, "dht_server", log_fmt, __VA_ARGS__); 63 64 namespace torrent { 65 66 const char* DhtServer::queries[] = { 67 "ping", operatorhashstring_ptr_hash68 "find_node", 69 "get_peers", 70 "announce_peer", 71 }; 72 73 // List of all possible keys we need/support in a DHT message. 74 // Unsupported keys we receive are dropped (ignored) while decoding. 75 // See torrent/object_static_map.h for how this works. 76 template <> 77 const DhtMessage::key_list_type DhtMessage::base_type::keys = { 78 { key_a_id, "a::id*S" }, 79 { key_a_infoHash, "a::info_hash*S" }, 80 { key_a_port, "a::port", }, 81 { key_a_target, "a::target*S" }, 82 { key_a_token, "a::token*S" }, 83 84 { key_e_0, "e[]*" }, operatorhashstring_hash85 { key_e_1, "e[]*" }, 86 87 { key_q, "q*S" }, 88 89 { key_r_id, "r::id*S" }, 90 { key_r_nodes, "r::nodes*S" }, 91 { key_r_token, "r::token*S" }, 92 { key_r_values, "r::values*L" }, 93 94 { key_t, "t*S" }, 95 { key_v, "v*" }, 96 { key_y, "y*S" }, 97 }; 98 99 // Error in DHT protocol, avoids std::string ctor from communication_error 100 class dht_error : public network_error { 101 public: 102 dht_error(int code, const char* message) : m_message(message), m_code(code) {} operatorhashstring_ptr_equal103 104 virtual int code() const throw() { return m_code; } 105 virtual const char* what() const throw() { return m_message; } 106 107 private: 108 const char* m_message; 109 int m_code; 110 }; 111 112 DhtServer::DhtServer(DhtRouter* router) : 113 m_router(router), 114 accessor_wrapperaccessor_wrapper115 m_uploadNode(60), 116 m_downloadNode(60), 117 118 m_uploadThrottle(manager->upload_throttle()->throttle_list()), 119 m_downloadThrottle(manager->download_throttle()->throttle_list()), 120 121 m_networkUp(false) { 122 123 get_fd().clear(); 124 reset_statistics(); 125 126 // Reserve a socket for the DHT server, even though we don't 127 // actually open it until the server is started, which may not 128 // happen until the first non-private torrent is started. 129 manager->connection_manager()->inc_socket_count(); 130 } 131 132 DhtServer::~DhtServer() { 133 stop(); accessor_wrapperaccessor_wrapper134 135 std::for_each(m_highQueue.begin(), m_highQueue.end(), rak::call_delete<DhtTransactionPacket>()); idaccessor_wrapper136 std::for_each(m_lowQueue.begin(), m_lowQueue.end(), rak::call_delete<DhtTransactionPacket>()); trackeraccessor_wrapper137 138 manager->connection_manager()->dec_socket_count(); 139 } 140 141 void 142 DhtServer::start(int port) { 143 try { 144 if (!get_fd().open_datagram() || !get_fd().set_nonblock()) 145 throw resource_error("Could not allocate datagram socket."); 146 147 if (!get_fd().set_reuse_address(true)) 148 throw resource_error("Could not set listening port to reuse address."); 149 150 rak::socket_address sa = *m_router->address(); 151 152 if (sa.family() == rak::socket_address::af_unspec) 153 sa.sa_inet6()->clear(); 154 155 sa.set_port(port); 156 157 LT_LOG_THIS("starting (address:%s)", sa.pretty_address_str().c_str()); 158 159 // Figure out how to bind to both inet and inet6. 160 if (!get_fd().bind(sa)) 161 throw resource_error("Could not bind datagram socket."); 162 163 } catch (torrent::base_error& e) { 164 get_fd().close(); 165 get_fd().clear(); 166 throw; 167 } 168 169 m_taskTimeout.slot() = std::bind(&DhtServer::receive_timeout, this); 170 171 m_uploadNode.set_list_iterator(m_uploadThrottle->end()); 172 m_uploadNode.slot_activate() = std::bind(&SocketBase::receive_throttle_up_activate, static_cast<SocketBase*>(this)); 173 174 m_downloadNode.set_list_iterator(m_downloadThrottle->end()); 175 m_downloadThrottle->insert(&m_downloadNode); 176 177 manager->poll()->open(this); 178 manager->poll()->insert_read(this); 179 manager->poll()->insert_error(this); 180 } 181 182 void 183 DhtServer::stop() { 184 if (!is_active()) 185 return; 186 187 LT_LOG_THIS("stopping", 0); 188 189 clear_transactions(); 190 191 priority_queue_erase(&taskScheduler, &m_taskTimeout); 192 add_node(DhtNode * n)193 m_uploadThrottle->erase(&m_uploadNode); 194 m_downloadThrottle->erase(&m_downloadNode); 195 196 manager->poll()->remove_read(this); 197 manager->poll()->remove_write(this); 198 manager->poll()->remove_error(this); 199 manager->poll()->close(this); 200 201 get_fd().close(); 202 get_fd().clear(); 203 204 m_networkUp = false; 205 } 206 207 void 208 DhtServer::reset_statistics() { 209 m_queriesReceived = 0; 210 m_queriesSent = 0; 211 m_repliesReceived = 0; 212 m_errorsReceived = 0; 213 m_errorsCaught = 0; 214 215 m_uploadNode.rate()->set_total(0); 216 m_downloadNode.rate()->set_total(0); 217 } 218 219 // Ping a node whose ID we know. 220 void 221 DhtServer::ping(const HashString& id, const rak::socket_address* sa) { 222 // No point pinging a node that we're already contacting otherwise. 223 transaction_itr itr = m_transactions.lower_bound(DhtTransaction::key(sa, 0)); 224 if (itr == m_transactions.end() || !DhtTransaction::key_match(itr->first, sa)) 225 add_transaction(new DhtTransactionPing(id, sa), packet_prio_low); 226 } 227 228 // Contact nodes in given bucket and ask for their nodes closest to target. 229 void 230 DhtServer::find_node(const DhtBucket& contacts, const HashString& target) { 231 DhtSearch* search = new DhtSearch(target, contacts); 232 233 DhtSearch::const_accessor n; 234 while ((n = search->get_contact()) != search->end()) 235 add_transaction(new DhtTransactionFindNode(n), packet_prio_low); 236 237 // This shouldn't happen, it means we had no contactable nodes at all. 238 if (!search->start()) 239 delete search; 240 } 241 242 void 243 DhtServer::announce(const DhtBucket& contacts, const HashString& infoHash, TrackerDht* tracker) { 244 DhtAnnounce* announce = new DhtAnnounce(infoHash, tracker, contacts); 245 246 DhtSearch::const_accessor n; 247 while ((n = announce->get_contact()) != announce->end()) 248 add_transaction(new DhtTransactionFindNode(n), packet_prio_high); 249 250 // This can only happen if all nodes we know are bad. 251 if (!announce->start()) 252 delete announce; 253 else 254 announce->update_status(); 255 } 256 257 void 258 DhtServer::cancel_announce(DownloadInfo* info, const TrackerDht* tracker) { 259 transaction_itr itr = m_transactions.begin(); 260 261 while (itr != m_transactions.end()) { 262 if (itr->second->is_search() && itr->second->as_search()->search()->is_announce()) { 263 DhtAnnounce* announce = static_cast<DhtAnnounce*>(itr->second->as_search()->search()); 264 265 if ((info == NULL || announce->target() == info->hash()) && (tracker == NULL || announce->tracker() == tracker)) { 266 drop_packet(itr->second->packet()); 267 delete itr->second; 268 m_transactions.erase(itr++); 269 continue; 270 } 271 } 272 273 ++itr; 274 } 275 } 276 277 void 278 DhtServer::update() { 279 // Reset this every 15 minutes. It'll get set back to true if we receive 280 // any valid packets. This allows detecting when the entire network goes 281 // down, and prevents all nodes from getting removed as unresponsive. 282 m_networkUp = false; 283 } 284 285 void 286 DhtServer::process_query(const HashString& id, const rak::socket_address* sa, const DhtMessage& msg) { 287 m_queriesReceived++; 288 m_networkUp = true; 289 290 raw_string query = msg[key_q].as_raw_string(); 291 292 // Construct reply. 293 DhtMessage reply; 294 295 if (query == raw_string::from_c_str("find_node")) 296 create_find_node_response(msg, reply); 297 298 else if (query == raw_string::from_c_str("get_peers")) 299 create_get_peers_response(msg, sa, reply); 300 301 else if (query == raw_string::from_c_str("announce_peer")) 302 create_announce_peer_response(msg, sa, reply); 303 304 else if (query != raw_string::from_c_str("ping")) 305 throw dht_error(dht_error_bad_method, "Unknown query type."); 306 307 m_router->node_queried(id, sa); 308 create_response(msg, sa, reply); 309 } 310 311 void 312 DhtServer::create_find_node_response(const DhtMessage& req, DhtMessage& reply) { 313 raw_string target = req[key_a_target].as_raw_string(); 314 315 if (target.size() < HashString::size_data) 316 throw dht_error(dht_error_protocol, "target string too short"); 317 318 reply[key_r_nodes] = m_router->get_closest_nodes(*HashString::cast_from(target.data())); 319 320 if (reply[key_r_nodes].as_raw_string().empty()) 321 throw dht_error(dht_error_generic, "No nodes"); 322 } 323 324 void 325 DhtServer::create_get_peers_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) { 326 reply[key_r_token] = m_router->make_token(sa, reply.data_end); 327 reply.data_end += reply[key_r_token].as_raw_string().size(); 328 329 raw_string info_hash_str = req[key_a_infoHash].as_raw_string(); 330 331 if (info_hash_str.size() < HashString::size_data) 332 throw dht_error(dht_error_protocol, "info hash too short"); 333 334 const HashString* info_hash = HashString::cast_from(info_hash_str.data()); 335 336 DhtTracker* tracker = m_router->get_tracker(*info_hash, false); 337 338 // If we're not tracking or have no peers, send closest nodes. 339 if (!tracker || tracker->empty()) { 340 raw_string nodes = m_router->get_closest_nodes(*info_hash); 341 342 if (nodes.empty()) 343 throw dht_error(dht_error_generic, "No peers nor nodes"); 344 345 reply[key_r_nodes] = nodes; 346 347 } else { 348 reply[key_r_values] = tracker->get_peers(); 349 } 350 } 351 352 void 353 DhtServer::create_announce_peer_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) { 354 raw_string info_hash = req[key_a_infoHash].as_raw_string(); 355 356 if (info_hash.size() < HashString::size_data) 357 throw dht_error(dht_error_protocol, "info hash too short"); 358 359 if (!m_router->token_valid(req[key_a_token].as_raw_string(), sa)) 360 throw dht_error(dht_error_protocol, "Token invalid."); 361 362 DhtTracker* tracker = m_router->get_tracker(*HashString::cast_from(info_hash.data()), true); 363 tracker->add_peer(sa->sa_inet()->address_n(), req[key_a_port].as_value()); 364 } 365 366 void 367 DhtServer::process_response(const HashString& id, const rak::socket_address* sa, const DhtMessage& response) { 368 int transactionId = (unsigned char)response[key_t].as_raw_string().data()[0]; 369 transaction_itr itr = m_transactions.find(DhtTransaction::key(sa, transactionId)); 370 371 // Response to a transaction we don't have in our table. At this point it's 372 // impossible to tell whether it used to be a valid transaction but timed out 373 // the node did not return the ID we sent it, or it returned it with a 374 // different address than we sent it o. Best we can do is ignore the reply, 375 // since the protocol doesn't call for returning errors in responses. 376 if (itr == m_transactions.end()) 377 return; 378 379 m_repliesReceived++; 380 m_networkUp = true; 381 382 // Make sure transaction is erased even if an exception is thrown. 383 try { 384 DhtTransaction* transaction = itr->second; 385 #ifdef USE_EXTRA_DEBUG 386 if (DhtTransaction::key(sa, transactionId) != transaction->key(transactionId)) 387 throw internal_error("DhtServer::process_response key mismatch."); 388 #endif 389 390 // If we contact a node but its ID is not the one we expect, ignore the reply 391 // to prevent interference from rogue nodes. 392 if ((id != transaction->id() && transaction->id() != m_router->zero_id)) 393 return; 394 395 switch (transaction->type()) { 396 case DhtTransaction::DHT_FIND_NODE: 397 parse_find_node_reply(transaction->as_find_node(), response[key_r_nodes].as_raw_string()); 398 break; 399 400 case DhtTransaction::DHT_GET_PEERS: 401 parse_get_peers_reply(transaction->as_get_peers(), response); 402 break; 403 404 // Nothing to do for DHT_PING and DHT_ANNOUNCE_PEER 405 default: 406 break; 407 } 408 409 // Mark node responsive only if all processing was successful, without errors. 410 m_router->node_replied(id, sa); 411 412 } catch (std::exception& e) { 413 drop_packet(itr->second->packet()); 414 delete itr->second; 415 m_transactions.erase(itr); 416 417 m_errorsCaught++; 418 throw; 419 } 420 421 drop_packet(itr->second->packet()); 422 delete itr->second; 423 m_transactions.erase(itr); 424 } 425 426 void 427 DhtServer::process_error(const rak::socket_address* sa, const DhtMessage& error) { 428 int transactionId = (unsigned char)error[key_t].as_raw_string().data()[0]; 429 transaction_itr itr = m_transactions.find(DhtTransaction::key(sa, transactionId)); 430 431 if (itr == m_transactions.end()) 432 return; 433 434 m_repliesReceived++; 435 m_errorsReceived++; 436 m_networkUp = true; 437 438 // Don't mark node as good (because it replied) or bad (because it returned an error). 439 // If it consistently returns errors for valid queries it's probably broken. But a 440 // few error messages are acceptable. So we do nothing and pretend the query never happened. 441 442 drop_packet(itr->second->packet()); 443 delete itr->second; 444 m_transactions.erase(itr); 445 } 446 447 void 448 DhtServer::parse_find_node_reply(DhtTransactionSearch* transaction, raw_string nodes) { 449 transaction->complete(true); 450 451 if (sizeof(const compact_node_info) != 26) 452 throw internal_error("DhtServer::parse_find_node_reply(...) bad struct size."); 453 454 node_info_list list; 455 std::copy(reinterpret_cast<const compact_node_info*>(nodes.data()), 456 reinterpret_cast<const compact_node_info*>(nodes.data() + nodes.size() - nodes.size() % sizeof(compact_node_info)), 457 std::back_inserter(list)); 458 459 for (node_info_list::iterator itr = list.begin(); itr != list.end(); ++itr) { 460 if (itr->id() != m_router->id()) { 461 rak::socket_address sa = itr->address(); 462 transaction->search()->add_contact(itr->id(), &sa); 463 } 464 } 465 466 find_node_next(transaction); 467 } 468 469 void 470 DhtServer::parse_get_peers_reply(DhtTransactionGetPeers* transaction, const DhtMessage& response) { 471 DhtAnnounce* announce = static_cast<DhtAnnounce*>(transaction->as_search()->search()); 472 473 transaction->complete(true); 474 475 if (response[key_r_values].is_raw_list()) 476 announce->receive_peers(response[key_r_values].as_raw_list()); 477 478 if (response[key_r_token].is_raw_string()) 479 add_transaction(new DhtTransactionAnnouncePeer(transaction->id(), 480 transaction->address(), 481 announce->target(), 482 response[key_r_token].as_raw_string()), 483 packet_prio_low); 484 485 announce->update_status(); 486 } 487 488 void 489 DhtServer::find_node_next(DhtTransactionSearch* transaction) { 490 int priority = packet_prio_low; 491 if (transaction->search()->is_announce()) 492 priority = packet_prio_high; 493 494 DhtSearch::const_accessor node; 495 while ((node = transaction->search()->get_contact()) != transaction->search()->end()) 496 add_transaction(new DhtTransactionFindNode(node), priority); 497 498 if (!transaction->search()->is_announce()) 499 return; 500 501 DhtAnnounce* announce = static_cast<DhtAnnounce*>(transaction->search()); 502 if (announce->complete()) { 503 // We have found the 8 closest nodes to the info hash. Retrieve peers 504 // from them and announce to them. 505 for (node = announce->start_announce(); node != announce->end(); ++node) 506 add_transaction(new DhtTransactionGetPeers(node), packet_prio_high); 507 } 508 509 announce->update_status(); 510 } 511 512 void 513 DhtServer::add_packet(DhtTransactionPacket* packet, int priority) { 514 switch (priority) { 515 // High priority packets are for important queries, and quite small. 516 // They're added to front of high priority queue and thus will be the 517 // next packets sent. 518 case packet_prio_high: 519 m_highQueue.push_front(packet); 520 break; 521 522 // Low priority query packets are added to the back of the high priority 523 // queue and will be sent when all high priority packets have been transmitted. 524 case packet_prio_low: 525 m_highQueue.push_back(packet); 526 break; 527 528 // Reply packets will be processed after all of our own packets have been send. 529 case packet_prio_reply: 530 m_lowQueue.push_back(packet); 531 break; 532 533 default: 534 throw internal_error("DhtServer::add_packet called with invalid priority."); 535 } 536 } 537 538 void 539 DhtServer::drop_packet(DhtTransactionPacket* packet) { 540 m_highQueue.erase(std::remove(m_highQueue.begin(), m_highQueue.end(), packet), m_highQueue.end()); 541 m_lowQueue.erase(std::remove(m_lowQueue.begin(), m_lowQueue.end(), packet), m_lowQueue.end()); 542 } 543 544 void 545 DhtServer::create_query(transaction_itr itr, int tID, const rak::socket_address* sa, int priority) { 546 if (itr->second->id() == m_router->id()) 547 throw internal_error("DhtServer::create_query trying to send to itself."); 548 549 DhtMessage query; 550 551 // Transaction ID is a bencode string. 552 query[key_t] = raw_bencode(query.data_end, 3); 553 *query.data_end++ = '1'; 554 *query.data_end++ = ':'; 555 *query.data_end++ = tID; 556 557 DhtTransaction* transaction = itr->second; 558 query[key_q] = raw_string::from_c_str(queries[transaction->type()]); 559 query[key_y] = raw_bencode::from_c_str("1:q"); 560 query[key_v] = raw_bencode("4:" PEER_VERSION, 6); 561 query[key_a_id] = m_router->id_raw_string(); 562 563 switch (transaction->type()) { 564 case DhtTransaction::DHT_PING: 565 // nothing to do 566 break; 567 568 case DhtTransaction::DHT_FIND_NODE: 569 query[key_a_target] = transaction->as_find_node()->search()->target_raw_string(); 570 break; 571 572 case DhtTransaction::DHT_GET_PEERS: 573 query[key_a_infoHash] = transaction->as_get_peers()->search()->target_raw_string(); 574 break; 575 576 case DhtTransaction::DHT_ANNOUNCE_PEER: 577 query[key_a_infoHash] = transaction->as_announce_peer()->info_hash_raw_string(); 578 query[key_a_token] = transaction->as_announce_peer()->token(); 579 query[key_a_port] = manager->connection_manager()->listen_port(); 580 break; 581 } 582 583 DhtTransactionPacket* packet = new DhtTransactionPacket(transaction->address(), query, tID, transaction); 584 transaction->set_packet(packet); 585 add_packet(packet, priority); 586 587 m_queriesSent++; 588 } 589 590 void 591 DhtServer::create_response(const DhtMessage& req, const rak::socket_address* sa, DhtMessage& reply) { 592 reply[key_r_id] = m_router->id_raw_string(); 593 reply[key_t] = req[key_t]; 594 reply[key_y] = raw_bencode::from_c_str("1:r"); 595 reply[key_v] = raw_bencode("4:" PEER_VERSION, 6); 596 597 add_packet(new DhtTransactionPacket(sa, reply), packet_prio_reply); 598 } 599 600 void 601 DhtServer::create_error(const DhtMessage& req, const rak::socket_address* sa, int num, const char* msg) { 602 DhtMessage error; 603 604 if (req[key_t].is_raw_string() && req[key_t].as_raw_string().size() < 67) 605 error[key_t] = req[key_t]; 606 607 error[key_y] = raw_bencode::from_c_str("1:e"); 608 error[key_v] = raw_bencode("4:" PEER_VERSION, 6); 609 error[key_e_0] = num; 610 error[key_e_1] = raw_string::from_c_str(msg); 611 612 add_packet(new DhtTransactionPacket(sa, error), packet_prio_reply); 613 } 614 615 int 616 DhtServer::add_transaction(DhtTransaction* transaction, int priority) { 617 // Try random transaction ID. This is to make it less likely that we reuse 618 // a transaction ID from an earlier transaction which timed out and we forgot 619 // about it, so that if the node replies after the timeout it's less likely 620 // that we match the reply to the wrong transaction. 621 // 622 // If there's an existing transaction with the random ID we search for the next 623 // unused one. Since normally only one or two transactions will be active per 624 // node, a collision is extremely unlikely, and a linear search for the first 625 // open one is the most efficient. 626 unsigned int rnd = (uint8_t)random(); 627 unsigned int id = rnd; 628 629 transaction_itr insertItr = m_transactions.lower_bound(transaction->key(rnd)); 630 631 // If key matches, keep trying successive IDs. 632 while (insertItr != m_transactions.end() && insertItr->first == transaction->key(id)) { 633 ++insertItr; 634 id = (uint8_t)(id + 1); 635 636 // Give up after trying all possible IDs. This should never happen. 637 if (id == rnd) { 638 delete transaction; 639 return -1; 640 } 641 642 // Transaction ID wrapped around, reset iterator. 643 if (id == 0) 644 insertItr = m_transactions.lower_bound(transaction->key(id)); 645 } 646 647 // We know where to insert it, so pass that as hint. 648 insertItr = m_transactions.insert(insertItr, std::make_pair(transaction->key(id), transaction)); 649 650 create_query(insertItr, id, transaction->address(), priority); 651 652 start_write(); 653 654 return id; 655 } 656 657 // Transaction received no reply and timed out. Mark node as bad and remove 658 // transaction (except if it was only the quick timeout). 659 DhtServer::transaction_itr 660 DhtServer::failed_transaction(transaction_itr itr, bool quick) { 661 DhtTransaction* transaction = itr->second; 662 663 // If it was a known node, remember that it didn't reply, unless the transaction 664 // is only stalled (had quick timeout, but not full timeout). Also if the 665 // transaction still has an associated packet, the packet never got sent due to 666 // throttling, so don't blame the remote node for not replying. 667 // Finally, if we haven't received anything whatsoever so far, assume the entire 668 // network is down and so we can't blame the node either. 669 if (!quick && m_networkUp && transaction->packet() == NULL && transaction->id() != m_router->zero_id) 670 m_router->node_inactive(transaction->id(), transaction->address()); 671 672 if (transaction->type() == DhtTransaction::DHT_FIND_NODE) { 673 if (quick) 674 transaction->as_find_node()->set_stalled(); 675 else 676 transaction->as_find_node()->complete(false); 677 678 try { 679 find_node_next(transaction->as_find_node()); 680 681 } catch (std::exception& e) { 682 if (!quick) { 683 drop_packet(transaction->packet()); 684 delete itr->second; 685 m_transactions.erase(itr); 686 } 687 688 throw; 689 } 690 } 691 692 if (quick) { 693 return ++itr; // don't actually delete the transaction until the final timeout 694 695 } else { 696 drop_packet(transaction->packet()); 697 delete itr->second; 698 m_transactions.erase(itr++); 699 return itr; 700 } 701 } 702 703 void 704 DhtServer::clear_transactions() { 705 for (transaction_map::iterator itr = m_transactions.begin(), last = m_transactions.end(); itr != last; itr++) { 706 drop_packet(itr->second->packet()); 707 delete itr->second; 708 } 709 710 m_transactions.clear(); 711 } 712 713 void 714 DhtServer::event_read() { 715 uint32_t total = 0; 716 717 while (true) { 718 Object request; 719 rak::socket_address sa; 720 int type = '?'; 721 DhtMessage message; 722 const HashString* nodeId = NULL; 723 724 try { 725 char buffer[2048]; 726 int32_t read = read_datagram(buffer, sizeof(buffer), &sa); 727 728 if (read < 0) 729 break; 730 731 // We can currently only process mapped-IPv4 addresses, not real IPv6. 732 // Translate them to an af_inet socket_address. 733 if (sa.family() == rak::socket_address::af_inet6) 734 sa = sa.sa_inet6()->normalize_address(); 735 736 if (sa.family() != rak::socket_address::af_inet) 737 continue; 738 739 total += read; 740 741 // If it's not a valid bencode dictionary at all, it's probably not a DHT 742 // packet at all, so we don't throw an error to prevent bounce loops. 743 try { 744 static_map_read_bencode(buffer, buffer + read, message); 745 } catch (bencode_error& e) { 746 continue; 747 } 748 749 if (!message[key_t].is_raw_string()) 750 throw dht_error(dht_error_protocol, "No transaction ID"); 751 752 // Restrict the length of Transaction IDs. We echo them in our replies. 753 if(message[key_t].as_raw_string().size() > 20) { 754 throw dht_error(dht_error_protocol, "Transaction ID length too long"); 755 } 756 757 if (!message[key_y].is_raw_string()) 758 throw dht_error(dht_error_protocol, "No message type"); 759 760 if (message[key_y].as_raw_string().size() != 1) 761 throw dht_error(dht_error_bad_method, "Unsupported message type"); 762 763 type = message[key_y].as_raw_string().data()[0]; 764 765 // Queries and replies have node ID in different dictionaries. 766 if (type == 'r' || type == 'q') { 767 if (!message[type == 'q' ? key_a_id : key_r_id].is_raw_string()) 768 throw dht_error(dht_error_protocol, "Invalid `id' value"); 769 770 raw_string nodeIdStr = message[type == 'q' ? key_a_id : key_r_id].as_raw_string(); 771 772 if (nodeIdStr.size() < HashString::size_data) 773 throw dht_error(dht_error_protocol, "`id' value too short"); 774 775 nodeId = HashString::cast_from(nodeIdStr.data()); 776 } 777 778 // Sanity check the returned transaction ID. 779 if ((type == 'r' || type == 'e') && 780 (!message[key_t].is_raw_string() || message[key_t].as_raw_string().size() != 1)) 781 throw dht_error(dht_error_protocol, "Invalid transaction ID type/length."); 782 783 // Stupid broken implementations. 784 if (nodeId != NULL && *nodeId == m_router->id()) 785 throw dht_error(dht_error_protocol, "Send your own ID, not mine"); 786 787 switch (type) { 788 case 'q': 789 process_query(*nodeId, &sa, message); 790 break; 791 792 case 'r': 793 process_response(*nodeId, &sa, message); 794 break; 795 796 case 'e': 797 process_error(&sa, message); 798 break; 799 800 default: 801 throw dht_error(dht_error_bad_method, "Unknown message type."); 802 } 803 804 // If node was querying us, reply with error packet, otherwise mark the node as "query failed", 805 // so that if it repeatedly sends malformed replies we will drop it instead of propagating it 806 // to other nodes. 807 } catch (bencode_error& e) { 808 if ((type == 'r' || type == 'e') && nodeId != NULL) { 809 m_router->node_inactive(*nodeId, &sa); 810 } else { 811 snprintf(message.data_end, message.data + message.data_size - message.data_end - 1, "Malformed packet: %s", e.what()); 812 message.data[message.data_size - 1] = '\0'; 813 create_error(message, &sa, dht_error_protocol, message.data_end); 814 } 815 816 } catch (dht_error& e) { 817 if ((type == 'r' || type == 'e') && nodeId != NULL) 818 m_router->node_inactive(*nodeId, &sa); 819 else 820 create_error(message, &sa, e.code(), e.what()); 821 822 } catch (network_error& e) { 823 824 } 825 } 826 827 m_downloadThrottle->node_used_unthrottled(total); 828 m_downloadNode.rate()->insert(total); 829 830 start_write(); 831 } 832 833 bool 834 DhtServer::process_queue(packet_queue& queue, uint32_t* quota) { 835 uint32_t used = 0; 836 837 while (!queue.empty()) { 838 DhtTransactionPacket* packet = queue.front(); 839 DhtTransaction::key_type transactionKey = 0; 840 if(packet->has_transaction()) 841 transactionKey = packet->transaction()->key(packet->id()); 842 843 // Make sure its transaction hasn't timed out yet, if it has/had one 844 // and don't bother sending non-transaction packets (replies) after 845 // more than 15 seconds in the queue. 846 if (packet->has_failed() || packet->age() > 15) { 847 delete packet; 848 queue.pop_front(); 849 continue; 850 } 851 852 if (packet->length() > *quota) { 853 m_uploadThrottle->node_used(&m_uploadNode, used); 854 return false; 855 } 856 857 queue.pop_front(); 858 859 try { 860 int written = write_datagram(packet->c_str(), packet->length(), packet->address()); 861 862 if (written == -1) 863 throw network_error(); 864 865 used += written; 866 *quota -= written; 867 868 if ((unsigned int)written != packet->length()) 869 throw network_error(); 870 871 } catch (network_error& e) { 872 // Couldn't write packet, maybe something wrong with node address or routing, so mark node as bad. 873 if (packet->has_transaction()) { 874 transaction_itr itr = m_transactions.find(transactionKey); 875 if (itr == m_transactions.end()) 876 throw internal_error("DhtServer::process_queue could not find transaction."); 877 878 failed_transaction(itr, false); 879 } 880 } 881 882 if (packet->has_transaction()) { 883 // here transaction can be already deleted by failed_transaction. 884 transaction_itr itr = m_transactions.find(transactionKey); 885 if (itr != m_transactions.end()) 886 packet->transaction()->set_packet(NULL); 887 } 888 889 delete packet; 890 } 891 892 m_uploadThrottle->node_used(&m_uploadNode, used); 893 return true; 894 } 895 896 void 897 DhtServer::event_write() { 898 if (m_highQueue.empty() && m_lowQueue.empty()) 899 throw internal_error("DhtServer::event_write called but both write queues are empty."); 900 901 if (!m_uploadThrottle->is_throttled(&m_uploadNode)) 902 throw internal_error("DhtServer::event_write called while not in throttle list."); 903 904 uint32_t quota = m_uploadThrottle->node_quota(&m_uploadNode); 905 906 if (quota == 0 || !process_queue(m_highQueue, "a) || !process_queue(m_lowQueue, "a)) { 907 manager->poll()->remove_write(this); 908 m_uploadThrottle->node_deactivate(&m_uploadNode); 909 910 } else if (m_highQueue.empty() && m_lowQueue.empty()) { 911 manager->poll()->remove_write(this); 912 m_uploadThrottle->erase(&m_uploadNode); 913 } 914 } 915 916 void 917 DhtServer::event_error() { 918 } 919 920 void 921 DhtServer::start_write() { 922 if ((!m_highQueue.empty() || !m_lowQueue.empty()) && !m_uploadThrottle->is_throttled(&m_uploadNode)) { 923 m_uploadThrottle->insert(&m_uploadNode); 924 manager->poll()->insert_write(this); 925 } 926 927 if (!m_taskTimeout.is_queued() && !m_transactions.empty()) 928 priority_queue_insert(&taskScheduler, &m_taskTimeout, (cachedTime + rak::timer::from_seconds(5)).round_seconds()); 929 } 930 931 void 932 DhtServer::receive_timeout() { 933 transaction_itr itr = m_transactions.begin(); 934 while (itr != m_transactions.end()) { 935 if (itr->second->has_quick_timeout() && itr->second->quick_timeout() < cachedTime.seconds()) { 936 itr = failed_transaction(itr, true); 937 938 } else if (itr->second->timeout() < cachedTime.seconds()) { 939 itr = failed_transaction(itr, false); 940 941 } else { 942 ++itr; 943 } 944 } 945 946 start_write(); 947 } 948 949 } 950