1 /*
2 
3 Copyright (c) 2006-2018, Arvid Norberg & Daniel Wallin
4 All rights reserved.
5 
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
9 
10     * Redistributions of source code must retain the above copyright
11       notice, this list of conditions and the following disclaimer.
12     * Redistributions in binary form must reproduce the above copyright
13       notice, this list of conditions and the following disclaimer in
14       the documentation and/or other materials provided with the distribution.
15     * Neither the name of the author nor the names of its
16       contributors may be used to endorse or promote products derived
17       from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
30 
31 */
32 
#include <libtorrent/kademlia/traversal_algorithm.hpp>

#include <algorithm> // for std::is_sorted

#include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/dht_observer.hpp> // for dht_logger
#include <libtorrent/kademlia/io.hpp>
#include <libtorrent/session_status.hpp>
#include <libtorrent/socket_io.hpp> // for read_*_endpoint
#include <libtorrent/alert_types.hpp> // for dht_lookup
#include <libtorrent/aux_/time.hpp>
42 
43 #ifndef TORRENT_DISABLE_LOGGING
44 #include <libtorrent/hex.hpp> // to_hex
45 #endif
46 
47 using namespace std::placeholders;
48 
49 namespace libtorrent {
50 namespace dht {
51 
// out-of-class definitions of the constexpr flag members of
// traversal_algorithm. These are the flags failed() tests its ``flags``
// argument against.
// NOTE(review): presumably required so the members can be ODR-used when
// building as C++11/14 -- confirm before removing.
constexpr traversal_flags_t traversal_algorithm::prevent_request;
constexpr traversal_flags_t traversal_algorithm::short_timeout;
54 
55 #if TORRENT_USE_ASSERTS
// returns true when no element of [b, e) orders before its predecessor
// according to cmp. An empty range and a single element both count as sorted.
// This is only used inside TORRENT_ASSERT() checks in this file.
//
// It forwards to the standard algorithm, which compares the elements in place.
// No element is copied into a temporary while walking the range, so it also
// works for element types that cannot be copied.
template <class It, class Cmp>
bool is_sorted(It b, It e, Cmp cmp)
{
	return std::is_sorted(b, e, cmp);
}
71 #endif
72 
new_observer(udp::endpoint const & ep,node_id const & id)73 observer_ptr traversal_algorithm::new_observer(udp::endpoint const& ep
74 	, node_id const& id)
75 {
76 	auto o = m_node.m_rpc.allocate_observer<null_observer>(self(), ep, id);
77 #if TORRENT_USE_ASSERTS
78 	if (o) o->m_in_constructor = false;
79 #endif
80 	return o;
81 }
82 
traversal_algorithm(node & dht_node,node_id const & target)83 traversal_algorithm::traversal_algorithm(node& dht_node, node_id const& target)
84 	: m_node(dht_node)
85 	, m_target(target)
86 {
87 #ifndef TORRENT_DISABLE_LOGGING
88 	m_id = m_node.search_id();
89 	dht_observer* logger = get_node().observer();
90 	if (logger != nullptr && logger->should_log(dht_logger::traversal))
91 	{
92 		logger->log(dht_logger::traversal, "[%u] NEW target: %s k: %d"
93 			, m_id, aux::to_hex(target).c_str(), m_node.m_table.bucket_size());
94 	}
95 #endif
96 }
97 
resort_result(observer * o)98 void traversal_algorithm::resort_result(observer* o)
99 {
100 	// find the given observer, remove it and insert it in its sorted location
101 	auto it = std::find_if(m_results.begin(), m_results.end()
102 		, [=](observer_ptr const& ptr) { return ptr.get() == o; });
103 
104 	if (it == m_results.end()) return;
105 
106 	if (it - m_results.begin() < m_sorted_results)
107 		--m_sorted_results;
108 
109 	observer_ptr ptr = std::move(*it);
110 	m_results.erase(it);
111 
112 	TORRENT_ASSERT(std::size_t(m_sorted_results) <= m_results.size());
113 	auto end = m_results.begin() + m_sorted_results;
114 
115 	TORRENT_ASSERT(libtorrent::dht::is_sorted(m_results.begin(), end
116 		, [this](observer_ptr const& lhs, observer_ptr const& rhs)
117 		{ return compare_ref(lhs->id(), rhs->id(), m_target); }));
118 
119 	auto iter = std::lower_bound(m_results.begin(), end, ptr
120 		, [this](observer_ptr const& lhs, observer_ptr const& rhs)
121 		{ return compare_ref(lhs->id(), rhs->id(), m_target); });
122 
123 	m_results.insert(iter, ptr);
124 	++m_sorted_results;
125 }
126 
add_entry(node_id const & id,udp::endpoint const & addr,observer_flags_t const flags)127 void traversal_algorithm::add_entry(node_id const& id
128 	, udp::endpoint const& addr, observer_flags_t const flags)
129 {
130 	if (m_done) return;
131 
132 	TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer));
133 	auto o = new_observer(addr, id);
134 	if (!o)
135 	{
136 #ifndef TORRENT_DISABLE_LOGGING
137 		if (get_node().observer() != nullptr)
138 		{
139 			get_node().observer()->log(dht_logger::traversal, "[%u] failed to allocate memory or observer. aborting!"
140 				, m_id);
141 		}
142 #endif
143 		done();
144 		return;
145 	}
146 
147 	o->flags |= flags;
148 
149 	if (id.is_all_zeros())
150 	{
151 		o->set_id(generate_random_id());
152 		o->flags |= observer::flag_no_id;
153 
154 		m_results.push_back(o);
155 
156 #ifndef TORRENT_DISABLE_LOGGING
157 		dht_observer* logger = get_node().observer();
158 		if (logger != nullptr && logger->should_log(dht_logger::traversal))
159 		{
160 			logger->log(dht_logger::traversal
161 				, "[%u] ADD (no-id) id: %s addr: %s distance: %d invoke-count: %d type: %s"
162 				, m_id, aux::to_hex(id).c_str(), print_endpoint(addr).c_str()
163 				, distance_exp(m_target, id), m_invoke_count, name());
164 		}
165 #endif
166 	}
167 	else
168 	{
169 		TORRENT_ASSERT(std::size_t(m_sorted_results) <= m_results.size());
170 		auto end = m_results.begin() + m_sorted_results;
171 
172 		TORRENT_ASSERT(libtorrent::dht::is_sorted(m_results.begin(), end
173 				, [this](observer_ptr const& lhs, observer_ptr const& rhs)
174 				{ return compare_ref(lhs->id(), rhs->id(), m_target); }));
175 
176 		auto iter = std::lower_bound(m_results.begin(), end, o
177 			, [this](observer_ptr const& lhs, observer_ptr const& rhs)
178 			{ return compare_ref(lhs->id(), rhs->id(), m_target); });
179 
180 		if (iter == end || (*iter)->id() != id)
181 		{
182 			// this IP restriction does not apply to the nodes we loaded from out
183 			// node cache
184 			if (m_node.settings().restrict_search_ips
185 				&& !(flags & observer::flag_initial))
186 			{
187 				if (o->target_addr().is_v6())
188 				{
189 					address_v6::bytes_type addr_bytes = o->target_addr().to_v6().to_bytes();
190 					auto prefix_it = addr_bytes.cbegin();
191 					std::uint64_t const prefix6 = detail::read_uint64(prefix_it);
192 
193 					if (m_peer6_prefixes.insert(prefix6).second)
194 						goto add_result;
195 				}
196 				else
197 				{
198 					// mask the lower octet
199 					std::uint32_t const prefix4
200 						= o->target_addr().to_v4().to_ulong() & 0xffffff00;
201 
202 					if (m_peer4_prefixes.insert(prefix4).second)
203 						goto add_result;
204 				}
205 
206 				// we already have a node in this search with an IP very
207 				// close to this one. We know that it's not the same, because
208 				// it claims a different node-ID. Ignore this to avoid attacks
209 #ifndef TORRENT_DISABLE_LOGGING
210 				dht_observer* logger = get_node().observer();
211 				if (logger != nullptr && logger->should_log(dht_logger::traversal))
212 				{
213 					logger->log(dht_logger::traversal
214 						, "[%u] traversal DUPLICATE node. id: %s addr: %s type: %s"
215 						, m_id, aux::to_hex(o->id()).c_str(), print_address(o->target_addr()).c_str(), name());
216 				}
217 #endif
218 				return;
219 			}
220 
221 	add_result:
222 
223 			TORRENT_ASSERT((o->flags & observer::flag_no_id)
224 				|| std::none_of(m_results.begin(), end
225 					, [&id](observer_ptr const& ob) { return ob->id() == id; }));
226 
227 #ifndef TORRENT_DISABLE_LOGGING
228 			dht_observer* logger = get_node().observer();
229 			if (logger != nullptr && logger->should_log(dht_logger::traversal))
230 			{
231 				logger->log(dht_logger::traversal
232 					, "[%u] ADD id: %s addr: %s distance: %d invoke-count: %d type: %s"
233 					, m_id, aux::to_hex(id).c_str(), print_endpoint(addr).c_str()
234 					, distance_exp(m_target, id), m_invoke_count, name());
235 			}
236 #endif
237 			m_results.insert(iter, o);
238 			++m_sorted_results;
239 		}
240 	}
241 
242 	TORRENT_ASSERT(std::size_t(m_sorted_results) <= m_results.size());
243 	TORRENT_ASSERT(libtorrent::dht::is_sorted(m_results.begin()
244 		, m_results.begin() + m_sorted_results
245 		, [this](observer_ptr const& lhs, observer_ptr const& rhs)
246 		{ return compare_ref(lhs->id(), rhs->id(), m_target); }));
247 
248 	if (m_results.size() > 100)
249 	{
250 		std::for_each(m_results.begin() + 100, m_results.end()
251 			, [this](std::shared_ptr<observer> const& ptr)
252 		{
253 			if ((ptr->flags & (observer::flag_queried | observer::flag_failed | observer::flag_alive))
254 				== observer::flag_queried)
255 			{
256 				// set the done flag on any outstanding queries to prevent them from
257 				// calling finished() or failed()
258 				ptr->flags |= observer::flag_done;
259 				TORRENT_ASSERT(m_invoke_count > 0);
260 				--m_invoke_count;
261 			}
262 
263 #if TORRENT_USE_ASSERTS
264 			ptr->m_was_abandoned = true;
265 #endif
266 		});
267 		m_results.resize(100);
268 		m_sorted_results = std::min(std::int8_t(100), m_sorted_results);
269 	}
270 }
271 
start()272 void traversal_algorithm::start()
273 {
274 	// in case the routing table is empty, use the
275 	// router nodes in the table
276 	if (m_results.size() < 3) add_router_entries();
277 	init();
278 	bool const is_done = add_requests();
279 	if (is_done) done();
280 }
281 
// returns the label identifying this kind of traversal. It is what the log
// lines in this file print for "type: %s" and what status() reports in
// dht_lookup::type.
char const* traversal_algorithm::name() const
{
	return "traversal_algorithm";
}
286 
traverse(node_id const & id,udp::endpoint const & addr)287 void traversal_algorithm::traverse(node_id const& id, udp::endpoint const& addr)
288 {
289 	if (m_done) return;
290 
291 #ifndef TORRENT_DISABLE_LOGGING
292 	dht_observer* logger = get_node().observer();
293 	if (logger != nullptr && logger->should_log(dht_logger::traversal) && id.is_all_zeros())
294 	{
295 		logger->log(dht_logger::traversal
296 			, "[%u] WARNING node returned a list which included a node with id 0"
297 			, m_id);
298 	}
299 #endif
300 
301 	// let the routing table know this node may exist
302 	m_node.m_table.heard_about(id, addr);
303 
304 	add_entry(id, addr, {});
305 }
306 
finished(observer_ptr o)307 void traversal_algorithm::finished(observer_ptr o)
308 {
309 #if TORRENT_USE_ASSERTS
310 	auto i = std::find(m_results.begin(), m_results.end(), o);
311 	TORRENT_ASSERT(i != m_results.end() || m_results.size() == 100);
312 #endif
313 
314 	// if this flag is set, it means we increased the
315 	// branch factor for it, and we should restore it
316 	if (o->flags & observer::flag_short_timeout)
317 	{
318 		TORRENT_ASSERT(m_branch_factor > 0);
319 		--m_branch_factor;
320 	}
321 
322 	TORRENT_ASSERT(o->flags & observer::flag_queried);
323 	o->flags |= observer::flag_alive;
324 
325 	++m_responses;
326 	TORRENT_ASSERT(m_invoke_count > 0);
327 	--m_invoke_count;
328 	bool const is_done = add_requests();
329 	if (is_done) done();
330 }
331 
332 // prevent request means that the total number of requests has
333 // overflown. This query failed because it was the oldest one.
334 // So, if this is true, don't make another request
failed(observer_ptr o,traversal_flags_t const flags)335 void traversal_algorithm::failed(observer_ptr o, traversal_flags_t const flags)
336 {
337 	// don't tell the routing table about
338 	// node ids that we just generated ourself
339 	if (!(o->flags & observer::flag_no_id))
340 		m_node.m_table.node_failed(o->id(), o->target_ep());
341 
342 	if (m_results.empty()) return;
343 
344 	bool decrement_branch_factor = false;
345 
346 	TORRENT_ASSERT(o->flags & observer::flag_queried);
347 	if (flags & short_timeout)
348 	{
349 		// short timeout means that it has been more than
350 		// two seconds since we sent the request, and that
351 		// we'll most likely not get a response. But, in case
352 		// we do get a late response, keep the handler
353 		// around for some more, but open up the slot
354 		// by increasing the branch factor
355 		if (!(o->flags & observer::flag_short_timeout)
356 			&& m_branch_factor < std::numeric_limits<std::int8_t>::max())
357 		{
358 			++m_branch_factor;
359 			o->flags |= observer::flag_short_timeout;
360 		}
361 #ifndef TORRENT_DISABLE_LOGGING
362 		log_timeout(o, "1ST_");
363 #endif
364 	}
365 	else
366 	{
367 		o->flags |= observer::flag_failed;
368 		// if this flag is set, it means we increased the
369 		// branch factor for it, and we should restore it
370 		decrement_branch_factor = bool(o->flags & observer::flag_short_timeout);
371 
372 #ifndef TORRENT_DISABLE_LOGGING
373 		log_timeout(o,"");
374 #endif
375 
376 		++m_timeouts;
377 		TORRENT_ASSERT(m_invoke_count > 0);
378 		--m_invoke_count;
379 	}
380 
381 	// this is another reason to decrement the branch factor, to prevent another
382 	// request from filling this slot. Only ever decrement once per response though
383 	decrement_branch_factor |= bool(flags & prevent_request);
384 
385 	if (decrement_branch_factor)
386 	{
387 		TORRENT_ASSERT(m_branch_factor > 0);
388 		--m_branch_factor;
389 		if (m_branch_factor <= 0) m_branch_factor = 1;
390 	}
391 
392 	bool const is_done = add_requests();
393 	if (is_done) done();
394 }
395 
396 #ifndef TORRENT_DISABLE_LOGGING
log_timeout(observer_ptr const & o,char const * prefix) const397 void traversal_algorithm::log_timeout(observer_ptr const& o, char const* prefix) const
398 {
399 	dht_observer * logger = get_node().observer();
400 	if (logger != nullptr && logger->should_log(dht_logger::traversal))
401 	{
402 		logger->log(dht_logger::traversal
403 			, "[%u] %sTIMEOUT id: %s distance: %d addr: %s branch-factor: %d "
404 			"invoke-count: %d type: %s"
405 			, m_id, prefix, aux::to_hex(o->id()).c_str(), distance_exp(m_target, o->id())
406 			, print_address(o->target_addr()).c_str(), m_branch_factor
407 			, m_invoke_count, name());
408 	}
409 
410 }
411 #endif
412 
done()413 void traversal_algorithm::done()
414 {
415 	TORRENT_ASSERT(m_done == false);
416 	m_done = true;
417 #ifndef TORRENT_DISABLE_LOGGING
418 	int results_target = m_node.m_table.bucket_size();
419 	int closest_target = 160;
420 #endif
421 
422 	for (auto const& o : m_results)
423 	{
424 		if ((o->flags & (observer::flag_queried | observer::flag_failed)) == observer::flag_queried)
425 		{
426 			// set the done flag on any outstanding queries to prevent them from
427 			// calling finished() or failed() after we've already declared the traversal
428 			// done
429 			o->flags |= observer::flag_done;
430 		}
431 
432 #ifndef TORRENT_DISABLE_LOGGING
433 		dht_observer* logger = get_node().observer();
434 		if (results_target > 0 && (o->flags & observer::flag_alive)
435 			&& logger != nullptr && logger->should_log(dht_logger::traversal))
436 		{
437 			TORRENT_ASSERT(o->flags & observer::flag_queried);
438 			logger->log(dht_logger::traversal
439 				, "[%u] id: %s distance: %d addr: %s"
440 				, m_id, aux::to_hex(o->id()).c_str(), closest_target
441 				, print_endpoint(o->target_ep()).c_str());
442 
443 			--results_target;
444 			int const dist = distance_exp(m_target, o->id());
445 			if (dist < closest_target) closest_target = dist;
446 		}
447 #endif
448 	}
449 
450 #ifndef TORRENT_DISABLE_LOGGING
451 	if (get_node().observer() != nullptr)
452 	{
453 		get_node().observer()->log(dht_logger::traversal
454 			, "[%u] COMPLETED distance: %d type: %s"
455 			, m_id, closest_target, name());
456 	}
457 #endif
458 
459 	// delete all our references to the observer objects so
460 	// they will in turn release the traversal algorithm
461 	m_results.clear();
462 	m_sorted_results = 0;
463 	m_invoke_count = 0;
464 }
465 
add_requests()466 bool traversal_algorithm::add_requests()
467 {
468 	if (m_done) return true;
469 
470 	int results_target = m_node.m_table.bucket_size();
471 
472 	// this only counts outstanding requests at the top of the
473 	// target list. This is <= m_invoke count. m_invoke_count
474 	// is the total number of outstanding requests, including
475 	// old ones that may be waiting on nodes much farther behind
476 	// the current point we've reached in the search.
477 	int outstanding = 0;
478 
479 	// if we're doing aggressive lookups, we keep branch-factor
480 	// outstanding requests _at the tops_ of the result list. Otherwise
481 	// we just keep any branch-factor outstanding requests
482 	bool const agg = m_node.settings().aggressive_lookups;
483 
484 	// Find the first node that hasn't already been queried.
485 	// and make sure that the 'm_branch_factor' top nodes
486 	// stay queried at all times (obviously ignoring failed nodes)
487 	// and without surpassing the 'result_target' nodes (i.e. k=8)
488 	// this is a slight variation of the original paper which instead
489 	// limits the number of outstanding requests, this limits the
490 	// number of good outstanding requests. It will use more traffic,
491 	// but is intended to speed up lookups
492 	for (auto i = m_results.begin()
493 		, end(m_results.end()); i != end
494 		&& results_target > 0
495 		&& (agg ? outstanding < m_branch_factor
496 			: m_invoke_count < m_branch_factor);
497 		++i)
498 	{
499 		observer* o = i->get();
500 		if (o->flags & observer::flag_alive)
501 		{
502 			TORRENT_ASSERT(o->flags & observer::flag_queried);
503 			--results_target;
504 			continue;
505 		}
506 		if (o->flags & observer::flag_queried)
507 		{
508 			// if it's queried, not alive and not failed, it
509 			// must be currently in flight
510 			if (!(o->flags & observer::flag_failed))
511 				++outstanding;
512 
513 			continue;
514 		}
515 
516 #ifndef TORRENT_DISABLE_LOGGING
517 		dht_observer* logger = get_node().observer();
518 		if (logger != nullptr && logger->should_log(dht_logger::traversal))
519 		{
520 			logger->log(dht_logger::traversal
521 				, "[%u] INVOKE nodes-left: %d top-invoke-count: %d "
522 				"invoke-count: %d branch-factor: %d "
523 				"distance: %d id: %s addr: %s type: %s"
524 				, m_id, int(m_results.end() - i), outstanding, int(m_invoke_count)
525 				, int(m_branch_factor), distance_exp(m_target, o->id()), aux::to_hex(o->id()).c_str()
526 				, print_address(o->target_addr()).c_str(), name());
527 		}
528 #endif
529 
530 		o->flags |= observer::flag_queried;
531 		if (invoke(*i))
532 		{
533 			TORRENT_ASSERT(m_invoke_count < std::numeric_limits<std::int8_t>::max());
534 			++m_invoke_count;
535 			++outstanding;
536 		}
537 		else
538 		{
539 			o->flags |= observer::flag_failed;
540 		}
541 	}
542 
543 	// this is the completion condition. If we found m_node.m_table.bucket_size()
544 	// (i.e. k=8) completed results, without finding any still
545 	// outstanding requests, we're done.
546 	// also, if invoke count is 0, it means we didn't even find 'k'
547 	// working nodes, we still have to terminate though.
548 	return (results_target == 0 && outstanding == 0) || m_invoke_count == 0;
549 }
550 
add_router_entries()551 void traversal_algorithm::add_router_entries()
552 {
553 #ifndef TORRENT_DISABLE_LOGGING
554 	dht_observer* logger = get_node().observer();
555 	if (logger != nullptr && logger->should_log(dht_logger::traversal))
556 	{
557 		logger->log(dht_logger::traversal
558 			, "[%u] using router nodes to initiate traversal algorithm %d routers"
559 			, m_id, int(std::distance(m_node.m_table.begin(), m_node.m_table.end())));
560 	}
561 #endif
562 	for (auto const& n : m_node.m_table)
563 		add_entry(node_id(), n, observer::flag_initial);
564 }
565 
init()566 void traversal_algorithm::init()
567 {
568 	m_branch_factor = aux::numeric_cast<std::int8_t>(m_node.branch_factor());
569 	m_node.add_traversal_algorithm(this);
570 }
571 
// unregisters this traversal from the node. This is the counterpart of the
// add_traversal_algorithm() call made in init().
traversal_algorithm::~traversal_algorithm()
{
	m_node.remove_traversal_algorithm(this);
}
576 
status(dht_lookup & l)577 void traversal_algorithm::status(dht_lookup& l)
578 {
579 	l.timeouts = m_timeouts;
580 	l.responses = m_responses;
581 	l.outstanding_requests = m_invoke_count;
582 	l.branch_factor = m_branch_factor;
583 	l.type = name();
584 	l.nodes_left = 0;
585 	l.first_timeout = 0;
586 	l.target = m_target;
587 
588 	int last_sent = INT_MAX;
589 	time_point const now = aux::time_now();
590 	for (auto const& r : m_results)
591 	{
592 		observer const& o = *r;
593 		if (o.flags & observer::flag_queried)
594 		{
595 			last_sent = std::min(last_sent, int(total_seconds(now - o.sent())));
596 			if (o.has_short_timeout()) ++l.first_timeout;
597 			continue;
598 		}
599 		++l.nodes_left;
600 	}
601 	l.last_sent = last_sent;
602 }
603 
look_for_nodes(char const * nodes_key,udp const & protocol,bdecode_node const & r,std::function<void (const node_endpoint &)> f)604 void look_for_nodes(char const* nodes_key, udp const& protocol, bdecode_node const& r, std::function<void(const node_endpoint&)> f)
605 {
606 	bdecode_node const n = r.dict_find_string(nodes_key);
607 	if (n)
608 	{
609 		char const* nodes = n.string_ptr();
610 		char const* end = nodes + n.string_length();
611 		int const protocol_size = int(detail::address_size(protocol));
612 
613 		while (end - nodes >= 20 + protocol_size + 2)
614 		{
615 			f(read_node_endpoint(protocol, nodes));
616 		}
617 	}
618 }
619 
reply(msg const & m)620 void traversal_observer::reply(msg const& m)
621 {
622 	bdecode_node const r = m.message.dict_find_dict("r");
623 	if (!r)
624 	{
625 #ifndef TORRENT_DISABLE_LOGGING
626 		if (get_observer() != nullptr)
627 		{
628 			get_observer()->log(dht_logger::traversal
629 				, "[%u] missing response dict"
630 				, algorithm()->id());
631 		}
632 #endif
633 		return;
634 	}
635 
636 	bdecode_node const id = r.dict_find_string("id");
637 
638 #ifndef TORRENT_DISABLE_LOGGING
639 	dht_observer* logger = get_observer();
640 	if (logger != nullptr && logger->should_log(dht_logger::traversal))
641 	{
642 		char hex_id[41];
643 		aux::to_hex({id.string_ptr(), 20}, hex_id);
644 		logger->log(dht_logger::traversal
645 			, "[%u] RESPONSE id: %s invoke-count: %d addr: %s type: %s"
646 			, algorithm()->id(), hex_id, algorithm()->invoke_count()
647 			, print_endpoint(target_ep()).c_str(), algorithm()->name());
648 	}
649 #endif
650 
651 	look_for_nodes(algorithm()->get_node().protocol_nodes_key(), algorithm()->get_node().protocol(), r,
652 		[this](node_endpoint const& nep) { algorithm()->traverse(nep.id, nep.ep); });
653 
654 	if (!id || id.string_length() != 20)
655 	{
656 #ifndef TORRENT_DISABLE_LOGGING
657 		if (get_observer() != nullptr)
658 		{
659 			get_observer()->log(dht_logger::traversal, "[%u] invalid id in response"
660 				, algorithm()->id());
661 		}
662 #endif
663 		return;
664 	}
665 
666 	// in case we didn't know the id of this peer when we sent the message to
667 	// it. For instance if it's a bootstrap node.
668 	set_id(node_id(id.string_ptr()));
669 }
670 
671 } } // namespace libtorrent::dht
672