1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2014-2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4
5 #include "NodeTable.h"
6 using namespace std;
7
8 namespace dev
9 {
10 namespace p2p
11 {
12 namespace
13 {
14 // global thread-safe logger for static methods
15 BOOST_LOG_INLINE_GLOBAL_LOGGER_CTOR_ARGS(g_discoveryWarnLogger,
16 boost::log::sources::severity_channel_logger_mt<>,
17 (boost::log::keywords::severity = 0)(boost::log::keywords::channel = "discov"))
18
19 // Cadence at which we timeout sent pings and evict unresponsive nodes
20 constexpr chrono::milliseconds c_handleTimeoutsIntervalMs{5000};
21 // Cadence at which we remove old records from EndpointTracker
22 constexpr chrono::milliseconds c_removeOldEndpointStatementsIntervalMs{5000};
23 // Change external endpoint after this number of peers report new one
24 constexpr size_t c_minEndpointTrackStatements{10};
25 // Interval during which each endpoint statement is kept
26 constexpr std::chrono::minutes c_endpointStatementTimeToLiveMin{5};
27
28 } // namespace
29
30 constexpr chrono::seconds DiscoveryDatagram::c_timeToLiveS;
31 constexpr chrono::milliseconds NodeTable::c_reqTimeoutMs;
32 constexpr chrono::milliseconds NodeTable::c_bucketRefreshMs;
33 constexpr chrono::milliseconds NodeTable::c_discoveryRoundIntervalMs;
34
operator ==(weak_ptr<NodeEntry> const & _weak,shared_ptr<NodeEntry> const & _shared)35 inline bool operator==(weak_ptr<NodeEntry> const& _weak, shared_ptr<NodeEntry> const& _shared)
36 {
37 return !_weak.owner_before(_shared) && !_shared.owner_before(_weak);
38 }
39
NodeTable(ba::io_context & _io,KeyPair const & _alias,NodeIPEndpoint const & _endpoint,ENR const & _enr,bool _enabled,bool _allowLocalDiscovery)40 NodeTable::NodeTable(ba::io_context& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint,
41 ENR const& _enr, bool _enabled, bool _allowLocalDiscovery)
42 : m_hostNodeID{_alias.pub()},
43 m_hostNodeIDHash{sha3(m_hostNodeID)},
44 m_hostStaticIP{isAllowedEndpoint(_endpoint) ? _endpoint.address() : bi::address{}},
45 m_hostNodeEndpoint{_enr.ip(), _enr.udpPort(), _enr.tcpPort()},
46 m_hostENR{_enr},
47 m_secret{_alias.secret()},
48 m_socket{make_shared<NodeSocket>(
49 _io, static_cast<UDPSocketEvents&>(*this), (bi::udp::endpoint)_endpoint)},
50 m_requestTimeToLive{DiscoveryDatagram::c_timeToLiveS},
51 m_allowLocalDiscovery{_allowLocalDiscovery},
52 m_discoveryTimer{make_shared<ba::steady_timer>(_io)},
53 m_timeoutsTimer{make_shared<ba::steady_timer>(_io)},
54 m_endpointTrackingTimer{make_shared<ba::steady_timer>(_io)},
55 m_io{_io}
56 {
57 for (unsigned i = 0; i < s_bins; i++)
58 m_buckets[i].distance = i;
59
60 if (!_enabled)
61 {
62 cwarn << "\"_enabled\" parameter is false, discovery is disabled";
63 return;
64 }
65
66 try
67 {
68 m_socket->connect();
69 doDiscovery();
70 doHandleTimeouts();
71 doEndpointTracking();
72 }
73 catch (exception const& _e)
74 {
75 cwarn << "Exception connecting NodeTable socket: " << _e.what();
76 cwarn << "Discovery disabled.";
77 }
78 }
79
processEvents()80 void NodeTable::processEvents()
81 {
82 if (m_nodeEventHandler)
83 m_nodeEventHandler->processEvents();
84 }
85
addNode(Node const & _node)86 bool NodeTable::addNode(Node const& _node)
87 {
88 LOG(m_logger) << "Adding node " << _node;
89
90 if (!isValidNode(_node))
91 return false;
92
93 bool needToPing = false;
94 DEV_GUARDED(x_nodes)
95 {
96 auto const it = m_allNodes.find(_node.id);
97 needToPing = (it == m_allNodes.end() || it->second->endpoint() != _node.endpoint ||
98 !it->second->hasValidEndpointProof());
99 }
100
101 if (needToPing)
102 {
103 LOG(m_logger) << "Pending " << _node;
104 schedulePing(_node);
105 }
106
107 return true;
108 }
109
addKnownNode(Node const & _node,uint32_t _lastPongReceivedTime,uint32_t _lastPongSentTime)110 bool NodeTable::addKnownNode(
111 Node const& _node, uint32_t _lastPongReceivedTime, uint32_t _lastPongSentTime)
112 {
113 LOG(m_logger) << "Adding known node " << _node;
114
115 if (!isValidNode(_node))
116 return false;
117
118 if (nodeEntry(_node.id))
119 {
120 LOG(m_logger) << "Node " << _node << " is already in the node table";
121 return true;
122 }
123
124 auto entry = make_shared<NodeEntry>(
125 m_hostNodeIDHash, _node.id, _node.endpoint, _lastPongReceivedTime, _lastPongSentTime);
126
127 if (entry->hasValidEndpointProof())
128 {
129 LOG(m_logger) << "Known " << _node;
130 noteActiveNode(move(entry));
131 }
132 else
133 {
134 LOG(m_logger) << "Pending " << _node;
135 schedulePing(_node);
136 }
137
138 return true;
139 }
140
isValidNode(Node const & _node) const141 bool NodeTable::isValidNode(Node const& _node) const
142 {
143 if (!_node.endpoint || !_node.id)
144 {
145 LOG(m_logger) << "Supplied node " << _node
146 << " has an invalid endpoint or id. Skipping adding node to node table.";
147 return false;
148 }
149
150 if (!isAllowedEndpoint(_node.endpoint))
151 {
152 LOG(m_logger) << "Supplied node" << _node
153 << " doesn't have an allowed endpoint. Skipping adding node to node table";
154 return false;
155 }
156
157 if (m_hostNodeID == _node.id)
158 {
159 LOG(m_logger) << "Skip adding self to node table (" << _node.id << ")";
160 return false;
161 }
162
163 return true;
164 }
165
nodes() const166 list<NodeID> NodeTable::nodes() const
167 {
168 list<NodeID> nodes;
169 DEV_GUARDED(x_nodes)
170 {
171 for (auto& i : m_allNodes)
172 nodes.push_back(i.second->id());
173 }
174 return nodes;
175 }
176
snapshot() const177 list<NodeEntry> NodeTable::snapshot() const
178 {
179 list<NodeEntry> ret;
180 DEV_GUARDED(x_state)
181 {
182 for (auto const& s : m_buckets)
183 for (auto const& np : s.nodes)
184 if (auto n = np.lock())
185 ret.push_back(*n);
186 }
187 return ret;
188 }
189
node(NodeID const & _id)190 Node NodeTable::node(NodeID const& _id)
191 {
192 Guard l(x_nodes);
193 auto const it = m_allNodes.find(_id);
194 if (it != m_allNodes.end())
195 {
196 auto const& entry = it->second;
197 return Node(_id, entry->endpoint(), entry->peerType());
198 }
199 return UnspecifiedNode;
200 }
201
nodeEntry(NodeID const & _id)202 shared_ptr<NodeEntry> NodeTable::nodeEntry(NodeID const& _id)
203 {
204 Guard l(x_nodes);
205 auto const it = m_allNodes.find(_id);
206 return it != m_allNodes.end() ? it->second : shared_ptr<NodeEntry>();
207 }
208
doDiscoveryRound(NodeID _node,unsigned _round,shared_ptr<set<shared_ptr<NodeEntry>>> _tried)209 void NodeTable::doDiscoveryRound(
210 NodeID _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
211 {
212 // NOTE: ONLY called by doDiscovery or "recursively" via lambda scheduled via timer at
213 // the end of this function
214 if (!m_socket->isOpen())
215 return;
216
217 // send s_alpha FindNode packets to nodes we know, closest to target
218 auto const nearestNodes = nearestNodeEntries(_node);
219 auto newTriedCount = 0;
220 for (auto const& nodeEntry : nearestNodes)
221 {
222 if (!contains(*_tried, nodeEntry))
223 {
224 // Avoid sending FindNode, if we have not sent a valid PONG lately.
225 // This prevents being considered invalid node and FindNode being ignored.
226 if (!nodeEntry->hasValidEndpointProof())
227 {
228 LOG(m_logger) << "Node " << nodeEntry->node << " endpoint proof expired.";
229 ping(nodeEntry->node);
230 continue;
231 }
232
233 FindNode p(nodeEntry->endpoint(), _node);
234 p.expiration = nextRequestExpirationTime();
235 p.sign(m_secret);
236 m_sentFindNodes.emplace_back(nodeEntry->id(), chrono::steady_clock::now());
237 LOG(m_logger) << p.typeName() << " to " << nodeEntry->node << " (target: " << _node
238 << ")";
239 m_socket->send(p);
240
241 _tried->emplace(nodeEntry);
242 if (++newTriedCount == s_alpha)
243 break;
244 }
245 }
246
247 if (_round == s_maxSteps || newTriedCount == 0)
248 {
249 LOG(m_logger) << "Terminating discover after " << _round << " rounds.";
250 doDiscovery();
251 return;
252 }
253
254 m_discoveryTimer->expires_after(c_discoveryRoundIntervalMs);
255 auto discoveryTimer{m_discoveryTimer};
256 m_discoveryTimer->async_wait(
257 [this, discoveryTimer, _node, _round, _tried](boost::system::error_code const& _ec) {
258 // We can't use m_logger here if there's an error because captured this might already be
259 // destroyed
260 if (_ec.value() == boost::asio::error::operation_aborted ||
261 discoveryTimer->expiry() == c_steadyClockMin)
262 {
263 clog(VerbosityDebug, "discov") << "Discovery timer was probably cancelled";
264 return;
265 }
266 else if (_ec)
267 {
268 clog(VerbosityDebug, "discov")
269 << "Discovery timer error detected: " << _ec.value() << " " << _ec.message();
270 return;
271 }
272
273 doDiscoveryRound(_node, _round + 1, _tried);
274 });
275 }
276
nearestNodeEntries(NodeID const & _target)277 vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeID const& _target)
278 {
279 auto const distanceToTargetLess = [](pair<int, shared_ptr<NodeEntry>> const& _node1,
280 pair<int, shared_ptr<NodeEntry>> const& _node2) {
281 return _node1.first < _node2.first;
282 };
283
284 h256 const targetHash = sha3(_target);
285
286 std::multiset<pair<int, shared_ptr<NodeEntry>>, decltype(distanceToTargetLess)>
287 nodesByDistanceToTarget(distanceToTargetLess);
288 for (auto const& bucket : m_buckets)
289 for (auto const& nodeWeakPtr : bucket.nodes)
290 if (auto node = nodeWeakPtr.lock())
291 {
292 nodesByDistanceToTarget.emplace(distance(targetHash, node->nodeIDHash), node);
293
294 if (nodesByDistanceToTarget.size() > s_bucketSize)
295 nodesByDistanceToTarget.erase(--nodesByDistanceToTarget.end());
296 }
297
298 vector<shared_ptr<NodeEntry>> ret;
299 for (auto& distanceAndNode : nodesByDistanceToTarget)
300 ret.emplace_back(move(distanceAndNode.second));
301
302 return ret;
303 }
304
ping(Node const & _node,shared_ptr<NodeEntry> _replacementNodeEntry)305 void NodeTable::ping(Node const& _node, shared_ptr<NodeEntry> _replacementNodeEntry)
306 {
307 if (!m_socket->isOpen())
308 return;
309
310 // Don't send Ping if one is already sent
311 if (m_sentPings.find(_node.endpoint) != m_sentPings.end())
312 {
313 LOG(m_logger) << "Ignoring request to ping " << _node << ", because it's already pinged";
314 return;
315 }
316
317 PingNode p{m_hostNodeEndpoint, _node.endpoint};
318 p.expiration = nextRequestExpirationTime();
319 p.seq = m_hostENR.sequenceNumber();
320 auto const pingHash = p.sign(m_secret);
321 LOG(m_logger) << p.typeName() << " to " << _node;
322 m_socket->send(p);
323
324 NodeValidation const validation{_node.id, _node.endpoint.tcpPort(), chrono::steady_clock::now(),
325 pingHash, _replacementNodeEntry};
326 m_sentPings.insert({_node.endpoint, validation});
327 }
328
schedulePing(Node const & _node)329 void NodeTable::schedulePing(Node const& _node)
330 {
331 post(m_io, [this, _node] { ping(_node, {}); });
332 }
333
evict(NodeEntry const & _leastSeen,shared_ptr<NodeEntry> _replacement)334 void NodeTable::evict(NodeEntry const& _leastSeen, shared_ptr<NodeEntry> _replacement)
335 {
336 if (!m_socket->isOpen())
337 return;
338
339 LOG(m_logger) << "Evicting node " << _leastSeen.node;
340 ping(_leastSeen.node, move(_replacement));
341
342 if (m_nodeEventHandler)
343 m_nodeEventHandler->appendEvent(_leastSeen.id(), NodeEntryScheduledForEviction);
344 }
345
noteActiveNode(shared_ptr<NodeEntry> _nodeEntry)346 void NodeTable::noteActiveNode(shared_ptr<NodeEntry> _nodeEntry)
347 {
348 assert(_nodeEntry);
349
350 if (_nodeEntry->id() == m_hostNodeID)
351 {
352 LOG(m_logger) << "Skipping making self active.";
353 return;
354 }
355 if (!isAllowedEndpoint(_nodeEntry->endpoint()))
356 {
357 LOG(m_logger) << "Skipping making node with unallowed endpoint active. Node "
358 << _nodeEntry->node;
359 return;
360 }
361
362 if (!_nodeEntry->hasValidEndpointProof())
363 return;
364
365 LOG(m_logger) << "Active node " << _nodeEntry->node;
366
367 shared_ptr<NodeEntry> nodeToEvict;
368 {
369 Guard l(x_state);
370 // Find a bucket to put a node to
371 NodeBucket& s = bucket_UNSAFE(_nodeEntry.get());
372 auto& nodes = s.nodes;
373
374 // check if the node is already in the bucket
375 auto it = find(nodes.begin(), nodes.end(), _nodeEntry);
376 if (it != nodes.end())
377 {
378 // if it was in the bucket, move it to the last position
379 nodes.splice(nodes.end(), nodes, it);
380 }
381 else
382 {
383 if (nodes.size() < s_bucketSize)
384 {
385 // if it was not there, just add it as a most recently seen node
386 // (i.e. to the end of the list)
387 nodes.push_back(_nodeEntry);
388 DEV_GUARDED(x_nodes) { m_allNodes.insert({_nodeEntry->id(), _nodeEntry}); }
389 if (m_nodeEventHandler)
390 m_nodeEventHandler->appendEvent(_nodeEntry->id(), NodeEntryAdded);
391 }
392 else
393 {
394 // if bucket is full, start eviction process for the least recently seen node
395 nodeToEvict = nodes.front().lock();
396 // It could have been replaced in addNode(), then weak_ptr is expired.
397 // If so, just add a new one instead of expired
398 if (!nodeToEvict)
399 {
400 nodes.pop_front();
401 nodes.push_back(_nodeEntry);
402 DEV_GUARDED(x_nodes) { m_allNodes.insert({_nodeEntry->id(), _nodeEntry}); }
403 if (m_nodeEventHandler)
404 m_nodeEventHandler->appendEvent(_nodeEntry->id(), NodeEntryAdded);
405 }
406 }
407 }
408 }
409
410 if (nodeToEvict)
411 evict(*nodeToEvict, _nodeEntry);
412 }
413
dropNode(shared_ptr<NodeEntry> _n)414 void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
415 {
416 // remove from nodetable
417 {
418 Guard l(x_state);
419 NodeBucket& s = bucket_UNSAFE(_n.get());
420 s.nodes.remove_if(
421 [_n](weak_ptr<NodeEntry> const& _bucketEntry) { return _bucketEntry == _n; });
422 }
423
424 DEV_GUARDED(x_nodes) { m_allNodes.erase(_n->id()); }
425
426 // notify host
427 LOG(m_logger) << "p2p.nodes.drop " << _n->id();
428 if (m_nodeEventHandler)
429 m_nodeEventHandler->appendEvent(_n->id(), NodeEntryDropped);
430 }
431
bucket_UNSAFE(NodeEntry const * _n)432 NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n)
433 {
434 return m_buckets[_n->distance - 1];
435 }
436
onPacketReceived(UDPSocketFace *,bi::udp::endpoint const & _from,bytesConstRef _packet)437 void NodeTable::onPacketReceived(
438 UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet)
439 {
440 try {
441 unique_ptr<DiscoveryDatagram> packet = DiscoveryDatagram::interpretUDP(_from, _packet);
442 if (!packet)
443 return;
444 if (packet->isExpired())
445 {
446 LOG(m_logger) << "Expired " << packet->typeName() << " from " << packet->sourceid << "@"
447 << _from;
448 return;
449 }
450
451 LOG(m_logger) << packet->typeName() << " from " << packet->sourceid << "@" << _from;
452
453 shared_ptr<NodeEntry> sourceNodeEntry;
454 switch (packet->packetType())
455 {
456 case Pong::type:
457 sourceNodeEntry = handlePong(_from, *packet);
458 break;
459
460 case Neighbours::type:
461 sourceNodeEntry = handleNeighbours(_from, *packet);
462 break;
463
464 case FindNode::type:
465 sourceNodeEntry = handleFindNode(_from, *packet);
466 break;
467
468 case PingNode::type:
469 sourceNodeEntry = handlePingNode(_from, *packet);
470 break;
471
472 case ENRRequest::type:
473 sourceNodeEntry = handleENRRequest(_from, *packet);
474 break;
475
476 case ENRResponse::type:
477 sourceNodeEntry = handleENRResponse(_from, *packet);
478 break;
479 }
480
481 if (sourceNodeEntry)
482 noteActiveNode(move(sourceNodeEntry));
483 }
484 catch (exception const& _e)
485 {
486 LOG(m_logger) << "Exception processing message from " << _from.address().to_string() << ":"
487 << _from.port() << ": " << _e.what();
488 }
489 catch (...)
490 {
491 LOG(m_logger) << "Exception processing message from " << _from.address().to_string() << ":"
492 << _from.port();
493 }
494 }
495
496
handlePong(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)497 shared_ptr<NodeEntry> NodeTable::handlePong(
498 bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
499 {
500 // validate pong
501 auto const sentPing = m_sentPings.find(_from);
502 if (sentPing == m_sentPings.end())
503 {
504 LOG(m_logger) << "Unexpected PONG from " << _from.address().to_string() << ":"
505 << _from.port();
506 return {};
507 }
508
509 auto const& pong = dynamic_cast<Pong const&>(_packet);
510 auto const& nodeValidation = sentPing->second;
511 if (pong.echo != nodeValidation.pingHash)
512 {
513 LOG(m_logger) << "Invalid PONG from " << _from.address().to_string() << ":" << _from.port();
514 return {};
515 }
516
517 // in case the node answers with new NodeID, drop the record with the old NodeID
518 auto const& sourceId = pong.sourceid;
519 if (sourceId != nodeValidation.nodeID)
520 {
521 LOG(m_logger) << "Node " << _from << " changed public key from " << nodeValidation.nodeID
522 << " to " << sourceId;
523 if (auto node = nodeEntry(nodeValidation.nodeID))
524 dropNode(move(node));
525 }
526
527 // create or update nodeEntry with new Pong received time
528 shared_ptr<NodeEntry> sourceNodeEntry;
529 DEV_GUARDED(x_nodes)
530 {
531 auto it = m_allNodes.find(sourceId);
532 if (it == m_allNodes.end())
533 sourceNodeEntry = make_shared<NodeEntry>(m_hostNodeIDHash, sourceId,
534 NodeIPEndpoint{_from.address(), _from.port(), nodeValidation.tcpPort},
535 RLPXDatagramFace::secondsSinceEpoch(), 0 /* lastPongSentTime */);
536 else
537 {
538 sourceNodeEntry = it->second;
539 sourceNodeEntry->lastPongReceivedTime = RLPXDatagramFace::secondsSinceEpoch();
540
541 if (sourceNodeEntry->endpoint() != _from)
542 sourceNodeEntry->node.endpoint =
543 NodeIPEndpoint{_from.address(), _from.port(), nodeValidation.tcpPort};
544 }
545 }
546
547 m_sentPings.erase(_from);
548
549 // update our external endpoint address and UDP port
550 if (m_endpointTracker.addEndpointStatement(_from, pong.destination) >=
551 c_minEndpointTrackStatements)
552 {
553 auto newUdpEndpoint = m_endpointTracker.bestEndpoint();
554 if (!m_hostStaticIP.is_unspecified())
555 newUdpEndpoint.address(m_hostStaticIP);
556
557 if (newUdpEndpoint != m_hostNodeEndpoint)
558 {
559 m_hostNodeEndpoint = NodeIPEndpoint{
560 newUdpEndpoint.address(), newUdpEndpoint.port(), m_hostNodeEndpoint.tcpPort()};
561 {
562 Guard l(m_hostENRMutex);
563 m_hostENR =
564 IdentitySchemeV4::updateENR(m_hostENR, m_secret, m_hostNodeEndpoint.address(),
565 m_hostNodeEndpoint.tcpPort(), m_hostNodeEndpoint.udpPort());
566 }
567 clog(VerbosityInfo, "net") << "ENR updated: " << m_hostENR;
568 }
569 }
570
571 return sourceNodeEntry;
572 }
573
handleNeighbours(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)574 shared_ptr<NodeEntry> NodeTable::handleNeighbours(
575 bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
576 {
577 shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
578 if (!sourceNodeEntry)
579 {
580 LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
581 << ") not found in node table. Ignoring Neighbours packet.";
582 return {};
583 }
584 if (sourceNodeEntry->endpoint() != _from)
585 {
586 LOG(m_logger) << "Neighbours packet from unexpected endpoint " << _from << " instead of "
587 << sourceNodeEntry->endpoint();
588 return {};
589 }
590
591 auto const& in = dynamic_cast<Neighbours const&>(_packet);
592
593 bool expected = false;
594 auto now = chrono::steady_clock::now();
595 m_sentFindNodes.remove_if([&](NodeIdTimePoint const& _t) noexcept {
596 if (_t.first != in.sourceid)
597 return false;
598 if (now - _t.second < c_reqTimeoutMs)
599 expected = true;
600 return true;
601 });
602 if (!expected)
603 {
604 LOG(m_logger) << "Dropping unsolicited neighbours packet from " << _packet.sourceid << "@"
605 << _from.address();
606 return sourceNodeEntry;
607 }
608
609 for (auto const& n : in.neighbours)
610 addNode(Node(n.node, n.endpoint));
611
612 return sourceNodeEntry;
613 }
614
handleFindNode(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)615 std::shared_ptr<NodeEntry> NodeTable::handleFindNode(
616 bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
617 {
618 std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
619 if (!sourceNodeEntry)
620 {
621 LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
622 << ") not found in node table. Ignoring FindNode request.";
623 return {};
624 }
625 if (sourceNodeEntry->endpoint() != _from)
626 {
627 LOG(m_logger) << "FindNode packet from unexpected endpoint " << _from << " instead of "
628 << sourceNodeEntry->endpoint();
629 return {};
630 }
631 if (!sourceNodeEntry->lastPongReceivedTime)
632 {
633 LOG(m_logger) << "Unexpected FindNode packet! Endpoint proof hasn't been performed yet.";
634 return {};
635 }
636 if (!sourceNodeEntry->hasValidEndpointProof())
637 {
638 LOG(m_logger) << "Unexpected FindNode packet! Endpoint proof has expired.";
639 return {};
640 }
641
642 auto const& in = dynamic_cast<FindNode const&>(_packet);
643 vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
644 static unsigned constexpr nlimit = (NodeSocket::maxDatagramSize - 109) / 90;
645 for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
646 {
647 Neighbours out(_from, nearest, offset, nlimit);
648 out.expiration = nextRequestExpirationTime();
649 LOG(m_logger) << out.typeName() << " to " << in.sourceid << "@" << _from;
650 out.sign(m_secret);
651 if (out.data.size() > 1280)
652 cnetlog << "Sending truncated datagram, size: " << out.data.size();
653 m_socket->send(out);
654 }
655
656 return sourceNodeEntry;
657 }
658
handlePingNode(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)659 std::shared_ptr<NodeEntry> NodeTable::handlePingNode(
660 bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
661 {
662 auto const& in = dynamic_cast<PingNode const&>(_packet);
663
664 NodeIPEndpoint sourceEndpoint{_from.address(), _from.port(), in.source.tcpPort()};
665 if (!addNode({in.sourceid, sourceEndpoint}))
666 return {}; // Need to have valid endpoint proof before adding node to node table.
667
668 // Send PONG response.
669 Pong p(sourceEndpoint);
670 LOG(m_logger) << p.typeName() << " to " << in.sourceid << "@" << _from;
671 p.expiration = nextRequestExpirationTime();
672 p.echo = in.echo;
673 p.seq = m_hostENR.sequenceNumber();
674 p.sign(m_secret);
675 m_socket->send(p);
676
677 // Quirk: when the node is a replacement node (that is, not added to the node table
678 // yet, but can be added after another node's eviction), it will not be returned
679 // from nodeEntry() and we won't update its lastPongSentTime. But that shouldn't be
680 // a big problem, at worst it can lead to more Ping-Pongs than needed.
681 std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
682 if (sourceNodeEntry)
683 sourceNodeEntry->lastPongSentTime = RLPXDatagramFace::secondsSinceEpoch();
684
685 return sourceNodeEntry;
686 }
687
handleENRRequest(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)688 std::shared_ptr<NodeEntry> NodeTable::handleENRRequest(
689 bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
690 {
691 std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
692 if (!sourceNodeEntry)
693 {
694 LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
695 << ") not found in node table. Ignoring ENRRequest request.";
696 return {};
697 }
698 if (sourceNodeEntry->endpoint() != _from)
699 {
700 LOG(m_logger) << "ENRRequest packet from unexpected endpoint " << _from << " instead of "
701 << sourceNodeEntry->endpoint();
702 return {};
703 }
704 if (!sourceNodeEntry->lastPongReceivedTime)
705 {
706 LOG(m_logger) << "Unexpected ENRRequest packet! Endpoint proof hasn't been performed yet.";
707 return {};
708 }
709 if (!sourceNodeEntry->hasValidEndpointProof())
710 {
711 LOG(m_logger) << "Unexpected ENRRequest packet! Endpoint proof has expired.";
712 return {};
713 }
714
715 auto const& in = dynamic_cast<ENRRequest const&>(_packet);
716
717 ENRResponse response{_from, m_hostENR};
718 LOG(m_logger) << response.typeName() << " to " << in.sourceid << "@" << _from;
719 response.expiration = nextRequestExpirationTime();
720 response.echo = in.echo;
721 response.sign(m_secret);
722 m_socket->send(response);
723
724 return sourceNodeEntry;
725 }
726
handleENRResponse(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)727 std::shared_ptr<NodeEntry> NodeTable::handleENRResponse(
728 bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
729 {
730 std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
731 if (!sourceNodeEntry)
732 {
733 LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
734 << ") not found in node table. Ignoring ENRResponse packet.";
735 return {};
736 }
737 if (sourceNodeEntry->endpoint() != _from)
738 {
739 LOG(m_logger) << "ENRResponse packet from unexpected endpoint " << _from << " instead of "
740 << sourceNodeEntry->endpoint();
741 return {};
742 }
743
744 auto const& in = dynamic_cast<ENRResponse const&>(_packet);
745 LOG(m_logger) << "Received ENR: " << *in.enr;
746
747 return sourceNodeEntry;
748 }
749
750
doDiscovery()751 void NodeTable::doDiscovery()
752 {
753 m_discoveryTimer->expires_after(c_bucketRefreshMs);
754 auto discoveryTimer{m_discoveryTimer};
755 m_discoveryTimer->async_wait([this, discoveryTimer](boost::system::error_code const& _ec) {
756 // We can't use m_logger if an error occurred because captured this might be already
757 // destroyed
758 if (_ec.value() == boost::asio::error::operation_aborted ||
759 discoveryTimer->expiry() == c_steadyClockMin)
760 {
761 clog(VerbosityDebug, "discov") << "Discovery timer was cancelled";
762 return;
763 }
764 else if (_ec)
765 {
766 clog(VerbosityDebug, "discov")
767 << "Discovery timer error detected: " << _ec.value() << " " << _ec.message();
768 return;
769 }
770
771 NodeID randNodeId;
772 crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(0, h256::size));
773 crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(h256::size, h256::size));
774 LOG(m_logger) << "Starting discovery algorithm run for random node id: " << randNodeId;
775 doDiscoveryRound(randNodeId, 0 /* round */, make_shared<set<shared_ptr<NodeEntry>>>());
776 });
777 }
778
doHandleTimeouts()779 void NodeTable::doHandleTimeouts()
780 {
781 runBackgroundTask(c_handleTimeoutsIntervalMs, m_timeoutsTimer, [this]() {
782 vector<shared_ptr<NodeEntry>> nodesToActivate;
783 for (auto it = m_sentPings.begin(); it != m_sentPings.end();)
784 {
785 if (chrono::steady_clock::now() > it->second.pingSentTime + m_requestTimeToLive)
786 {
787 if (auto node = nodeEntry(it->second.nodeID))
788 {
789 dropNode(move(node));
790
791 // save the replacement node that should be activated
792 if (it->second.replacementNodeEntry)
793 nodesToActivate.emplace_back(move(it->second.replacementNodeEntry));
794 }
795
796 it = m_sentPings.erase(it);
797 }
798 else
799 ++it;
800 }
801
802 // activate replacement nodes and put them into buckets
803 for (auto const& n : nodesToActivate)
804 noteActiveNode(n);
805 });
806 }
807
doEndpointTracking()808 void NodeTable::doEndpointTracking()
809 {
810 runBackgroundTask(c_removeOldEndpointStatementsIntervalMs, m_endpointTrackingTimer,
811 [this]() { m_endpointTracker.garbageCollectStatements(c_endpointStatementTimeToLiveMin); });
812 }
813
runBackgroundTask(std::chrono::milliseconds const & _period,std::shared_ptr<ba::steady_timer> _timer,std::function<void ()> _f)814 void NodeTable::runBackgroundTask(std::chrono::milliseconds const& _period,
815 std::shared_ptr<ba::steady_timer> _timer, std::function<void()> _f)
816 {
817 _timer->expires_after(_period);
818 _timer->async_wait([=](boost::system::error_code const& _ec) {
819 // We can't use m_logger if an error occurred because captured this might be already
820 // destroyed
821 if (_ec.value() == boost::asio::error::operation_aborted ||
822 _timer->expiry() == c_steadyClockMin)
823 {
824 clog(VerbosityDebug, "discov") << "Timer was cancelled";
825 return;
826 }
827 else if (_ec)
828 {
829 clog(VerbosityDebug, "discov")
830 << "Timer error detected: " << _ec.value() << " " << _ec.message();
831 return;
832 }
833
834 _f();
835
836 runBackgroundTask(_period, move(_timer), move(_f));
837 });
838 }
839
cancelTimer(std::shared_ptr<ba::steady_timer> _timer)840 void NodeTable::cancelTimer(std::shared_ptr<ba::steady_timer> _timer)
841 {
842 // We "cancel" the timers by setting c_steadyClockMin rather than calling cancel()
843 // because cancel won't set the boost error code if the timers have already expired and
844 // the handlers are in the ready queue.
845 //
846 // Note that we "cancel" via io_context::post to ensure thread safety when accessing the
847 // timers
848 post(m_io, [_timer] { _timer->expires_at(c_steadyClockMin); });
849 }
850
interpretUDP(bi::udp::endpoint const & _from,bytesConstRef _packet)851 unique_ptr<DiscoveryDatagram> DiscoveryDatagram::interpretUDP(bi::udp::endpoint const& _from, bytesConstRef _packet)
852 {
853 unique_ptr<DiscoveryDatagram> decoded;
854 // h256 + Signature + type + RLP (smallest possible packet is empty neighbours packet which is 3 bytes)
855 if (_packet.size() < h256::size + Signature::size + 1 + 3)
856 {
857 LOG(g_discoveryWarnLogger::get()) << "Invalid packet (too small) from "
858 << _from.address().to_string() << ":" << _from.port();
859 return decoded;
860 }
861 bytesConstRef hashedBytes(_packet.cropped(h256::size, _packet.size() - h256::size));
862 bytesConstRef signedBytes(hashedBytes.cropped(Signature::size, hashedBytes.size() - Signature::size));
863 bytesConstRef signatureBytes(_packet.cropped(h256::size, Signature::size));
864 bytesConstRef bodyBytes(_packet.cropped(h256::size + Signature::size + 1));
865
866 h256 echo(sha3(hashedBytes));
867 if (!_packet.cropped(0, h256::size).contentsEqual(echo.asBytes()))
868 {
869 LOG(g_discoveryWarnLogger::get()) << "Invalid packet (bad hash) from "
870 << _from.address().to_string() << ":" << _from.port();
871 return decoded;
872 }
873 Public sourceid(dev::recover(*(Signature const*)signatureBytes.data(), sha3(signedBytes)));
874 if (!sourceid)
875 {
876 LOG(g_discoveryWarnLogger::get()) << "Invalid packet (bad signature) from "
877 << _from.address().to_string() << ":" << _from.port();
878 return decoded;
879 }
880 switch (signedBytes[0])
881 {
882 case PingNode::type:
883 decoded.reset(new PingNode(_from, sourceid, echo));
884 break;
885 case Pong::type:
886 decoded.reset(new Pong(_from, sourceid, echo));
887 break;
888 case FindNode::type:
889 decoded.reset(new FindNode(_from, sourceid, echo));
890 break;
891 case Neighbours::type:
892 decoded.reset(new Neighbours(_from, sourceid, echo));
893 break;
894 case ENRRequest::type:
895 decoded.reset(new ENRRequest(_from, sourceid, echo));
896 break;
897 case ENRResponse::type:
898 decoded.reset(new ENRResponse(_from, sourceid, echo));
899 break;
900 default:
901 LOG(g_discoveryWarnLogger::get()) << "Invalid packet (unknown packet type) from "
902 << _from.address().to_string() << ":" << _from.port();
903 return decoded;
904 }
905 decoded->interpretRLP(bodyBytes);
906 return decoded;
907 }
908
909 } // namespace p2p
910 } // namespace dev
911