1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2014-2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4 
5 #include "NodeTable.h"
6 using namespace std;
7 
8 namespace dev
9 {
10 namespace p2p
11 {
12 namespace
13 {
14 // global thread-safe logger for static methods
15 BOOST_LOG_INLINE_GLOBAL_LOGGER_CTOR_ARGS(g_discoveryWarnLogger,
16     boost::log::sources::severity_channel_logger_mt<>,
17     (boost::log::keywords::severity = 0)(boost::log::keywords::channel = "discov"))
18 
19 // Cadence at which we timeout sent pings and evict unresponsive nodes
20 constexpr chrono::milliseconds c_handleTimeoutsIntervalMs{5000};
21 // Cadence at which we remove old records from EndpointTracker
22 constexpr chrono::milliseconds c_removeOldEndpointStatementsIntervalMs{5000};
23 // Change external endpoint after this number of peers report new one
24 constexpr size_t c_minEndpointTrackStatements{10};
25 // Interval during which each endpoint statement is kept
26 constexpr std::chrono::minutes c_endpointStatementTimeToLiveMin{5};
27 
28 }  // namespace
29 
30 constexpr chrono::seconds DiscoveryDatagram::c_timeToLiveS;
31 constexpr chrono::milliseconds NodeTable::c_reqTimeoutMs;
32 constexpr chrono::milliseconds NodeTable::c_bucketRefreshMs;
33 constexpr chrono::milliseconds NodeTable::c_discoveryRoundIntervalMs;
34 
operator ==(weak_ptr<NodeEntry> const & _weak,shared_ptr<NodeEntry> const & _shared)35 inline bool operator==(weak_ptr<NodeEntry> const& _weak, shared_ptr<NodeEntry> const& _shared)
36 {
37     return !_weak.owner_before(_shared) && !_shared.owner_before(_weak);
38 }
39 
NodeTable(ba::io_context & _io,KeyPair const & _alias,NodeIPEndpoint const & _endpoint,ENR const & _enr,bool _enabled,bool _allowLocalDiscovery)40 NodeTable::NodeTable(ba::io_context& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint,
41     ENR const& _enr, bool _enabled, bool _allowLocalDiscovery)
42   : m_hostNodeID{_alias.pub()},
43     m_hostNodeIDHash{sha3(m_hostNodeID)},
44     m_hostStaticIP{isAllowedEndpoint(_endpoint) ? _endpoint.address() : bi::address{}},
45     m_hostNodeEndpoint{_enr.ip(), _enr.udpPort(), _enr.tcpPort()},
46     m_hostENR{_enr},
47     m_secret{_alias.secret()},
48     m_socket{make_shared<NodeSocket>(
49         _io, static_cast<UDPSocketEvents&>(*this), (bi::udp::endpoint)_endpoint)},
50     m_requestTimeToLive{DiscoveryDatagram::c_timeToLiveS},
51     m_allowLocalDiscovery{_allowLocalDiscovery},
52     m_discoveryTimer{make_shared<ba::steady_timer>(_io)},
53     m_timeoutsTimer{make_shared<ba::steady_timer>(_io)},
54     m_endpointTrackingTimer{make_shared<ba::steady_timer>(_io)},
55     m_io{_io}
56 {
57     for (unsigned i = 0; i < s_bins; i++)
58         m_buckets[i].distance = i;
59 
60     if (!_enabled)
61     {
62         cwarn << "\"_enabled\" parameter is false, discovery is disabled";
63         return;
64     }
65 
66     try
67     {
68         m_socket->connect();
69         doDiscovery();
70         doHandleTimeouts();
71         doEndpointTracking();
72     }
73     catch (exception const& _e)
74     {
75         cwarn << "Exception connecting NodeTable socket: " << _e.what();
76         cwarn << "Discovery disabled.";
77     }
78 }
79 
processEvents()80 void NodeTable::processEvents()
81 {
82     if (m_nodeEventHandler)
83         m_nodeEventHandler->processEvents();
84 }
85 
addNode(Node const & _node)86 bool NodeTable::addNode(Node const& _node)
87 {
88     LOG(m_logger) << "Adding node " << _node;
89 
90     if (!isValidNode(_node))
91         return false;
92 
93     bool needToPing = false;
94     DEV_GUARDED(x_nodes)
95     {
96         auto const it = m_allNodes.find(_node.id);
97         needToPing = (it == m_allNodes.end() || it->second->endpoint() != _node.endpoint ||
98                       !it->second->hasValidEndpointProof());
99     }
100 
101     if (needToPing)
102     {
103         LOG(m_logger) << "Pending " << _node;
104         schedulePing(_node);
105     }
106 
107     return true;
108 }
109 
addKnownNode(Node const & _node,uint32_t _lastPongReceivedTime,uint32_t _lastPongSentTime)110 bool NodeTable::addKnownNode(
111     Node const& _node, uint32_t _lastPongReceivedTime, uint32_t _lastPongSentTime)
112 {
113     LOG(m_logger) << "Adding known node " << _node;
114 
115     if (!isValidNode(_node))
116         return false;
117 
118     if (nodeEntry(_node.id))
119     {
120         LOG(m_logger) << "Node " << _node << " is already in the node table";
121         return true;
122     }
123 
124     auto entry = make_shared<NodeEntry>(
125         m_hostNodeIDHash, _node.id, _node.endpoint, _lastPongReceivedTime, _lastPongSentTime);
126 
127     if (entry->hasValidEndpointProof())
128     {
129         LOG(m_logger) << "Known " << _node;
130         noteActiveNode(move(entry));
131     }
132     else
133     {
134         LOG(m_logger) << "Pending " << _node;
135         schedulePing(_node);
136     }
137 
138     return true;
139 }
140 
isValidNode(Node const & _node) const141 bool NodeTable::isValidNode(Node const& _node) const
142 {
143     if (!_node.endpoint || !_node.id)
144     {
145         LOG(m_logger) << "Supplied node " << _node
146                       << " has an invalid endpoint or id. Skipping adding node to node table.";
147         return false;
148     }
149 
150     if (!isAllowedEndpoint(_node.endpoint))
151     {
152         LOG(m_logger) << "Supplied node" << _node
153                       << " doesn't have an allowed endpoint. Skipping adding node to node table";
154         return false;
155     }
156 
157     if (m_hostNodeID == _node.id)
158     {
159         LOG(m_logger) << "Skip adding self to node table (" << _node.id << ")";
160         return false;
161     }
162 
163     return true;
164 }
165 
nodes() const166 list<NodeID> NodeTable::nodes() const
167 {
168     list<NodeID> nodes;
169     DEV_GUARDED(x_nodes)
170     {
171         for (auto& i : m_allNodes)
172             nodes.push_back(i.second->id());
173     }
174     return nodes;
175 }
176 
snapshot() const177 list<NodeEntry> NodeTable::snapshot() const
178 {
179     list<NodeEntry> ret;
180     DEV_GUARDED(x_state)
181     {
182         for (auto const& s : m_buckets)
183             for (auto const& np : s.nodes)
184                 if (auto n = np.lock())
185                     ret.push_back(*n);
186     }
187     return ret;
188 }
189 
node(NodeID const & _id)190 Node NodeTable::node(NodeID const& _id)
191 {
192     Guard l(x_nodes);
193     auto const it = m_allNodes.find(_id);
194     if (it != m_allNodes.end())
195     {
196         auto const& entry = it->second;
197         return Node(_id, entry->endpoint(), entry->peerType());
198     }
199     return UnspecifiedNode;
200 }
201 
nodeEntry(NodeID const & _id)202 shared_ptr<NodeEntry> NodeTable::nodeEntry(NodeID const& _id)
203 {
204     Guard l(x_nodes);
205     auto const it = m_allNodes.find(_id);
206     return it != m_allNodes.end() ? it->second : shared_ptr<NodeEntry>();
207 }
208 
doDiscoveryRound(NodeID _node,unsigned _round,shared_ptr<set<shared_ptr<NodeEntry>>> _tried)209 void NodeTable::doDiscoveryRound(
210     NodeID _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
211 {
212     // NOTE: ONLY called by doDiscovery or "recursively" via lambda scheduled via timer at
213     // the end of this function
214     if (!m_socket->isOpen())
215         return;
216 
217     // send s_alpha FindNode packets to nodes we know, closest to target
218     auto const nearestNodes = nearestNodeEntries(_node);
219     auto newTriedCount = 0;
220     for (auto const& nodeEntry : nearestNodes)
221     {
222         if (!contains(*_tried, nodeEntry))
223         {
224             // Avoid sending FindNode, if we have not sent a valid PONG lately.
225             // This prevents being considered invalid node and FindNode being ignored.
226             if (!nodeEntry->hasValidEndpointProof())
227             {
228                 LOG(m_logger) << "Node " << nodeEntry->node << " endpoint proof expired.";
229                 ping(nodeEntry->node);
230                 continue;
231             }
232 
233             FindNode p(nodeEntry->endpoint(), _node);
234             p.expiration = nextRequestExpirationTime();
235             p.sign(m_secret);
236             m_sentFindNodes.emplace_back(nodeEntry->id(), chrono::steady_clock::now());
237             LOG(m_logger) << p.typeName() << " to " << nodeEntry->node << " (target: " << _node
238                           << ")";
239             m_socket->send(p);
240 
241             _tried->emplace(nodeEntry);
242             if (++newTriedCount == s_alpha)
243                 break;
244         }
245     }
246 
247     if (_round == s_maxSteps || newTriedCount == 0)
248     {
249         LOG(m_logger) << "Terminating discover after " << _round << " rounds.";
250         doDiscovery();
251         return;
252     }
253 
254     m_discoveryTimer->expires_after(c_discoveryRoundIntervalMs);
255     auto discoveryTimer{m_discoveryTimer};
256     m_discoveryTimer->async_wait(
257         [this, discoveryTimer, _node, _round, _tried](boost::system::error_code const& _ec) {
258             // We can't use m_logger here if there's an error because captured this might already be
259             // destroyed
260             if (_ec.value() == boost::asio::error::operation_aborted ||
261                 discoveryTimer->expiry() == c_steadyClockMin)
262             {
263                 clog(VerbosityDebug, "discov") << "Discovery timer was probably cancelled";
264                 return;
265             }
266             else if (_ec)
267             {
268                 clog(VerbosityDebug, "discov")
269                     << "Discovery timer error detected: " << _ec.value() << " " << _ec.message();
270                 return;
271             }
272 
273             doDiscoveryRound(_node, _round + 1, _tried);
274         });
275 }
276 
nearestNodeEntries(NodeID const & _target)277 vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeID const& _target)
278 {
279     auto const distanceToTargetLess = [](pair<int, shared_ptr<NodeEntry>> const& _node1,
280                                           pair<int, shared_ptr<NodeEntry>> const& _node2) {
281         return _node1.first < _node2.first;
282     };
283 
284     h256 const targetHash = sha3(_target);
285 
286     std::multiset<pair<int, shared_ptr<NodeEntry>>, decltype(distanceToTargetLess)>
287         nodesByDistanceToTarget(distanceToTargetLess);
288     for (auto const& bucket : m_buckets)
289         for (auto const& nodeWeakPtr : bucket.nodes)
290             if (auto node = nodeWeakPtr.lock())
291             {
292                 nodesByDistanceToTarget.emplace(distance(targetHash, node->nodeIDHash), node);
293 
294                 if (nodesByDistanceToTarget.size() > s_bucketSize)
295                     nodesByDistanceToTarget.erase(--nodesByDistanceToTarget.end());
296             }
297 
298     vector<shared_ptr<NodeEntry>> ret;
299     for (auto& distanceAndNode : nodesByDistanceToTarget)
300         ret.emplace_back(move(distanceAndNode.second));
301 
302     return ret;
303 }
304 
ping(Node const & _node,shared_ptr<NodeEntry> _replacementNodeEntry)305 void NodeTable::ping(Node const& _node, shared_ptr<NodeEntry> _replacementNodeEntry)
306 {
307     if (!m_socket->isOpen())
308         return;
309 
310     // Don't send Ping if one is already sent
311     if (m_sentPings.find(_node.endpoint) != m_sentPings.end())
312     {
313         LOG(m_logger) << "Ignoring request to ping " << _node << ", because it's already pinged";
314         return;
315     }
316 
317     PingNode p{m_hostNodeEndpoint, _node.endpoint};
318     p.expiration = nextRequestExpirationTime();
319     p.seq = m_hostENR.sequenceNumber();
320     auto const pingHash = p.sign(m_secret);
321     LOG(m_logger) << p.typeName() << " to " << _node;
322     m_socket->send(p);
323 
324     NodeValidation const validation{_node.id, _node.endpoint.tcpPort(), chrono::steady_clock::now(),
325         pingHash, _replacementNodeEntry};
326     m_sentPings.insert({_node.endpoint, validation});
327 }
328 
schedulePing(Node const & _node)329 void NodeTable::schedulePing(Node const& _node)
330 {
331     post(m_io, [this, _node] { ping(_node, {}); });
332 }
333 
evict(NodeEntry const & _leastSeen,shared_ptr<NodeEntry> _replacement)334 void NodeTable::evict(NodeEntry const& _leastSeen, shared_ptr<NodeEntry> _replacement)
335 {
336     if (!m_socket->isOpen())
337         return;
338 
339     LOG(m_logger) << "Evicting node " << _leastSeen.node;
340     ping(_leastSeen.node, move(_replacement));
341 
342     if (m_nodeEventHandler)
343         m_nodeEventHandler->appendEvent(_leastSeen.id(), NodeEntryScheduledForEviction);
344 }
345 
noteActiveNode(shared_ptr<NodeEntry> _nodeEntry)346 void NodeTable::noteActiveNode(shared_ptr<NodeEntry> _nodeEntry)
347 {
348     assert(_nodeEntry);
349 
350     if (_nodeEntry->id() == m_hostNodeID)
351     {
352         LOG(m_logger) << "Skipping making self active.";
353         return;
354     }
355     if (!isAllowedEndpoint(_nodeEntry->endpoint()))
356     {
357         LOG(m_logger) << "Skipping making node with unallowed endpoint active. Node "
358                       << _nodeEntry->node;
359         return;
360     }
361 
362     if (!_nodeEntry->hasValidEndpointProof())
363         return;
364 
365     LOG(m_logger) << "Active node " << _nodeEntry->node;
366 
367     shared_ptr<NodeEntry> nodeToEvict;
368     {
369         Guard l(x_state);
370         // Find a bucket to put a node to
371         NodeBucket& s = bucket_UNSAFE(_nodeEntry.get());
372         auto& nodes = s.nodes;
373 
374         // check if the node is already in the bucket
375         auto it = find(nodes.begin(), nodes.end(), _nodeEntry);
376         if (it != nodes.end())
377         {
378             // if it was in the bucket, move it to the last position
379             nodes.splice(nodes.end(), nodes, it);
380         }
381         else
382         {
383             if (nodes.size() < s_bucketSize)
384             {
385                 // if it was not there, just add it as a most recently seen node
386                 // (i.e. to the end of the list)
387                 nodes.push_back(_nodeEntry);
388                 DEV_GUARDED(x_nodes) { m_allNodes.insert({_nodeEntry->id(), _nodeEntry}); }
389                 if (m_nodeEventHandler)
390                     m_nodeEventHandler->appendEvent(_nodeEntry->id(), NodeEntryAdded);
391             }
392             else
393             {
394                 // if bucket is full, start eviction process for the least recently seen node
395                 nodeToEvict = nodes.front().lock();
396                 // It could have been replaced in addNode(), then weak_ptr is expired.
397                 // If so, just add a new one instead of expired
398                 if (!nodeToEvict)
399                 {
400                     nodes.pop_front();
401                     nodes.push_back(_nodeEntry);
402                     DEV_GUARDED(x_nodes) { m_allNodes.insert({_nodeEntry->id(), _nodeEntry}); }
403                     if (m_nodeEventHandler)
404                         m_nodeEventHandler->appendEvent(_nodeEntry->id(), NodeEntryAdded);
405                 }
406             }
407         }
408     }
409 
410     if (nodeToEvict)
411         evict(*nodeToEvict, _nodeEntry);
412 }
413 
dropNode(shared_ptr<NodeEntry> _n)414 void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
415 {
416     // remove from nodetable
417     {
418         Guard l(x_state);
419         NodeBucket& s = bucket_UNSAFE(_n.get());
420         s.nodes.remove_if(
421             [_n](weak_ptr<NodeEntry> const& _bucketEntry) { return _bucketEntry == _n; });
422     }
423 
424     DEV_GUARDED(x_nodes) { m_allNodes.erase(_n->id()); }
425 
426     // notify host
427     LOG(m_logger) << "p2p.nodes.drop " << _n->id();
428     if (m_nodeEventHandler)
429         m_nodeEventHandler->appendEvent(_n->id(), NodeEntryDropped);
430 }
431 
bucket_UNSAFE(NodeEntry const * _n)432 NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n)
433 {
434     return m_buckets[_n->distance - 1];
435 }
436 
onPacketReceived(UDPSocketFace *,bi::udp::endpoint const & _from,bytesConstRef _packet)437 void NodeTable::onPacketReceived(
438     UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet)
439 {
440     try {
441         unique_ptr<DiscoveryDatagram> packet = DiscoveryDatagram::interpretUDP(_from, _packet);
442         if (!packet)
443             return;
444         if (packet->isExpired())
445         {
446             LOG(m_logger) << "Expired " << packet->typeName() << " from " << packet->sourceid << "@"
447                           << _from;
448             return;
449         }
450 
451         LOG(m_logger) << packet->typeName() << " from " << packet->sourceid << "@" << _from;
452 
453         shared_ptr<NodeEntry> sourceNodeEntry;
454         switch (packet->packetType())
455         {
456         case Pong::type:
457             sourceNodeEntry = handlePong(_from, *packet);
458             break;
459 
460         case Neighbours::type:
461             sourceNodeEntry = handleNeighbours(_from, *packet);
462             break;
463 
464         case FindNode::type:
465             sourceNodeEntry = handleFindNode(_from, *packet);
466             break;
467 
468         case PingNode::type:
469             sourceNodeEntry = handlePingNode(_from, *packet);
470             break;
471 
472         case ENRRequest::type:
473             sourceNodeEntry = handleENRRequest(_from, *packet);
474             break;
475 
476         case ENRResponse::type:
477             sourceNodeEntry = handleENRResponse(_from, *packet);
478             break;
479         }
480 
481         if (sourceNodeEntry)
482             noteActiveNode(move(sourceNodeEntry));
483     }
484     catch (exception const& _e)
485     {
486         LOG(m_logger) << "Exception processing message from " << _from.address().to_string() << ":"
487                       << _from.port() << ": " << _e.what();
488     }
489     catch (...)
490     {
491         LOG(m_logger) << "Exception processing message from " << _from.address().to_string() << ":"
492                       << _from.port();
493     }
494 }
495 
496 
handlePong(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)497 shared_ptr<NodeEntry> NodeTable::handlePong(
498     bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
499 {
500     // validate pong
501     auto const sentPing = m_sentPings.find(_from);
502     if (sentPing == m_sentPings.end())
503     {
504         LOG(m_logger) << "Unexpected PONG from " << _from.address().to_string() << ":"
505                       << _from.port();
506         return {};
507     }
508 
509     auto const& pong = dynamic_cast<Pong const&>(_packet);
510     auto const& nodeValidation = sentPing->second;
511     if (pong.echo != nodeValidation.pingHash)
512     {
513         LOG(m_logger) << "Invalid PONG from " << _from.address().to_string() << ":" << _from.port();
514         return {};
515     }
516 
517     // in case the node answers with new NodeID, drop the record with the old NodeID
518     auto const& sourceId = pong.sourceid;
519     if (sourceId != nodeValidation.nodeID)
520     {
521         LOG(m_logger) << "Node " << _from << " changed public key from " << nodeValidation.nodeID
522                       << " to " << sourceId;
523         if (auto node = nodeEntry(nodeValidation.nodeID))
524             dropNode(move(node));
525     }
526 
527     // create or update nodeEntry with new Pong received time
528     shared_ptr<NodeEntry> sourceNodeEntry;
529     DEV_GUARDED(x_nodes)
530     {
531         auto it = m_allNodes.find(sourceId);
532         if (it == m_allNodes.end())
533             sourceNodeEntry = make_shared<NodeEntry>(m_hostNodeIDHash, sourceId,
534                 NodeIPEndpoint{_from.address(), _from.port(), nodeValidation.tcpPort},
535                 RLPXDatagramFace::secondsSinceEpoch(), 0 /* lastPongSentTime */);
536         else
537         {
538             sourceNodeEntry = it->second;
539             sourceNodeEntry->lastPongReceivedTime = RLPXDatagramFace::secondsSinceEpoch();
540 
541             if (sourceNodeEntry->endpoint() != _from)
542                 sourceNodeEntry->node.endpoint =
543                     NodeIPEndpoint{_from.address(), _from.port(), nodeValidation.tcpPort};
544         }
545     }
546 
547     m_sentPings.erase(_from);
548 
549     // update our external endpoint address and UDP port
550     if (m_endpointTracker.addEndpointStatement(_from, pong.destination) >=
551         c_minEndpointTrackStatements)
552     {
553         auto newUdpEndpoint = m_endpointTracker.bestEndpoint();
554         if (!m_hostStaticIP.is_unspecified())
555             newUdpEndpoint.address(m_hostStaticIP);
556 
557         if (newUdpEndpoint != m_hostNodeEndpoint)
558         {
559             m_hostNodeEndpoint = NodeIPEndpoint{
560                 newUdpEndpoint.address(), newUdpEndpoint.port(), m_hostNodeEndpoint.tcpPort()};
561             {
562                 Guard l(m_hostENRMutex);
563                 m_hostENR =
564                     IdentitySchemeV4::updateENR(m_hostENR, m_secret, m_hostNodeEndpoint.address(),
565                         m_hostNodeEndpoint.tcpPort(), m_hostNodeEndpoint.udpPort());
566             }
567             clog(VerbosityInfo, "net") << "ENR updated: " << m_hostENR;
568         }
569     }
570 
571     return sourceNodeEntry;
572 }
573 
handleNeighbours(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)574 shared_ptr<NodeEntry> NodeTable::handleNeighbours(
575     bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
576 {
577     shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
578     if (!sourceNodeEntry)
579     {
580         LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
581                       << ") not found in node table. Ignoring Neighbours packet.";
582         return {};
583     }
584     if (sourceNodeEntry->endpoint() != _from)
585     {
586         LOG(m_logger) << "Neighbours packet from unexpected endpoint " << _from << " instead of "
587                       << sourceNodeEntry->endpoint();
588         return {};
589     }
590 
591     auto const& in = dynamic_cast<Neighbours const&>(_packet);
592 
593     bool expected = false;
594     auto now = chrono::steady_clock::now();
595     m_sentFindNodes.remove_if([&](NodeIdTimePoint const& _t) noexcept {
596         if (_t.first != in.sourceid)
597             return false;
598         if (now - _t.second < c_reqTimeoutMs)
599             expected = true;
600         return true;
601     });
602     if (!expected)
603     {
604         LOG(m_logger) << "Dropping unsolicited neighbours packet from " << _packet.sourceid << "@"
605                       << _from.address();
606         return sourceNodeEntry;
607     }
608 
609     for (auto const& n : in.neighbours)
610         addNode(Node(n.node, n.endpoint));
611 
612     return sourceNodeEntry;
613 }
614 
handleFindNode(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)615 std::shared_ptr<NodeEntry> NodeTable::handleFindNode(
616     bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
617 {
618     std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
619     if (!sourceNodeEntry)
620     {
621         LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
622                       << ") not found in node table. Ignoring FindNode request.";
623         return {};
624     }
625     if (sourceNodeEntry->endpoint() != _from)
626     {
627         LOG(m_logger) << "FindNode packet from unexpected endpoint " << _from << " instead of "
628                       << sourceNodeEntry->endpoint();
629         return {};
630     }
631     if (!sourceNodeEntry->lastPongReceivedTime)
632     {
633         LOG(m_logger) << "Unexpected FindNode packet! Endpoint proof hasn't been performed yet.";
634         return {};
635     }
636     if (!sourceNodeEntry->hasValidEndpointProof())
637     {
638         LOG(m_logger) << "Unexpected FindNode packet! Endpoint proof has expired.";
639         return {};
640     }
641 
642     auto const& in = dynamic_cast<FindNode const&>(_packet);
643     vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
644     static unsigned constexpr nlimit = (NodeSocket::maxDatagramSize - 109) / 90;
645     for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
646     {
647         Neighbours out(_from, nearest, offset, nlimit);
648         out.expiration = nextRequestExpirationTime();
649         LOG(m_logger) << out.typeName() << " to " << in.sourceid << "@" << _from;
650         out.sign(m_secret);
651         if (out.data.size() > 1280)
652             cnetlog << "Sending truncated datagram, size: " << out.data.size();
653         m_socket->send(out);
654     }
655 
656     return sourceNodeEntry;
657 }
658 
handlePingNode(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)659 std::shared_ptr<NodeEntry> NodeTable::handlePingNode(
660     bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
661 {
662     auto const& in = dynamic_cast<PingNode const&>(_packet);
663 
664     NodeIPEndpoint sourceEndpoint{_from.address(), _from.port(), in.source.tcpPort()};
665     if (!addNode({in.sourceid, sourceEndpoint}))
666         return {};  // Need to have valid endpoint proof before adding node to node table.
667 
668     // Send PONG response.
669     Pong p(sourceEndpoint);
670     LOG(m_logger) << p.typeName() << " to " << in.sourceid << "@" << _from;
671     p.expiration = nextRequestExpirationTime();
672     p.echo = in.echo;
673     p.seq = m_hostENR.sequenceNumber();
674     p.sign(m_secret);
675     m_socket->send(p);
676 
677     // Quirk: when the node is a replacement node (that is, not added to the node table
678     // yet, but can be added after another node's eviction), it will not be returned
679     // from nodeEntry() and we won't update its lastPongSentTime. But that shouldn't be
680     // a big problem, at worst it can lead to more Ping-Pongs than needed.
681     std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
682     if (sourceNodeEntry)
683         sourceNodeEntry->lastPongSentTime = RLPXDatagramFace::secondsSinceEpoch();
684 
685     return sourceNodeEntry;
686 }
687 
handleENRRequest(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)688 std::shared_ptr<NodeEntry> NodeTable::handleENRRequest(
689     bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
690 {
691     std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
692     if (!sourceNodeEntry)
693     {
694         LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
695                       << ") not found in node table. Ignoring ENRRequest request.";
696         return {};
697     }
698     if (sourceNodeEntry->endpoint() != _from)
699     {
700         LOG(m_logger) << "ENRRequest packet from unexpected endpoint " << _from << " instead of "
701                       << sourceNodeEntry->endpoint();
702         return {};
703     }
704     if (!sourceNodeEntry->lastPongReceivedTime)
705     {
706         LOG(m_logger) << "Unexpected ENRRequest packet! Endpoint proof hasn't been performed yet.";
707         return {};
708     }
709     if (!sourceNodeEntry->hasValidEndpointProof())
710     {
711         LOG(m_logger) << "Unexpected ENRRequest packet! Endpoint proof has expired.";
712         return {};
713     }
714 
715     auto const& in = dynamic_cast<ENRRequest const&>(_packet);
716 
717     ENRResponse response{_from, m_hostENR};
718     LOG(m_logger) << response.typeName() << " to " << in.sourceid << "@" << _from;
719     response.expiration = nextRequestExpirationTime();
720     response.echo = in.echo;
721     response.sign(m_secret);
722     m_socket->send(response);
723 
724     return sourceNodeEntry;
725 }
726 
handleENRResponse(bi::udp::endpoint const & _from,DiscoveryDatagram const & _packet)727 std::shared_ptr<NodeEntry> NodeTable::handleENRResponse(
728     bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
729 {
730     std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
731     if (!sourceNodeEntry)
732     {
733         LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
734                       << ") not found in node table. Ignoring ENRResponse packet.";
735         return {};
736     }
737     if (sourceNodeEntry->endpoint() != _from)
738     {
739         LOG(m_logger) << "ENRResponse packet from unexpected endpoint " << _from << " instead of "
740                       << sourceNodeEntry->endpoint();
741         return {};
742     }
743 
744     auto const& in = dynamic_cast<ENRResponse const&>(_packet);
745     LOG(m_logger) << "Received ENR: " << *in.enr;
746 
747     return sourceNodeEntry;
748 }
749 
750 
doDiscovery()751 void NodeTable::doDiscovery()
752 {
753     m_discoveryTimer->expires_after(c_bucketRefreshMs);
754     auto discoveryTimer{m_discoveryTimer};
755     m_discoveryTimer->async_wait([this, discoveryTimer](boost::system::error_code const& _ec) {
756         // We can't use m_logger if an error occurred because captured this might be already
757         // destroyed
758         if (_ec.value() == boost::asio::error::operation_aborted ||
759             discoveryTimer->expiry() == c_steadyClockMin)
760         {
761             clog(VerbosityDebug, "discov") << "Discovery timer was cancelled";
762             return;
763         }
764         else if (_ec)
765         {
766             clog(VerbosityDebug, "discov")
767                 << "Discovery timer error detected: " << _ec.value() << " " << _ec.message();
768             return;
769         }
770 
771         NodeID randNodeId;
772         crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(0, h256::size));
773         crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(h256::size, h256::size));
774         LOG(m_logger) << "Starting discovery algorithm run for random node id: " << randNodeId;
775         doDiscoveryRound(randNodeId, 0 /* round */, make_shared<set<shared_ptr<NodeEntry>>>());
776     });
777 }
778 
doHandleTimeouts()779 void NodeTable::doHandleTimeouts()
780 {
781     runBackgroundTask(c_handleTimeoutsIntervalMs, m_timeoutsTimer, [this]() {
782         vector<shared_ptr<NodeEntry>> nodesToActivate;
783         for (auto it = m_sentPings.begin(); it != m_sentPings.end();)
784         {
785             if (chrono::steady_clock::now() > it->second.pingSentTime + m_requestTimeToLive)
786             {
787                 if (auto node = nodeEntry(it->second.nodeID))
788                 {
789                     dropNode(move(node));
790 
791                     // save the replacement node that should be activated
792                     if (it->second.replacementNodeEntry)
793                         nodesToActivate.emplace_back(move(it->second.replacementNodeEntry));
794                 }
795 
796                 it = m_sentPings.erase(it);
797             }
798             else
799                 ++it;
800         }
801 
802         // activate replacement nodes and put them into buckets
803         for (auto const& n : nodesToActivate)
804             noteActiveNode(n);
805     });
806 }
807 
doEndpointTracking()808 void NodeTable::doEndpointTracking()
809 {
810     runBackgroundTask(c_removeOldEndpointStatementsIntervalMs, m_endpointTrackingTimer,
811         [this]() { m_endpointTracker.garbageCollectStatements(c_endpointStatementTimeToLiveMin); });
812 }
813 
runBackgroundTask(std::chrono::milliseconds const & _period,std::shared_ptr<ba::steady_timer> _timer,std::function<void ()> _f)814 void NodeTable::runBackgroundTask(std::chrono::milliseconds const& _period,
815     std::shared_ptr<ba::steady_timer> _timer, std::function<void()> _f)
816 {
817     _timer->expires_after(_period);
818     _timer->async_wait([=](boost::system::error_code const& _ec) {
819         // We can't use m_logger if an error occurred because captured this might be already
820         // destroyed
821         if (_ec.value() == boost::asio::error::operation_aborted ||
822             _timer->expiry() == c_steadyClockMin)
823         {
824             clog(VerbosityDebug, "discov") << "Timer was cancelled";
825             return;
826         }
827         else if (_ec)
828         {
829             clog(VerbosityDebug, "discov")
830                 << "Timer error detected: " << _ec.value() << " " << _ec.message();
831             return;
832         }
833 
834         _f();
835 
836         runBackgroundTask(_period, move(_timer), move(_f));
837     });
838 }
839 
cancelTimer(std::shared_ptr<ba::steady_timer> _timer)840 void NodeTable::cancelTimer(std::shared_ptr<ba::steady_timer> _timer)
841 {
842     // We "cancel" the timers by setting c_steadyClockMin rather than calling cancel()
843     // because cancel won't set the boost error code if the timers have already expired and
844     // the handlers are in the ready queue.
845     //
846     // Note that we "cancel" via io_context::post to ensure thread safety when accessing the
847     // timers
848     post(m_io, [_timer] { _timer->expires_at(c_steadyClockMin); });
849 }
850 
interpretUDP(bi::udp::endpoint const & _from,bytesConstRef _packet)851 unique_ptr<DiscoveryDatagram> DiscoveryDatagram::interpretUDP(bi::udp::endpoint const& _from, bytesConstRef _packet)
852 {
853     unique_ptr<DiscoveryDatagram> decoded;
854     // h256 + Signature + type + RLP (smallest possible packet is empty neighbours packet which is 3 bytes)
855     if (_packet.size() < h256::size + Signature::size + 1 + 3)
856     {
857         LOG(g_discoveryWarnLogger::get()) << "Invalid packet (too small) from "
858                                           << _from.address().to_string() << ":" << _from.port();
859         return decoded;
860     }
861     bytesConstRef hashedBytes(_packet.cropped(h256::size, _packet.size() - h256::size));
862     bytesConstRef signedBytes(hashedBytes.cropped(Signature::size, hashedBytes.size() - Signature::size));
863     bytesConstRef signatureBytes(_packet.cropped(h256::size, Signature::size));
864     bytesConstRef bodyBytes(_packet.cropped(h256::size + Signature::size + 1));
865 
866     h256 echo(sha3(hashedBytes));
867     if (!_packet.cropped(0, h256::size).contentsEqual(echo.asBytes()))
868     {
869         LOG(g_discoveryWarnLogger::get()) << "Invalid packet (bad hash) from "
870                                           << _from.address().to_string() << ":" << _from.port();
871         return decoded;
872     }
873     Public sourceid(dev::recover(*(Signature const*)signatureBytes.data(), sha3(signedBytes)));
874     if (!sourceid)
875     {
876         LOG(g_discoveryWarnLogger::get()) << "Invalid packet (bad signature) from "
877                                           << _from.address().to_string() << ":" << _from.port();
878         return decoded;
879     }
880     switch (signedBytes[0])
881     {
882     case PingNode::type:
883         decoded.reset(new PingNode(_from, sourceid, echo));
884         break;
885     case Pong::type:
886         decoded.reset(new Pong(_from, sourceid, echo));
887         break;
888     case FindNode::type:
889         decoded.reset(new FindNode(_from, sourceid, echo));
890         break;
891     case Neighbours::type:
892         decoded.reset(new Neighbours(_from, sourceid, echo));
893         break;
894     case ENRRequest::type:
895         decoded.reset(new ENRRequest(_from, sourceid, echo));
896         break;
897     case ENRResponse::type:
898         decoded.reset(new ENRResponse(_from, sourceid, echo));
899         break;
900     default:
901         LOG(g_discoveryWarnLogger::get()) << "Invalid packet (unknown packet type) from "
902                                           << _from.address().to_string() << ":" << _from.port();
903         return decoded;
904     }
905     decoded->interpretRLP(bodyBytes);
906     return decoded;
907 }
908 
909 }  // namespace p2p
910 }  // namespace dev
911