1 /*
2 * Copyright (c) 2013-2021, The PurpleI2P Project
3 *
4 * This file is part of Purple i2pd project and licensed under BSD3
5 *
6 * See full license text in LICENSE file at top of project tree
7 */
8 
9 #ifndef STREAMING_H__
10 #define STREAMING_H__
11 
#include <inttypes.h>
#include <string.h>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/asio.hpp>
#include "Base.h"
#include "I2PEndian.h"
#include "Identity.h"
#include "LeaseSet.h"
#include "I2NPProtocol.h"
#include "Garlic.h"
#include "Tunnel.h"
#include "util.h" // MemoryPool
29 
30 namespace i2p
31 {
32 namespace client
33 {
34 	class ClientDestination;
35 }
36 namespace stream
37 {
38 	const uint16_t PACKET_FLAG_SYNCHRONIZE = 0x0001;
39 	const uint16_t PACKET_FLAG_CLOSE = 0x0002;
40 	const uint16_t PACKET_FLAG_RESET = 0x0004;
41 	const uint16_t PACKET_FLAG_SIGNATURE_INCLUDED = 0x0008;
42 	const uint16_t PACKET_FLAG_SIGNATURE_REQUESTED = 0x0010;
43 	const uint16_t PACKET_FLAG_FROM_INCLUDED = 0x0020;
44 	const uint16_t PACKET_FLAG_DELAY_REQUESTED = 0x0040;
45 	const uint16_t PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED = 0x0080;
46 	const uint16_t PACKET_FLAG_PROFILE_INTERACTIVE = 0x0100;
47 	const uint16_t PACKET_FLAG_ECHO = 0x0200;
48 	const uint16_t PACKET_FLAG_NO_ACK = 0x0400;
49 	const uint16_t PACKET_FLAG_OFFLINE_SIGNATURE = 0x0800;
50 
51 	const size_t STREAMING_MTU = 1730;
52 	const size_t STREAMING_MTU_RATCHETS = 1812;
53 	const size_t MAX_PACKET_SIZE = 4096;
54 	const size_t COMPRESSION_THRESHOLD_SIZE = 66;
55 	const int MAX_NUM_RESEND_ATTEMPTS = 6;
56 	const int WINDOW_SIZE = 6; // in messages
57 	const int MIN_WINDOW_SIZE = 1;
58 	const int MAX_WINDOW_SIZE = 128;
59 	const int INITIAL_RTT = 8000; // in milliseconds
60 	const int INITIAL_RTO = 9000; // in milliseconds
61 	const int SYN_TIMEOUT = 200; // how long we wait for SYN after follow-on, in milliseconds
62 	const size_t MAX_PENDING_INCOMING_BACKLOG = 128;
63 	const int PENDING_INCOMING_TIMEOUT = 10; // in seconds
64 	const int MAX_RECEIVE_TIMEOUT = 20; // in seconds
65 
66 	struct Packet
67 	{
68 		size_t len, offset;
69 		uint8_t buf[MAX_PACKET_SIZE];
70 		uint64_t sendTime;
71 
PacketPacket72 		Packet (): len (0), offset (0), sendTime (0) {};
GetBufferPacket73 		uint8_t * GetBuffer () { return buf + offset; };
GetLengthPacket74 		size_t GetLength () const { return len - offset; };
75 
GetSendStreamIDPacket76 		uint32_t GetSendStreamID () const { return bufbe32toh (buf); };
GetReceiveStreamIDPacket77 		uint32_t GetReceiveStreamID () const { return bufbe32toh (buf + 4); };
GetSeqnPacket78 		uint32_t GetSeqn () const { return bufbe32toh (buf + 8); };
GetAckThroughPacket79 		uint32_t GetAckThrough () const { return bufbe32toh (buf + 12); };
GetNACKCountPacket80 		uint8_t GetNACKCount () const { return buf[16]; };
GetNACKPacket81 		uint32_t GetNACK (int i) const { return bufbe32toh (buf + 17 + 4 * i); };
GetOptionPacket82 		const uint8_t * GetOption () const { return buf + 17 + GetNACKCount ()*4 + 3; }; // 3 = resendDelay + flags
GetFlagsPacket83 		uint16_t GetFlags () const { return bufbe16toh (GetOption () - 2); };
GetOptionSizePacket84 		uint16_t GetOptionSize () const { return bufbe16toh (GetOption ()); };
GetOptionDataPacket85 		const uint8_t * GetOptionData () const { return GetOption () + 2; };
GetPayloadPacket86 		const uint8_t * GetPayload () const { return GetOptionData () + GetOptionSize (); };
87 
IsSYNPacket88 		bool IsSYN () const { return GetFlags () & PACKET_FLAG_SYNCHRONIZE; };
IsNoAckPacket89 		bool IsNoAck () const { return GetFlags () & PACKET_FLAG_NO_ACK; };
IsEchoPacket90 		bool IsEcho () const { return GetFlags () & PACKET_FLAG_ECHO; };
91 	};
92 
93 	struct PacketCmp
94 	{
operatorPacketCmp95 		bool operator() (const Packet * p1, const Packet * p2) const
96 		{
97 			return p1->GetSeqn () < p2->GetSeqn ();
98 		};
99 	};
100 
101 	typedef std::function<void (const boost::system::error_code& ecode)> SendHandler;
102 	struct SendBuffer
103 	{
104 		uint8_t * buf;
105 		size_t len, offset;
106 		SendHandler handler;
107 
SendBufferSendBuffer108 		SendBuffer (const uint8_t * b, size_t l, SendHandler h):
109 			len(l), offset (0), handler(h)
110 		{
111 			buf = new uint8_t[len];
112 			memcpy (buf, b, len);
113 		}
SendBufferSendBuffer114 		SendBuffer (size_t l): // create empty buffer
115 			len(l), offset (0)
116 		{
117 			buf = new uint8_t[len];
118 		}
~SendBufferSendBuffer119 		~SendBuffer ()
120 		{
121 			delete[] buf;
122 			if (handler) handler(boost::system::error_code ());
123 		}
GetRemainingSizeSendBuffer124 		size_t GetRemainingSize () const { return len - offset; };
GetRemaningBufferSendBuffer125 		const uint8_t * GetRemaningBuffer () const { return buf + offset; };
CancelSendBuffer126 		void Cancel () { if (handler) handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted)); handler = nullptr; };
127 	};
128 
129 	class SendBufferQueue
130 	{
131 		public:
132 
SendBufferQueue()133 			SendBufferQueue (): m_Size (0) {};
~SendBufferQueue()134 			~SendBufferQueue () { CleanUp (); };
135 
136 			void Add (const uint8_t * buf, size_t len, SendHandler handler);
137 			void Add (std::shared_ptr<SendBuffer> buf);
138 			size_t Get (uint8_t * buf, size_t len);
GetSize()139 			size_t GetSize () const { return m_Size; };
IsEmpty()140 			bool IsEmpty () const { return m_Buffers.empty (); };
141 			void CleanUp ();
142 
143 		private:
144 
145 			std::list<std::shared_ptr<SendBuffer> > m_Buffers;
146 			size_t m_Size;
147 	};
148 
149 	enum StreamStatus
150 	{
151 		eStreamStatusNew = 0,
152 		eStreamStatusOpen,
153 		eStreamStatusReset,
154 		eStreamStatusClosing,
155 		eStreamStatusClosed,
156 		eStreamStatusTerminated
157 	};
158 
159 	class StreamingDestination;
160 	class Stream: public std::enable_shared_from_this<Stream>
161 	{
162 		public:
163 
164 			Stream (boost::asio::io_service& service, StreamingDestination& local,
165 				std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0); // outgoing
166 			Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming
167 
168 			~Stream ();
GetSendStreamID()169 			uint32_t GetSendStreamID () const { return m_SendStreamID; };
GetRecvStreamID()170 			uint32_t GetRecvStreamID () const { return m_RecvStreamID; };
GetRemoteLeaseSet()171 			std::shared_ptr<const i2p::data::LeaseSet> GetRemoteLeaseSet () const { return m_RemoteLeaseSet; };
GetRemoteIdentity()172 			std::shared_ptr<const i2p::data::IdentityEx> GetRemoteIdentity () const { return m_RemoteIdentity; };
IsOpen()173 			bool IsOpen () const { return m_Status == eStreamStatusOpen; };
IsEstablished()174 			bool IsEstablished () const { return m_SendStreamID; };
GetStatus()175 			StreamStatus GetStatus () const { return m_Status; };
GetLocalDestination()176 			StreamingDestination& GetLocalDestination () { return m_LocalDestination; };
177 
178 			void HandleNextPacket (Packet * packet);
179 			void HandlePing (Packet * packet);
180 			size_t Send (const uint8_t * buf, size_t len);
181 			void AsyncSend (const uint8_t * buf, size_t len, SendHandler handler);
182 			void SendPing ();
183 
184 			template<typename Buffer, typename ReceiveHandler>
185 			void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0);
ReadSome(uint8_t * buf,size_t len)186 			size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); };
187 
AsyncClose()188 			void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); };
189 
190 			/** only call close from destination thread, use Stream::AsyncClose for other threads */
191 			void Close ();
Cancel()192 			void Cancel () { m_ReceiveTimer.cancel (); };
193 
GetNumSentBytes()194 			size_t GetNumSentBytes () const { return m_NumSentBytes; };
GetNumReceivedBytes()195 			size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
GetSendQueueSize()196 			size_t GetSendQueueSize () const { return m_SentPackets.size (); };
GetReceiveQueueSize()197 			size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); };
GetSendBufferSize()198 			size_t GetSendBufferSize () const { return m_SendBuffer.GetSize (); };
GetWindowSize()199 			int GetWindowSize () const { return m_WindowSize; };
GetRTT()200 			int GetRTT () const { return m_RTT; };
201 
202 			void Terminate (bool deleteFromDestination = true);
203 
204 		private:
205 
206 			void CleanUp ();
207 
208 			void SendBuffer ();
209 			void SendQuickAck ();
210 			void SendClose ();
211 			bool SendPacket (Packet * packet);
212 			void SendPackets (const std::vector<Packet *>& packets);
213 			void SendUpdatedLeaseSet ();
214 
215 			void SavePacket (Packet * packet);
216 			void ProcessPacket (Packet * packet);
217 			bool ProcessOptions (uint16_t flags, Packet * packet);
218 			void ProcessAck (Packet * packet);
219 			size_t ConcatenatePackets (uint8_t * buf, size_t len);
220 
221 			void UpdateCurrentRemoteLease (bool expired = false);
222 
223 			template<typename Buffer, typename ReceiveHandler>
224 			void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout);
225 
226 			void ScheduleResend ();
227 			void HandleResendTimer (const boost::system::error_code& ecode);
228 			void HandleAckSendTimer (const boost::system::error_code& ecode);
229 
230 		private:
231 
232 			boost::asio::io_service& m_Service;
233 			uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber;
234 			int32_t m_LastReceivedSequenceNumber;
235 			StreamStatus m_Status;
236 			bool m_IsAckSendScheduled;
237 			StreamingDestination& m_LocalDestination;
238 			std::shared_ptr<const i2p::data::IdentityEx> m_RemoteIdentity;
239 			std::shared_ptr<const i2p::crypto::Verifier> m_TransientVerifier; // in case of offline key
240 			std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet;
241 			std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession;
242 			std::shared_ptr<const i2p::data::Lease> m_CurrentRemoteLease;
243 			std::shared_ptr<i2p::tunnel::OutboundTunnel> m_CurrentOutboundTunnel;
244 			std::queue<Packet *> m_ReceiveQueue;
245 			std::set<Packet *, PacketCmp> m_SavedPackets;
246 			std::set<Packet *, PacketCmp> m_SentPackets;
247 			boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer, m_AckSendTimer;
248 			size_t m_NumSentBytes, m_NumReceivedBytes;
249 			uint16_t m_Port;
250 
251 			std::mutex m_SendBufferMutex;
252 			SendBufferQueue m_SendBuffer;
253 			int m_WindowSize, m_RTT, m_RTO, m_AckDelay;
254 			uint64_t m_LastWindowSizeIncreaseTime;
255 			int m_NumResendAttempts;
256 			size_t m_MTU;
257 	};
258 
259 	class StreamingDestination: public std::enable_shared_from_this<StreamingDestination>
260 	{
261 		public:
262 
263 			typedef std::function<void (std::shared_ptr<Stream>)> Acceptor;
264 
265 			StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort = 0, bool gzip = false);
266 			~StreamingDestination ();
267 
268 			void Start ();
269 			void Stop ();
270 
271 			std::shared_ptr<Stream> CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0);
272 			void SendPing (std::shared_ptr<const i2p::data::LeaseSet> remote);
273 			void DeleteStream (std::shared_ptr<Stream> stream);
274 			bool DeleteStream (uint32_t recvStreamID);
275 			void SetAcceptor (const Acceptor& acceptor);
276 			void ResetAcceptor ();
IsAcceptorSet()277 			bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
278 			void AcceptOnce (const Acceptor& acceptor);
279 			void AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev);
280 
GetOwner()281 			std::shared_ptr<i2p::client::ClientDestination> GetOwner () const { return m_Owner; };
SetOwner(std::shared_ptr<i2p::client::ClientDestination> owner)282 			void SetOwner (std::shared_ptr<i2p::client::ClientDestination> owner) { m_Owner = owner; };
GetLocalPort()283 			uint16_t GetLocalPort () const { return m_LocalPort; };
284 
285 			void HandleDataMessagePayload (const uint8_t * buf, size_t len);
286 			std::shared_ptr<I2NPMessage> CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort, bool checksum = true);
287 
NewPacket()288 			Packet * NewPacket () { return m_PacketsPool.Acquire(); }
DeletePacket(Packet * p)289 			void DeletePacket (Packet * p) { return m_PacketsPool.Release(p); }
290 
291 		private:
292 
293 			void HandleNextPacket (Packet * packet);
294 			std::shared_ptr<Stream> CreateNewIncomingStream (uint32_t receiveStreamID);
295 			void HandlePendingIncomingTimer (const boost::system::error_code& ecode);
296 
297 		private:
298 
299 			std::shared_ptr<i2p::client::ClientDestination> m_Owner;
300 			uint16_t m_LocalPort;
301 			bool m_Gzip; // gzip compression of data messages
302 			std::mutex m_StreamsMutex;
303 			std::unordered_map<uint32_t, std::shared_ptr<Stream> > m_Streams; // sendStreamID->stream
304 			std::unordered_map<uint32_t, std::shared_ptr<Stream> > m_IncomingStreams; // receiveStreamID->stream
305 			std::shared_ptr<Stream> m_LastStream;
306 			Acceptor m_Acceptor;
307 			std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams;
308 			boost::asio::deadline_timer m_PendingIncomingTimer;
309 			std::unordered_map<uint32_t, std::list<Packet *> > m_SavedPackets; // receiveStreamID->packets, arrived before SYN
310 
311 			i2p::util::MemoryPool<Packet> m_PacketsPool;
312 			i2p::util::MemoryPool<I2NPMessageBuffer<I2NP_MAX_MESSAGE_SIZE> > m_I2NPMsgsPool;
313 
314 		public:
315 
316 			i2p::data::GzipInflator m_Inflator;
317 			std::unique_ptr<i2p::data::GzipDeflator> m_Deflator;
318 
319 			// for HTTP only
decltype(m_Streams)320 			const decltype(m_Streams)& GetStreams () const { return m_Streams; };
321 	};
322 
323 //-------------------------------------------------
324 
325 	template<typename Buffer, typename ReceiveHandler>
AsyncReceive(const Buffer & buffer,ReceiveHandler handler,int timeout)326 	void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout)
327 	{
328 		auto s = shared_from_this();
329 		m_Service.post ([s, buffer, handler, timeout](void)
330 		{
331 			if (!s->m_ReceiveQueue.empty () || s->m_Status == eStreamStatusReset)
332 				s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0);
333 			else
334 			{
335 				int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout;
336 				s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t));
337 				int left = timeout - t;
338 				auto self = s->shared_from_this();
339 				self->m_ReceiveTimer.async_wait (
340 					[self, buffer, handler, left](const boost::system::error_code & ec)
341 					{
342 						self->HandleReceiveTimer(ec, buffer, handler, left);
343 					});
344 			}
345 		});
346 	}
347 
348 	template<typename Buffer, typename ReceiveHandler>
HandleReceiveTimer(const boost::system::error_code & ecode,const Buffer & buffer,ReceiveHandler handler,int remainingTimeout)349 	void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout)
350 	{
351 		size_t received = ConcatenatePackets (boost::asio::buffer_cast<uint8_t *>(buffer), boost::asio::buffer_size(buffer));
352 		if (received > 0)
353 			handler (boost::system::error_code (), received);
354 		else if (ecode == boost::asio::error::operation_aborted)
355 		{
356 			// timeout not expired
357 			if (m_Status == eStreamStatusReset)
358 				handler (boost::asio::error::make_error_code (boost::asio::error::connection_reset), 0);
359 			else
360 				handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), 0);
361 		}
362 		else
363 		{
364 			// timeout expired
365 			if (remainingTimeout <= 0)
366 				handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), received);
367 			else
368 			{
369 				// itermediate interrupt
370 				SendUpdatedLeaseSet (); // send our leaseset if applicable
371 				AsyncReceive (buffer, handler, remainingTimeout);
372 			}
373 		}
374 	}
375 }
376 }
377 
378 #endif
379