1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2014-2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4 
5 #include "Session.h"
6 
7 #include "Host.h"
8 #include "RLPXFrameCoder.h"
9 #include <libdevcore/Common.h>
10 #include <libdevcore/CommonIO.h>
11 #include <libdevcore/Exceptions.h>
12 #include <chrono>
13 
14 using namespace std;
15 using namespace dev;
16 using namespace dev::p2p;
17 
Session(Host * _h,unique_ptr<RLPXFrameCoder> && _io,std::shared_ptr<RLPXSocket> const & _s,std::shared_ptr<Peer> const & _n,PeerSessionInfo _info)18 Session::Session(Host* _h, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s,
19     std::shared_ptr<Peer> const& _n, PeerSessionInfo _info)
20   : m_server(_h),
21     m_io(move(_io)),
22     m_socket(_s),
23     m_peer(_n),
24     m_info(_info),
25     m_ping(chrono::steady_clock::time_point::max())
26 {
27     stringstream remoteInfoStream;
28     remoteInfoStream << "(" << m_info.id << "@" << m_socket->remoteEndpoint() << ")";
29 
30     m_logSuffix = remoteInfoStream.str();
31 
32     auto const attr = boost::log::attributes::constant<std::string>{remoteInfoStream.str()};
33     m_netLogger.add_attribute("Suffix", attr);
34     m_netLoggerDetail.add_attribute("Suffix", attr);
35     m_netLoggerError.add_attribute("Suffix", attr);
36 
37     m_capLogger.add_attribute("Suffix", attr);
38     m_capLoggerDetail.add_attribute("Suffix", attr);
39 
40     m_peer->m_lastDisconnect = NoDisconnect;
41     m_lastReceived = m_connect = chrono::steady_clock::now();
42 }
43 
~Session()44 Session::~Session()
45 {
46     cnetlog << "Closing peer session with " << m_logSuffix;
47 
48     m_peer->m_lastConnected = m_peer->m_lastAttempted - chrono::seconds(1);
49 
50     // Read-chain finished for one reason or another.
51     for (auto& i : m_capabilities)
52     {
53         i.second->onDisconnect(id());
54         i.second.reset();
55     }
56 
57     try
58     {
59         bi::tcp::socket& socket = m_socket->ref();
60         if (socket.is_open())
61         {
62             boost::system::error_code ec;
63             socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
64             socket.close();
65         }
66     }
67     catch (...){}
68 }
69 
repMan()70 ReputationManager& Session::repMan()
71 {
72     return m_server->repMan();
73 }
74 
id() const75 NodeID Session::id() const
76 {
77     return m_peer ? m_peer->id : NodeID();
78 }
79 
addRating(int _r)80 void Session::addRating(int _r)
81 {
82     if (m_peer)
83     {
84         m_peer->m_rating += _r;
85         m_peer->m_score += _r;
86         if (_r >= 0)
87             m_peer->noteSessionGood();
88     }
89 }
90 
rating() const91 int Session::rating() const
92 {
93     return m_peer->m_rating;
94 }
95 
readPacket(uint16_t _capId,unsigned _packetType,RLP const & _r)96 bool Session::readPacket(uint16_t _capId, unsigned _packetType, RLP const& _r)
97 {
98     m_lastReceived = chrono::steady_clock::now();
99     LOG(m_netLoggerDetail) << "Received " << capabilityPacketTypeToString(_packetType) << " ("
100                            << _packetType << ") from";
101     try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
102     {
103         // v4 frame headers are useless, offset packet type used
104         // v5 protocol type is in header, packet type not offset
105         if (_capId == 0 && _packetType < UserPacket)
106             return interpretP2pPacket(static_cast<P2pPacketType>(_packetType), _r);
107 
108         for (auto const& cap : m_capabilities)
109         {
110             auto const& name = cap.first.first;
111             auto const& capability = cap.second;
112 
113             if (canHandle(name, capability->messageCount(), _packetType))
114             {
115                 if (!capabilityEnabled(name))
116                     return true;
117 
118                 auto offset = capabilityOffset(name);
119                 assert(offset);
120                 return capability->interpretCapabilityPacket(id(), _packetType - *offset, _r);
121             }
122         }
123 
124         return false;
125     }
126     catch (std::exception const& _e)
127     {
128         LOG(m_netLogger) << "Exception caught in p2p::Session::readPacket(): " << _e.what()
129                          << ". PacketType: " << capabilityPacketTypeToString(_packetType) << " ("
130                          << _packetType << "). RLP: " << _r;
131         disconnect(BadProtocol);
132         return true;
133     }
134     return true;
135 }
136 
interpretP2pPacket(P2pPacketType _t,RLP const & _r)137 bool Session::interpretP2pPacket(P2pPacketType _t, RLP const& _r)
138 {
139     LOG(m_capLoggerDetail) << p2pPacketTypeToString(_t) << " from";
140     switch (_t)
141     {
142     case DisconnectPacket:
143     {
144         string reason = "Unspecified";
145         auto r = (DisconnectReason)_r[0].toInt<int>();
146         if (!_r[0].isInt())
147             drop(BadProtocol);
148         else
149         {
150             reason = reasonOf(r);
151             LOG(m_capLogger) << "Disconnect (reason: " << reason << ") from";
152             drop(DisconnectRequested);
153         }
154         break;
155     }
156     case PingPacket:
157     {
158         LOG(m_capLoggerDetail) << "Pong to";
159         RLPStream s;
160         sealAndSend(prep(s, PongPacket));
161         break;
162     }
163     case PongPacket:
164         DEV_GUARDED(x_info)
165         {
166             m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
167             LOG(m_capLoggerDetail)
168                 << "Ping latency: "
169                 << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
170         }
171         break;
172     default:
173         return false;
174     }
175     return true;
176 }
177 
ping()178 void Session::ping()
179 {
180     clog(VerbosityTrace, "p2pcap") << "Ping to " << m_logSuffix;
181     RLPStream s;
182     sealAndSend(prep(s, PingPacket));
183     m_ping = std::chrono::steady_clock::now();
184 }
185 
prep(RLPStream & _s,P2pPacketType _id,unsigned _args)186 RLPStream& Session::prep(RLPStream& _s, P2pPacketType _id, unsigned _args)
187 {
188     return _s.append((unsigned)_id).appendList(_args);
189 }
190 
sealAndSend(RLPStream & _s)191 void Session::sealAndSend(RLPStream& _s)
192 {
193     bytes b;
194     _s.swapOut(b);
195     send(move(b));
196 }
197 
checkPacket(bytesConstRef _msg)198 bool Session::checkPacket(bytesConstRef _msg)
199 {
200     if (_msg[0] > 0x7f || _msg.size() < 2)
201         return false;
202     if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size())
203         return false;
204     return true;
205 }
206 
send(bytes && _msg)207 void Session::send(bytes&& _msg)
208 {
209     bytesConstRef msg(&_msg);
210     LOG(m_netLoggerDetail) << capabilityPacketTypeToString(_msg[0]) << " to";
211     if (!checkPacket(msg))
212         clog(VerbosityError, "net") << "Invalid packet constructed. Size: " << msg.size()
213                                     << " bytes, message: " << toHex(msg);
214 
215     if (!m_socket->ref().is_open())
216         return;
217 
218     bool doWrite = false;
219     DEV_GUARDED(x_framing)
220     {
221         m_writeQueue.push_back(std::move(_msg));
222         doWrite = (m_writeQueue.size() == 1);
223     }
224 
225     if (doWrite)
226         write();
227 }
228 
write()229 void Session::write()
230 {
231     bytes const* out = nullptr;
232     DEV_GUARDED(x_framing)
233     {
234         m_io->writeSingleFramePacket(&m_writeQueue[0], m_writeQueue[0]);
235         out = &m_writeQueue[0];
236     }
237     auto self(shared_from_this());
238     ba::async_write(m_socket->ref(), ba::buffer(*out),
239         [this, self](boost::system::error_code ec, std::size_t /*length*/) {
240             // must check queue, as write callback can occur following dropped()
241             if (ec)
242             {
243                 LOG(m_netLogger) << "Error sending: " << ec.message();
244                 drop(TCPError);
245                 return;
246             }
247 
248             DEV_GUARDED(x_framing)
249             {
250                 m_writeQueue.pop_front();
251                 if (m_writeQueue.empty())
252                     return;
253             }
254             write();
255         });
256 }
257 
258 namespace
259 {
halveAtomicInt(atomic<int> & i)260     void halveAtomicInt(atomic<int>& i)
261     {
262         // atomic<int> doesn't have /= operator, so we do it manually
263         int oldInt = 0;
264         int newInt = 0;
265         do
266         {
267             oldInt = i;
268             newInt = oldInt / 2;
269             // Current value could already change when we get to exchange,
270             // we'll need to retry in the loop in this case
271         } while (!i.atomic::compare_exchange_weak(oldInt, newInt));
272     }
273 }
274 
drop(DisconnectReason _reason)275 void Session::drop(DisconnectReason _reason)
276 {
277     if (m_dropped)
278         return;
279     bi::tcp::socket& socket = m_socket->ref();
280     if (socket.is_open())
281         try
282         {
283             boost::system::error_code ec;
284             LOG(m_netLoggerDetail) << "Closing (" << reasonOf(_reason) << ") connection with";
285             socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
286             socket.close();
287         }
288         catch (...) {}
289 
290     m_peer->m_lastDisconnect = _reason;
291     if (_reason == BadProtocol)
292     {
293         halveAtomicInt(m_peer->m_rating);
294         halveAtomicInt(m_peer->m_score);
295     }
296     m_dropped = true;
297 }
298 
disconnect(DisconnectReason _reason)299 void Session::disconnect(DisconnectReason _reason)
300 {
301     clog(VerbosityTrace, "p2pcap") << "Disconnecting (our reason: " << reasonOf(_reason) << ") from " << m_logSuffix;
302 
303     if (m_socket->ref().is_open())
304     {
305         RLPStream s;
306         prep(s, DisconnectPacket, 1) << (int)_reason;
307         sealAndSend(s);
308     }
309     drop(_reason);
310 }
311 
start()312 void Session::start()
313 {
314     ping();
315     doRead();
316 }
317 
doRead()318 void Session::doRead()
319 {
320     // ignore packets received while waiting to disconnect.
321     if (m_dropped)
322         return;
323 
324     auto self(shared_from_this());
325     m_data.resize(h256::size);
326     ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size),
327         [this, self](boost::system::error_code ec, std::size_t length) {
328             if (!checkRead(h256::size, ec, length))
329                 return;
330             else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
331             {
332                 LOG(m_netLogger) << "Header decrypt failed";
333                 drop(BadProtocol);  // todo: better error
334                 return;
335             }
336 
337             uint16_t hProtocolId;
338             uint32_t hLength;
339             uint8_t hPadding;
340             try
341             {
342                 RLPXFrameInfo header(bytesConstRef(m_data.data(), length));
343                 hProtocolId = header.protocolId;
344                 hLength = header.length;
345                 hPadding = header.padding;
346             }
347             catch (std::exception const& _e)
348             {
349                 LOG(m_netLogger) << "Exception decoding frame header RLP: " << _e.what() << " "
350                                  << bytesConstRef(m_data.data(), h128::size).cropped(3);
351                 drop(BadProtocol);
352                 return;
353             }
354 
355             /// read padded frame and mac
356             auto tlen = hLength + hPadding + h128::size;
357             m_data.resize(tlen);
358             ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen),
359                 [this, self, hLength, hProtocolId, tlen](
360                     boost::system::error_code ec, std::size_t length) {
361 
362                     if (!checkRead(tlen, ec, length))
363                         return;
364                     else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
365                     {
366                         LOG(m_netLogger) << "Frame decrypt failed";
367                         drop(BadProtocol);  // todo: better error
368                         return;
369                     }
370 
371                     bytesConstRef frame(m_data.data(), hLength);
372                     if (!checkPacket(frame))
373                     {
374                         LOG(m_netLogger) << "Received invalid message. Size: " << frame.size()
375                                          << " bytes, message: " << toHex(frame) << endl;
376                         disconnect(BadProtocol);
377                         return;
378                     }
379                     else
380                     {
381                         auto packetType = static_cast<P2pPacketType>(RLP(frame.cropped(0, 1)).toInt<unsigned>());
382                         RLP r(frame.cropped(1));
383                         bool ok = readPacket(hProtocolId, packetType, r);
384                         if (!ok)
385                             LOG(m_netLogger)
386                                 << "Couldn't interpret " << p2pPacketTypeToString(packetType)
387                                 << " (" << packetType << "). RLP: " << RLP(r);
388                     }
389                     doRead();
390                 });
391         });
392 }
393 
checkRead(std::size_t _expected,boost::system::error_code _ec,std::size_t _length)394 bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length)
395 {
396     if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof)
397     {
398         LOG(m_netLogger) << "Error reading: " << _ec.message();
399         drop(TCPError);
400         return false;
401     }
402     else if (_ec && _length < _expected)
403     {
404         LOG(m_netLogger) << "Error reading - Abrupt peer disconnect: " << _ec.message();
405         repMan().noteRude(*this);
406         drop(TCPError);
407         return false;
408     }
409     else if (_length != _expected)
410     {
411         // with static m_data-sized buffer this shouldn't happen unless there's a regression
412         // sec recommends checking anyways (instead of assert)
413         LOG(m_netLoggerError)
414             << "Error reading - TCP read buffer length differs from expected frame size ("
415             << _length << " != " << _expected << ")";
416         disconnect(UserReason);
417         return false;
418     }
419 
420     return true;
421 }
422 
registerCapability(CapDesc const & _desc,unsigned _offset,std::shared_ptr<CapabilityFace> _p)423 void Session::registerCapability(
424     CapDesc const& _desc, unsigned _offset, std::shared_ptr<CapabilityFace> _p)
425 {
426     DEV_GUARDED(x_framing)
427     {
428         m_capabilities[_desc] = move(_p);
429         m_capabilityOffsets[_desc.first] = _offset;
430     }
431 }
432 
canHandle(std::string const & _capability,unsigned _messageCount,unsigned _packetType) const433 bool Session::canHandle(
434     std::string const& _capability, unsigned _messageCount, unsigned _packetType) const
435 {
436     auto const offset = capabilityOffset(_capability);
437 
438     return offset && _packetType >= *offset && _packetType < _messageCount + *offset;
439 }
440 
disableCapability(std::string const & _capabilityName,std::string const & _problem)441 void Session::disableCapability(std::string const& _capabilityName, std::string const& _problem)
442 {
443     cnetlog << "Disabling capability '" << _capabilityName << "'. Reason: " << _problem << " " << m_logSuffix;
444     m_disabledCapabilities.insert(_capabilityName);
445     if (m_disabledCapabilities.size() == m_capabilities.size())
446     {
447         cnetlog << "All capabilities disabled. Disconnecting session.";
448         disconnect(DisconnectReason::UselessPeer);
449     }
450 }
451 
capabilityOffset(std::string const & _capabilityName) const452 boost::optional<unsigned> Session::capabilityOffset(std::string const& _capabilityName) const
453 {
454     auto it = m_capabilityOffsets.find(_capabilityName);
455     return it == m_capabilityOffsets.end() ? boost::optional<unsigned>{} : it->second;
456 }
457 
capabilityPacketTypeToString(unsigned _packetType) const458 char const* Session::capabilityPacketTypeToString(unsigned _packetType) const
459 {
460     if (_packetType < UserPacket)
461         return p2pPacketTypeToString(static_cast<P2pPacketType>(_packetType));
462     for (auto capIter : m_capabilities)
463     {
464         auto const& capName = capIter.first.first;
465         auto cap = capIter.second;
466         if (canHandle(capName, cap->messageCount(), _packetType))
467         {
468             auto offset = capabilityOffset(capName);
469             assert(offset);
470             return cap->packetTypeToString(_packetType - *offset);
471         }
472     }
473     return "Unknown";
474 }
475