1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2014-2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4
5 #include "Session.h"
6
7 #include "Host.h"
8 #include "RLPXFrameCoder.h"
9 #include <libdevcore/Common.h>
10 #include <libdevcore/CommonIO.h>
11 #include <libdevcore/Exceptions.h>
12 #include <chrono>
13
14 using namespace std;
15 using namespace dev;
16 using namespace dev::p2p;
17
Session(Host * _h,unique_ptr<RLPXFrameCoder> && _io,std::shared_ptr<RLPXSocket> const & _s,std::shared_ptr<Peer> const & _n,PeerSessionInfo _info)18 Session::Session(Host* _h, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s,
19 std::shared_ptr<Peer> const& _n, PeerSessionInfo _info)
20 : m_server(_h),
21 m_io(move(_io)),
22 m_socket(_s),
23 m_peer(_n),
24 m_info(_info),
25 m_ping(chrono::steady_clock::time_point::max())
26 {
27 stringstream remoteInfoStream;
28 remoteInfoStream << "(" << m_info.id << "@" << m_socket->remoteEndpoint() << ")";
29
30 m_logSuffix = remoteInfoStream.str();
31
32 auto const attr = boost::log::attributes::constant<std::string>{remoteInfoStream.str()};
33 m_netLogger.add_attribute("Suffix", attr);
34 m_netLoggerDetail.add_attribute("Suffix", attr);
35 m_netLoggerError.add_attribute("Suffix", attr);
36
37 m_capLogger.add_attribute("Suffix", attr);
38 m_capLoggerDetail.add_attribute("Suffix", attr);
39
40 m_peer->m_lastDisconnect = NoDisconnect;
41 m_lastReceived = m_connect = chrono::steady_clock::now();
42 }
43
~Session()44 Session::~Session()
45 {
46 cnetlog << "Closing peer session with " << m_logSuffix;
47
48 m_peer->m_lastConnected = m_peer->m_lastAttempted - chrono::seconds(1);
49
50 // Read-chain finished for one reason or another.
51 for (auto& i : m_capabilities)
52 {
53 i.second->onDisconnect(id());
54 i.second.reset();
55 }
56
57 try
58 {
59 bi::tcp::socket& socket = m_socket->ref();
60 if (socket.is_open())
61 {
62 boost::system::error_code ec;
63 socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
64 socket.close();
65 }
66 }
67 catch (...){}
68 }
69
repMan()70 ReputationManager& Session::repMan()
71 {
72 return m_server->repMan();
73 }
74
id() const75 NodeID Session::id() const
76 {
77 return m_peer ? m_peer->id : NodeID();
78 }
79
addRating(int _r)80 void Session::addRating(int _r)
81 {
82 if (m_peer)
83 {
84 m_peer->m_rating += _r;
85 m_peer->m_score += _r;
86 if (_r >= 0)
87 m_peer->noteSessionGood();
88 }
89 }
90
rating() const91 int Session::rating() const
92 {
93 return m_peer->m_rating;
94 }
95
readPacket(uint16_t _capId,unsigned _packetType,RLP const & _r)96 bool Session::readPacket(uint16_t _capId, unsigned _packetType, RLP const& _r)
97 {
98 m_lastReceived = chrono::steady_clock::now();
99 LOG(m_netLoggerDetail) << "Received " << capabilityPacketTypeToString(_packetType) << " ("
100 << _packetType << ") from";
101 try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
102 {
103 // v4 frame headers are useless, offset packet type used
104 // v5 protocol type is in header, packet type not offset
105 if (_capId == 0 && _packetType < UserPacket)
106 return interpretP2pPacket(static_cast<P2pPacketType>(_packetType), _r);
107
108 for (auto const& cap : m_capabilities)
109 {
110 auto const& name = cap.first.first;
111 auto const& capability = cap.second;
112
113 if (canHandle(name, capability->messageCount(), _packetType))
114 {
115 if (!capabilityEnabled(name))
116 return true;
117
118 auto offset = capabilityOffset(name);
119 assert(offset);
120 return capability->interpretCapabilityPacket(id(), _packetType - *offset, _r);
121 }
122 }
123
124 return false;
125 }
126 catch (std::exception const& _e)
127 {
128 LOG(m_netLogger) << "Exception caught in p2p::Session::readPacket(): " << _e.what()
129 << ". PacketType: " << capabilityPacketTypeToString(_packetType) << " ("
130 << _packetType << "). RLP: " << _r;
131 disconnect(BadProtocol);
132 return true;
133 }
134 return true;
135 }
136
interpretP2pPacket(P2pPacketType _t,RLP const & _r)137 bool Session::interpretP2pPacket(P2pPacketType _t, RLP const& _r)
138 {
139 LOG(m_capLoggerDetail) << p2pPacketTypeToString(_t) << " from";
140 switch (_t)
141 {
142 case DisconnectPacket:
143 {
144 string reason = "Unspecified";
145 auto r = (DisconnectReason)_r[0].toInt<int>();
146 if (!_r[0].isInt())
147 drop(BadProtocol);
148 else
149 {
150 reason = reasonOf(r);
151 LOG(m_capLogger) << "Disconnect (reason: " << reason << ") from";
152 drop(DisconnectRequested);
153 }
154 break;
155 }
156 case PingPacket:
157 {
158 LOG(m_capLoggerDetail) << "Pong to";
159 RLPStream s;
160 sealAndSend(prep(s, PongPacket));
161 break;
162 }
163 case PongPacket:
164 DEV_GUARDED(x_info)
165 {
166 m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
167 LOG(m_capLoggerDetail)
168 << "Ping latency: "
169 << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
170 }
171 break;
172 default:
173 return false;
174 }
175 return true;
176 }
177
ping()178 void Session::ping()
179 {
180 clog(VerbosityTrace, "p2pcap") << "Ping to " << m_logSuffix;
181 RLPStream s;
182 sealAndSend(prep(s, PingPacket));
183 m_ping = std::chrono::steady_clock::now();
184 }
185
prep(RLPStream & _s,P2pPacketType _id,unsigned _args)186 RLPStream& Session::prep(RLPStream& _s, P2pPacketType _id, unsigned _args)
187 {
188 return _s.append((unsigned)_id).appendList(_args);
189 }
190
sealAndSend(RLPStream & _s)191 void Session::sealAndSend(RLPStream& _s)
192 {
193 bytes b;
194 _s.swapOut(b);
195 send(move(b));
196 }
197
checkPacket(bytesConstRef _msg)198 bool Session::checkPacket(bytesConstRef _msg)
199 {
200 if (_msg[0] > 0x7f || _msg.size() < 2)
201 return false;
202 if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size())
203 return false;
204 return true;
205 }
206
send(bytes && _msg)207 void Session::send(bytes&& _msg)
208 {
209 bytesConstRef msg(&_msg);
210 LOG(m_netLoggerDetail) << capabilityPacketTypeToString(_msg[0]) << " to";
211 if (!checkPacket(msg))
212 clog(VerbosityError, "net") << "Invalid packet constructed. Size: " << msg.size()
213 << " bytes, message: " << toHex(msg);
214
215 if (!m_socket->ref().is_open())
216 return;
217
218 bool doWrite = false;
219 DEV_GUARDED(x_framing)
220 {
221 m_writeQueue.push_back(std::move(_msg));
222 doWrite = (m_writeQueue.size() == 1);
223 }
224
225 if (doWrite)
226 write();
227 }
228
write()229 void Session::write()
230 {
231 bytes const* out = nullptr;
232 DEV_GUARDED(x_framing)
233 {
234 m_io->writeSingleFramePacket(&m_writeQueue[0], m_writeQueue[0]);
235 out = &m_writeQueue[0];
236 }
237 auto self(shared_from_this());
238 ba::async_write(m_socket->ref(), ba::buffer(*out),
239 [this, self](boost::system::error_code ec, std::size_t /*length*/) {
240 // must check queue, as write callback can occur following dropped()
241 if (ec)
242 {
243 LOG(m_netLogger) << "Error sending: " << ec.message();
244 drop(TCPError);
245 return;
246 }
247
248 DEV_GUARDED(x_framing)
249 {
250 m_writeQueue.pop_front();
251 if (m_writeQueue.empty())
252 return;
253 }
254 write();
255 });
256 }
257
namespace
{
/// Atomically replaces the value of i with half of itself (integer division by two).
void halveAtomicInt(atomic<int>& i)
{
    // atomic<int> doesn't have /= operator, so we do it manually.
    // A failed compare_exchange_weak stores the value it observed into `observed`, so every retry
    // recomputes the half from the latest value.
    int observed = i.load();
    while (!i.compare_exchange_weak(observed, observed / 2))
    {
    }
}
}  // namespace
274
drop(DisconnectReason _reason)275 void Session::drop(DisconnectReason _reason)
276 {
277 if (m_dropped)
278 return;
279 bi::tcp::socket& socket = m_socket->ref();
280 if (socket.is_open())
281 try
282 {
283 boost::system::error_code ec;
284 LOG(m_netLoggerDetail) << "Closing (" << reasonOf(_reason) << ") connection with";
285 socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
286 socket.close();
287 }
288 catch (...) {}
289
290 m_peer->m_lastDisconnect = _reason;
291 if (_reason == BadProtocol)
292 {
293 halveAtomicInt(m_peer->m_rating);
294 halveAtomicInt(m_peer->m_score);
295 }
296 m_dropped = true;
297 }
298
disconnect(DisconnectReason _reason)299 void Session::disconnect(DisconnectReason _reason)
300 {
301 clog(VerbosityTrace, "p2pcap") << "Disconnecting (our reason: " << reasonOf(_reason) << ") from " << m_logSuffix;
302
303 if (m_socket->ref().is_open())
304 {
305 RLPStream s;
306 prep(s, DisconnectPacket, 1) << (int)_reason;
307 sealAndSend(s);
308 }
309 drop(_reason);
310 }
311
start()312 void Session::start()
313 {
314 ping();
315 doRead();
316 }
317
doRead()318 void Session::doRead()
319 {
320 // ignore packets received while waiting to disconnect.
321 if (m_dropped)
322 return;
323
324 auto self(shared_from_this());
325 m_data.resize(h256::size);
326 ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size),
327 [this, self](boost::system::error_code ec, std::size_t length) {
328 if (!checkRead(h256::size, ec, length))
329 return;
330 else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
331 {
332 LOG(m_netLogger) << "Header decrypt failed";
333 drop(BadProtocol); // todo: better error
334 return;
335 }
336
337 uint16_t hProtocolId;
338 uint32_t hLength;
339 uint8_t hPadding;
340 try
341 {
342 RLPXFrameInfo header(bytesConstRef(m_data.data(), length));
343 hProtocolId = header.protocolId;
344 hLength = header.length;
345 hPadding = header.padding;
346 }
347 catch (std::exception const& _e)
348 {
349 LOG(m_netLogger) << "Exception decoding frame header RLP: " << _e.what() << " "
350 << bytesConstRef(m_data.data(), h128::size).cropped(3);
351 drop(BadProtocol);
352 return;
353 }
354
355 /// read padded frame and mac
356 auto tlen = hLength + hPadding + h128::size;
357 m_data.resize(tlen);
358 ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen),
359 [this, self, hLength, hProtocolId, tlen](
360 boost::system::error_code ec, std::size_t length) {
361
362 if (!checkRead(tlen, ec, length))
363 return;
364 else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
365 {
366 LOG(m_netLogger) << "Frame decrypt failed";
367 drop(BadProtocol); // todo: better error
368 return;
369 }
370
371 bytesConstRef frame(m_data.data(), hLength);
372 if (!checkPacket(frame))
373 {
374 LOG(m_netLogger) << "Received invalid message. Size: " << frame.size()
375 << " bytes, message: " << toHex(frame) << endl;
376 disconnect(BadProtocol);
377 return;
378 }
379 else
380 {
381 auto packetType = static_cast<P2pPacketType>(RLP(frame.cropped(0, 1)).toInt<unsigned>());
382 RLP r(frame.cropped(1));
383 bool ok = readPacket(hProtocolId, packetType, r);
384 if (!ok)
385 LOG(m_netLogger)
386 << "Couldn't interpret " << p2pPacketTypeToString(packetType)
387 << " (" << packetType << "). RLP: " << RLP(r);
388 }
389 doRead();
390 });
391 });
392 }
393
checkRead(std::size_t _expected,boost::system::error_code _ec,std::size_t _length)394 bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length)
395 {
396 if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof)
397 {
398 LOG(m_netLogger) << "Error reading: " << _ec.message();
399 drop(TCPError);
400 return false;
401 }
402 else if (_ec && _length < _expected)
403 {
404 LOG(m_netLogger) << "Error reading - Abrupt peer disconnect: " << _ec.message();
405 repMan().noteRude(*this);
406 drop(TCPError);
407 return false;
408 }
409 else if (_length != _expected)
410 {
411 // with static m_data-sized buffer this shouldn't happen unless there's a regression
412 // sec recommends checking anyways (instead of assert)
413 LOG(m_netLoggerError)
414 << "Error reading - TCP read buffer length differs from expected frame size ("
415 << _length << " != " << _expected << ")";
416 disconnect(UserReason);
417 return false;
418 }
419
420 return true;
421 }
422
registerCapability(CapDesc const & _desc,unsigned _offset,std::shared_ptr<CapabilityFace> _p)423 void Session::registerCapability(
424 CapDesc const& _desc, unsigned _offset, std::shared_ptr<CapabilityFace> _p)
425 {
426 DEV_GUARDED(x_framing)
427 {
428 m_capabilities[_desc] = move(_p);
429 m_capabilityOffsets[_desc.first] = _offset;
430 }
431 }
432
canHandle(std::string const & _capability,unsigned _messageCount,unsigned _packetType) const433 bool Session::canHandle(
434 std::string const& _capability, unsigned _messageCount, unsigned _packetType) const
435 {
436 auto const offset = capabilityOffset(_capability);
437
438 return offset && _packetType >= *offset && _packetType < _messageCount + *offset;
439 }
440
disableCapability(std::string const & _capabilityName,std::string const & _problem)441 void Session::disableCapability(std::string const& _capabilityName, std::string const& _problem)
442 {
443 cnetlog << "Disabling capability '" << _capabilityName << "'. Reason: " << _problem << " " << m_logSuffix;
444 m_disabledCapabilities.insert(_capabilityName);
445 if (m_disabledCapabilities.size() == m_capabilities.size())
446 {
447 cnetlog << "All capabilities disabled. Disconnecting session.";
448 disconnect(DisconnectReason::UselessPeer);
449 }
450 }
451
capabilityOffset(std::string const & _capabilityName) const452 boost::optional<unsigned> Session::capabilityOffset(std::string const& _capabilityName) const
453 {
454 auto it = m_capabilityOffsets.find(_capabilityName);
455 return it == m_capabilityOffsets.end() ? boost::optional<unsigned>{} : it->second;
456 }
457
capabilityPacketTypeToString(unsigned _packetType) const458 char const* Session::capabilityPacketTypeToString(unsigned _packetType) const
459 {
460 if (_packetType < UserPacket)
461 return p2pPacketTypeToString(static_cast<P2pPacketType>(_packetType));
462 for (auto capIter : m_capabilities)
463 {
464 auto const& capName = capIter.first.first;
465 auto cap = capIter.second;
466 if (canHandle(capName, cap->messageCount(), _packetType))
467 {
468 auto offset = capabilityOffset(capName);
469 assert(offset);
470 return cap->packetTypeToString(_packetType - *offset);
471 }
472 }
473 return "Unknown";
474 }
475