1 /* 2 * Copyright (c) 2013-2021, The PurpleI2P Project 3 * 4 * This file is part of Purple i2pd project and licensed under BSD3 5 * 6 * See full license text in LICENSE file at top of project tree 7 */ 8 9 #ifndef STREAMING_H__ 10 #define STREAMING_H__ 11 12 #include <inttypes.h> 13 #include <string> 14 #include <unordered_map> 15 #include <set> 16 #include <queue> 17 #include <functional> 18 #include <memory> 19 #include <mutex> 20 #include <boost/asio.hpp> 21 #include "Base.h" 22 #include "I2PEndian.h" 23 #include "Identity.h" 24 #include "LeaseSet.h" 25 #include "I2NPProtocol.h" 26 #include "Garlic.h" 27 #include "Tunnel.h" 28 #include "util.h" // MemoryPool 29 30 namespace i2p 31 { 32 namespace client 33 { 34 class ClientDestination; 35 } 36 namespace stream 37 { 38 const uint16_t PACKET_FLAG_SYNCHRONIZE = 0x0001; 39 const uint16_t PACKET_FLAG_CLOSE = 0x0002; 40 const uint16_t PACKET_FLAG_RESET = 0x0004; 41 const uint16_t PACKET_FLAG_SIGNATURE_INCLUDED = 0x0008; 42 const uint16_t PACKET_FLAG_SIGNATURE_REQUESTED = 0x0010; 43 const uint16_t PACKET_FLAG_FROM_INCLUDED = 0x0020; 44 const uint16_t PACKET_FLAG_DELAY_REQUESTED = 0x0040; 45 const uint16_t PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED = 0x0080; 46 const uint16_t PACKET_FLAG_PROFILE_INTERACTIVE = 0x0100; 47 const uint16_t PACKET_FLAG_ECHO = 0x0200; 48 const uint16_t PACKET_FLAG_NO_ACK = 0x0400; 49 const uint16_t PACKET_FLAG_OFFLINE_SIGNATURE = 0x0800; 50 51 const size_t STREAMING_MTU = 1730; 52 const size_t STREAMING_MTU_RATCHETS = 1812; 53 const size_t MAX_PACKET_SIZE = 4096; 54 const size_t COMPRESSION_THRESHOLD_SIZE = 66; 55 const int MAX_NUM_RESEND_ATTEMPTS = 6; 56 const int WINDOW_SIZE = 6; // in messages 57 const int MIN_WINDOW_SIZE = 1; 58 const int MAX_WINDOW_SIZE = 128; 59 const int INITIAL_RTT = 8000; // in milliseconds 60 const int INITIAL_RTO = 9000; // in milliseconds 61 const int SYN_TIMEOUT = 200; // how long we wait for SYN after follow-on, in milliseconds 62 const size_t MAX_PENDING_INCOMING_BACKLOG = 128; 63 const int PENDING_INCOMING_TIMEOUT = 10; // in seconds 64 const int MAX_RECEIVE_TIMEOUT = 20; // in seconds 65 66 struct Packet 67 { 68 size_t len, offset; 69 uint8_t buf[MAX_PACKET_SIZE]; 70 uint64_t sendTime; 71 PacketPacket72 Packet (): len (0), offset (0), sendTime (0) {}; GetBufferPacket73 uint8_t * GetBuffer () { return buf + offset; }; GetLengthPacket74 size_t GetLength () const { return len - offset; }; 75 GetSendStreamIDPacket76 uint32_t GetSendStreamID () const { return bufbe32toh (buf); }; GetReceiveStreamIDPacket77 uint32_t GetReceiveStreamID () const { return bufbe32toh (buf + 4); }; GetSeqnPacket78 uint32_t GetSeqn () const { return bufbe32toh (buf + 8); }; GetAckThroughPacket79 uint32_t GetAckThrough () const { return bufbe32toh (buf + 12); }; GetNACKCountPacket80 uint8_t GetNACKCount () const { return buf[16]; }; GetNACKPacket81 uint32_t GetNACK (int i) const { return bufbe32toh (buf + 17 + 4 * i); }; GetOptionPacket82 const uint8_t * GetOption () const { return buf + 17 + GetNACKCount ()*4 + 3; }; // 3 = resendDelay + flags GetFlagsPacket83 uint16_t GetFlags () const { return bufbe16toh (GetOption () - 2); }; GetOptionSizePacket84 uint16_t GetOptionSize () const { return bufbe16toh (GetOption ()); }; GetOptionDataPacket85 const uint8_t * GetOptionData () const { return GetOption () + 2; }; GetPayloadPacket86 const uint8_t * GetPayload () const { return GetOptionData () + GetOptionSize (); }; 87 IsSYNPacket88 bool IsSYN () const { return GetFlags () & PACKET_FLAG_SYNCHRONIZE; }; IsNoAckPacket89 bool IsNoAck () const { return GetFlags () & PACKET_FLAG_NO_ACK; }; IsEchoPacket90 bool IsEcho () const { return GetFlags () & PACKET_FLAG_ECHO; }; 91 }; 92 93 struct PacketCmp 94 { operatorPacketCmp95 bool operator() (const Packet * p1, const Packet * p2) const 96 { 97 return p1->GetSeqn () < p2->GetSeqn (); 98 }; 99 }; 100 101 typedef std::function<void (const boost::system::error_code& ecode)> SendHandler; 102 struct SendBuffer 103 { 104 uint8_t * buf; 105 size_t len, offset; 106 SendHandler handler; 107 SendBufferSendBuffer108 SendBuffer (const uint8_t * b, size_t l, SendHandler h): 109 len(l), offset (0), handler(h) 110 { 111 buf = new uint8_t[len]; 112 memcpy (buf, b, len); 113 } SendBufferSendBuffer114 SendBuffer (size_t l): // create empty buffer 115 len(l), offset (0) 116 { 117 buf = new uint8_t[len]; 118 } ~SendBufferSendBuffer119 ~SendBuffer () 120 { 121 delete[] buf; 122 if (handler) handler(boost::system::error_code ()); 123 } GetRemainingSizeSendBuffer124 size_t GetRemainingSize () const { return len - offset; }; GetRemaningBufferSendBuffer125 const uint8_t * GetRemaningBuffer () const { return buf + offset; }; CancelSendBuffer126 void Cancel () { if (handler) handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted)); handler = nullptr; }; 127 }; 128 129 class SendBufferQueue 130 { 131 public: 132 SendBufferQueue()133 SendBufferQueue (): m_Size (0) {}; ~SendBufferQueue()134 ~SendBufferQueue () { CleanUp (); }; 135 136 void Add (const uint8_t * buf, size_t len, SendHandler handler); 137 void Add (std::shared_ptr<SendBuffer> buf); 138 size_t Get (uint8_t * buf, size_t len); GetSize()139 size_t GetSize () const { return m_Size; }; IsEmpty()140 bool IsEmpty () const { return m_Buffers.empty (); }; 141 void CleanUp (); 142 143 private: 144 145 std::list<std::shared_ptr<SendBuffer> > m_Buffers; 146 size_t m_Size; 147 }; 148 149 enum StreamStatus 150 { 151 eStreamStatusNew = 0, 152 eStreamStatusOpen, 153 eStreamStatusReset, 154 eStreamStatusClosing, 155 eStreamStatusClosed, 156 eStreamStatusTerminated 157 }; 158 159 class StreamingDestination; 160 class Stream: public std::enable_shared_from_this<Stream> 161 { 162 public: 163 164 Stream (boost::asio::io_service& service, StreamingDestination& local, 165 std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0); // outgoing 166 Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming 167 168 ~Stream (); GetSendStreamID()169 uint32_t GetSendStreamID () const { return m_SendStreamID; }; GetRecvStreamID()170 uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; GetRemoteLeaseSet()171 std::shared_ptr<const i2p::data::LeaseSet> GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; GetRemoteIdentity()172 std::shared_ptr<const i2p::data::IdentityEx> GetRemoteIdentity () const { return m_RemoteIdentity; }; IsOpen()173 bool IsOpen () const { return m_Status == eStreamStatusOpen; }; IsEstablished()174 bool IsEstablished () const { return m_SendStreamID; }; GetStatus()175 StreamStatus GetStatus () const { return m_Status; }; GetLocalDestination()176 StreamingDestination& GetLocalDestination () { return m_LocalDestination; }; 177 178 void HandleNextPacket (Packet * packet); 179 void HandlePing (Packet * packet); 180 size_t Send (const uint8_t * buf, size_t len); 181 void AsyncSend (const uint8_t * buf, size_t len, SendHandler handler); 182 void SendPing (); 183 184 template<typename Buffer, typename ReceiveHandler> 185 void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); ReadSome(uint8_t * buf,size_t len)186 size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; 187 AsyncClose()188 void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); }; 189 190 /** only call close from destination thread, use Stream::AsyncClose for other threads */ 191 void Close (); Cancel()192 void Cancel () { m_ReceiveTimer.cancel (); }; 193 GetNumSentBytes()194 size_t GetNumSentBytes () const { return m_NumSentBytes; }; GetNumReceivedBytes()195 size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; GetSendQueueSize()196 size_t GetSendQueueSize () const { return m_SentPackets.size (); }; GetReceiveQueueSize()197 size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); }; GetSendBufferSize()198 size_t GetSendBufferSize () const { return m_SendBuffer.GetSize (); }; GetWindowSize()199 int GetWindowSize () const { return m_WindowSize; }; GetRTT()200 int GetRTT () const { return m_RTT; }; 201 202 void Terminate (bool deleteFromDestination = true); 203 204 private: 205 206 void CleanUp (); 207 208 void SendBuffer (); 209 void SendQuickAck (); 210 void SendClose (); 211 bool SendPacket (Packet * packet); 212 void SendPackets (const std::vector<Packet *>& packets); 213 void SendUpdatedLeaseSet (); 214 215 void SavePacket (Packet * packet); 216 void ProcessPacket (Packet * packet); 217 bool ProcessOptions (uint16_t flags, Packet * packet); 218 void ProcessAck (Packet * packet); 219 size_t ConcatenatePackets (uint8_t * buf, size_t len); 220 221 void UpdateCurrentRemoteLease (bool expired = false); 222 223 template<typename Buffer, typename ReceiveHandler> 224 void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout); 225 226 void ScheduleResend (); 227 void HandleResendTimer (const boost::system::error_code& ecode); 228 void HandleAckSendTimer (const boost::system::error_code& ecode); 229 230 private: 231 232 boost::asio::io_service& m_Service; 233 uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; 234 int32_t m_LastReceivedSequenceNumber; 235 StreamStatus m_Status; 236 bool m_IsAckSendScheduled; 237 StreamingDestination& m_LocalDestination; 238 std::shared_ptr<const i2p::data::IdentityEx> m_RemoteIdentity; 239 std::shared_ptr<const i2p::crypto::Verifier> m_TransientVerifier; // in case of offline key 240 std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet; 241 std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession; 242 std::shared_ptr<const i2p::data::Lease> m_CurrentRemoteLease; 243 std::shared_ptr<i2p::tunnel::OutboundTunnel> m_CurrentOutboundTunnel; 244 std::queue<Packet *> m_ReceiveQueue; 245 std::set<Packet *, PacketCmp> m_SavedPackets; 246 std::set<Packet *, PacketCmp> m_SentPackets; 247 boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer, m_AckSendTimer; 248 size_t m_NumSentBytes, m_NumReceivedBytes; 249 uint16_t m_Port; 250 251 std::mutex m_SendBufferMutex; 252 SendBufferQueue m_SendBuffer; 253 int m_WindowSize, m_RTT, m_RTO, m_AckDelay; 254 uint64_t m_LastWindowSizeIncreaseTime; 255 int m_NumResendAttempts; 256 size_t m_MTU; 257 }; 258 259 class StreamingDestination: public std::enable_shared_from_this<StreamingDestination> 260 { 261 public: 262 263 typedef std::function<void (std::shared_ptr<Stream>)> Acceptor; 264 265 StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort = 0, bool gzip = false); 266 ~StreamingDestination (); 267 268 void Start (); 269 void Stop (); 270 271 std::shared_ptr<Stream> CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0); 272 void SendPing (std::shared_ptr<const i2p::data::LeaseSet> remote); 273 void DeleteStream (std::shared_ptr<Stream> stream); 274 bool DeleteStream (uint32_t recvStreamID); 275 void SetAcceptor (const Acceptor& acceptor); 276 void ResetAcceptor (); IsAcceptorSet()277 bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; 278 void AcceptOnce (const Acceptor& acceptor); 279 void AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev); 280 GetOwner()281 std::shared_ptr<i2p::client::ClientDestination> GetOwner () const { return m_Owner; }; SetOwner(std::shared_ptr<i2p::client::ClientDestination> owner)282 void SetOwner (std::shared_ptr<i2p::client::ClientDestination> owner) { m_Owner = owner; }; GetLocalPort()283 uint16_t GetLocalPort () const { return m_LocalPort; }; 284 285 void HandleDataMessagePayload (const uint8_t * buf, size_t len); 286 std::shared_ptr<I2NPMessage> CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort, bool checksum = true); 287 NewPacket()288 Packet * NewPacket () { return m_PacketsPool.Acquire(); } DeletePacket(Packet * p)289 void DeletePacket (Packet * p) { return m_PacketsPool.Release(p); } 290 291 private: 292 293 void HandleNextPacket (Packet * packet); 294 std::shared_ptr<Stream> CreateNewIncomingStream (uint32_t receiveStreamID); 295 void HandlePendingIncomingTimer (const boost::system::error_code& ecode); 296 297 private: 298 299 std::shared_ptr<i2p::client::ClientDestination> m_Owner; 300 uint16_t m_LocalPort; 301 bool m_Gzip; // gzip compression of data messages 302 std::mutex m_StreamsMutex; 303 std::unordered_map<uint32_t, std::shared_ptr<Stream> > m_Streams; // sendStreamID->stream 304 std::unordered_map<uint32_t, std::shared_ptr<Stream> > m_IncomingStreams; // receiveStreamID->stream 305 std::shared_ptr<Stream> m_LastStream; 306 Acceptor m_Acceptor; 307 std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams; 308 boost::asio::deadline_timer m_PendingIncomingTimer; 309 std::unordered_map<uint32_t, std::list<Packet *> > m_SavedPackets; // receiveStreamID->packets, arrived before SYN 310 311 i2p::util::MemoryPool<Packet> m_PacketsPool; 312 i2p::util::MemoryPool<I2NPMessageBuffer<I2NP_MAX_MESSAGE_SIZE> > m_I2NPMsgsPool; 313 314 public: 315 316 i2p::data::GzipInflator m_Inflator; 317 std::unique_ptr<i2p::data::GzipDeflator> m_Deflator; 318 319 // for HTTP only decltype(m_Streams)320 const decltype(m_Streams)& GetStreams () const { return m_Streams; }; 321 }; 322 323 //------------------------------------------------- 324 325 template<typename Buffer, typename ReceiveHandler> AsyncReceive(const Buffer & buffer,ReceiveHandler handler,int timeout)326 void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) 327 { 328 auto s = shared_from_this(); 329 m_Service.post ([s, buffer, handler, timeout](void) 330 { 331 if (!s->m_ReceiveQueue.empty () || s->m_Status == eStreamStatusReset) 332 s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0); 333 else 334 { 335 int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout; 336 s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t)); 337 int left = timeout - t; 338 auto self = s->shared_from_this(); 339 self->m_ReceiveTimer.async_wait ( 340 [self, buffer, handler, left](const boost::system::error_code & ec) 341 { 342 self->HandleReceiveTimer(ec, buffer, handler, left); 343 }); 344 } 345 }); 346 } 347 348 template<typename Buffer, typename ReceiveHandler> HandleReceiveTimer(const boost::system::error_code & ecode,const Buffer & buffer,ReceiveHandler handler,int remainingTimeout)349 void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout) 350 { 351 size_t received = ConcatenatePackets (boost::asio::buffer_cast<uint8_t *>(buffer), boost::asio::buffer_size(buffer)); 352 if (received > 0) 353 handler (boost::system::error_code (), received); 354 else if (ecode == boost::asio::error::operation_aborted) 355 { 356 // timeout not expired 357 if (m_Status == eStreamStatusReset) 358 handler (boost::asio::error::make_error_code (boost::asio::error::connection_reset), 0); 359 else 360 handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), 0); 361 } 362 else 363 { 364 // timeout expired 365 if (remainingTimeout <= 0) 366 handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), received); 367 else 368 { 369 // itermediate interrupt 370 SendUpdatedLeaseSet (); // send our leaseset if applicable 371 AsyncReceive (buffer, handler, remainingTimeout); 372 } 373 } 374 } 375 } 376 } 377 378 #endif 379