/* Copyright The kNet Project.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License. */
#pragma once

/** @file MessageConnection.h
	@brief The MessageConnection and ConnectionStatistics classes. */

// Modified by Lasse Oorni for Urho3D

#include <cassert>
#include <cstdio>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "kNetBuildConfig.h"
// Urho3D: include Socket.h first to make sure WS2Include.h is included before windows.h / winsock.h
#include "Socket.h"
#include "WaitFreeQueue.h"
#include "LockFreePoolAllocator.h"
#include "Lockable.h"
#include "NetworkSimulator.h"
#include "IMessageHandler.h"
#include "BasicSerializedDataTypes.h"
#include "Datagram.h"
#include "FragmentedTransferManager.h"
#include "NetworkMessage.h"
#include "Event.h"
#include "DataSerializer.h"
#include "DataDeserializer.h"

#include "MaxHeap.h"
#include "Clock.h"
#include "PolledTimer.h"
#include "Thread.h"
#include "Types.h"
47 
48 namespace kNet
49 {
50 
// Forward declarations of the types that this header only refers to by pointer, reference or friendship.
class MessageConnection;
class UDPMessageConnection;
class TCPMessageConnection;
class NetworkServer;
class Network;
class NetworkWorkerThread;
class FragmentedSendManager;

// Only compiled on MSVC. NOTE(review): presumably a compiler-specific workaround - confirm before removing.
#ifdef _MSC_VER
struct FragmentedSendManager::FragmentedTransfer;
#endif
62 
63 /// Stores information about an established MessageConnection.
64 struct ConnectionStatistics
65 {
66 	/// Remembers a ping request that was sent to the other end.
67 	struct PingTrack
68 	{
69 		tick_t pingSentTick;  ///< Timestamp of when the PingRequest was sent.
70 		tick_t pingReplyTick; ///< If replyReceived==true, contains the timestamp of when PingReply was received as a response.
71 		unsigned long pingID;      ///< ID of this ping message.
72 		bool replyReceived;        ///< True of PingReply has already been received for this.
73 	};
74 	/// Contains an entry for each recently performed Ping operation, sorted by age (oldest first).
75 	std::vector<PingTrack> ping;
76 
77 	/// Remembers both in- and outbound traffic events on the socket.
78 	struct TrafficTrack
79 	{
80 		tick_t tick;          ///< Denotes when this event occurred.
81 		unsigned long packetsIn;   ///< The number of datagrams in when this event occurred.
82 		unsigned long packetsOut;  ///< The number of datagrams out when this event occurred.
83 		unsigned long messagesIn;  ///< The total number of messages the received datagrams contained.
84 		unsigned long messagesOut; ///< The total number of messages the sent datagrams contained.
85 		unsigned long bytesIn;     ///< The total number of bytes the received datagrams contained.
86 		unsigned long bytesOut;    ///< The total number of bytes the sent datagrams contained.
87 	};
88 	/// Contains an entry for each recent traffic event (data in/out) on the connection, sorted by age (oldest first).
89 	std::vector<TrafficTrack> traffic;
90 
91 	/// Remembers the send/receive time of a datagram with a certain ID.
92 	struct DatagramIDTrack
93 	{
94 		tick_t tick;
95 		packet_id_t packetID;
96 	};
97 	/// Contains an entry for each recently received packet, sorted by age (oldest first).
98 	std::vector<DatagramIDTrack> recvPacketIDs;
99 };
100 
101 /// Comparison object that sorts the two messages by their priority (higher priority/smaller number first).
102 class NetworkMessagePriorityCmp
103 {
104 public:
operator()105 	int operator ()(const NetworkMessage *a, const NetworkMessage *b)
106 	{
107 		assert(a && b);
108 		if (a->priority < b->priority) return -1;
109 		if (b->priority < a->priority) return 1;
110 
111 		if (a->MessageNumber() < b->MessageNumber()) return 1;
112 		if (b->MessageNumber() < a->MessageNumber()) return -1;
113 
114 		return 0;
115 	}
116 };
117 
/// Represents the current state of the connection.
/// The enumerators keep their implicit values 0..4 in declaration order.
enum ConnectionState
{
	ConnectionPending, ///< Waiting for the other end to send an acknowledgement packet to form the connection. No messages may yet be sent or received at this state.
	ConnectionOK,      ///< The connection is bidirectionally open, for both reading and writing. (readOpen=true, writeOpen=true)
	ConnectionDisconnecting, ///< We are closing the connection down. Cannot send any more messages, but can still receive. (readOpen=true, writeOpen=false)
	ConnectionPeerClosed, ///< The other end has closed the connection. No new messages will be received, but can still send messages. (readOpen=false, writeOpen=true)
	ConnectionClosed    ///< The socket is no longer open. A MessageConnection object in this state cannot be reused to open a new connection, but a new connection object must be created.
};
127 
128 /// Returns a textual representation of a ConnectionState.
129 std::string ConnectionStateToString(ConnectionState state);
130 
// Prevent confusion with Win32 functions: if SendMessage is defined as a macro at this point,
// drop it so that MessageConnection::SendMessage below is declared under its real name.
#ifdef SendMessage
#undef SendMessage
#endif
135 
136 /// Represents a single established network connection. MessageConnection maintains its own worker thread that manages
137 /// connection control, the scheduling and prioritization of outbound messages, and receiving inbound messages.
138 class MessageConnection : public RefCountable
139 {
140 public:
141 	virtual ~MessageConnection();
142 
143 	/// Returns the current connection state.
144 	ConnectionState GetConnectionState() const; // [main and worker thread]
145 
146 	/// Returns true if the peer has signalled it will not send any more data (the connection is half-closed or full-closed).
147 	bool IsReadOpen() const; // [main and worker thread]
148 
149 	/// Returns true if we have signalled not to send any more data (the connection is half-closed or full-closed).
150 	bool IsWriteOpen() const; // [main and worker thread]
151 
152 	/// Returns true if the connection is in the ConnectionPending state and waiting for the other end to resolve/establish the connection.
153 	/// When this function returns false, the connection may be half-open, bidirectionally open, timed out on ConnectionPending, or closed.
154 	bool IsPending() const; // [main and worker thread]
155 
156 	/// Returns true if this socket is connected, i.e. at least half-open in one way.
Connected()157 	bool Connected() const { return IsReadOpen() || IsWriteOpen(); } // [main and worker thread]
158 
159 	/// Runs a modal processing loop and produces events for all inbound received data. Returns when the connection is closed.
160 	/// This is an example function mostly useful only for very simple demo applications. In most cases,
161 	/// you do not want to call this.
162 	void RunModalClient(); // [main thread]
163 
164 	/// Blocks for the given amount of time until the connection has transitioned away from ConnectionPending state.
165 	/// @param maxMSecstoWait A positive value that indicates the maximum time to wait until returning.
166 	/// @return If the connection was successfully opened, this function returns true. Otherwise returns false, and
167 	///         either timeout was encountered and the other end has not acknowledged the connection,
168 	///         or the connection is in ConnectionClosed state.
169 	bool WaitToEstablishConnection(int maxMSecsToWait = 500); // [main thread]
170 
171 	/// Starts a benign disconnect procedure. Transitions ConnectionState to ConnectionDisconnecting. This
172 	/// function will block until the given period expires or the other end acknowledges and also closes
173 	/// down the connection. Currently no guarantee is given for whether previous reliable messages will
174 	/// safely reach the destination. To ensure this, do a manual wait to flush the outbound message queue
175 	/// before disconnecting.
176 	/// @param maxMSecsToWait A positive number that indicates the maximum time to wait for a disconnect
177 	///                       acknowledgement message until returning.
178 	///                       If 0 is passed, the function will send the Disconnect message and return immediately.
179 	/// When this function returns, the connection may either be in ConnectionClosing or ConnectionClosed
180 	/// state, depending on whether the other end has already acknowledged the disconnection.
181 	/// \note You may not call this function in middle of StartNewMessage() - EndAndQueueMessage() function calls.
182 	void Disconnect(int maxMSecsToWait = 500); // [main thread]
183 
184 	/// Starts a forceful disconnect procedure.
185 	/// @param maxMSecsToWait If a positive number, Disconnect message will be sent to the peer and if no response
186 	///                       is received in the given time period, the connection is forcefully closed.
187 	///                       If 0, no Disconnect message will be sent at all, but the connection is torn down
188 	///                       and the function returns immediately. The other end will remain hanging and will timeout.
189 	/// When this function returns, the connection is in ConnectionClosed state.
190 	/// \note You may not call this function in middle of StartNewMessage() - EndAndQueueMessage() function calls.
191 	void Close(int maxMSecsToWait = 500); // [main thread]
192 
193 	// There are 3 ways to send messages through a MessageConnection:
194 	// StartNewMessage/EndAndQueueMessage, SendStruct, and Send. See below.
195 
196 	/// Start building a new message with the given ID.
197 	/// @param id The ID for the message you will be sending.
198 	/// @param numBytes The number of bytes the body of this message will be. This function will pre-allocate the
199 	///                 NetworkMessage::data field to hold at least that many bytes (Capacity() can also return a larger value).
200 	///                 This number only needs to be an estimate, since you can later on call NetworkMessage::Reserve()
201 	///                 to reallocate the message memory. If you pass in the default value 0, no pre-allocation will be performed.
202 	/// @return The NetworkMessage object that represents the new message to be built. This message is dynamically allocated
203 	///         from an internal pool of NetworkMessage blocks. For each NetworkMessage pointer obtained, call
204 	///         EndAndQueueMessage when you have finished building the message to commit the network send and to release the memory.
205 	///         Alternatively, if after calling StartNewMessage, you decide to abort the network send, free up the NetworkMessage
206 	///         by calling this->FreeMessage().
207 	NetworkMessage *StartNewMessage(unsigned long id, size_t numBytes = 0); // [main and worker thread]
208 
209 	/// Finishes building the message and submits it to the outbound send queue.
210 	/// @param msg The message to send. After calling this function, this pointer should be considered freed and may not be
211 	///            dereferenced or passed to any other member function of this MessageConnection. Only pass in here
212 	///            NetworkMessage pointers obtained by a call to StartNewMessage() of the same MessageConnection instance.
213 	/// @param numBytes Specify here the number of actual bytes you filled in into the msg.data field. A size of 0
214 	///                 is valid, and can be used in cases the message ID itself is the whole message. Passing in the default
215 	///                 value of this parameter will use the size value that was specified in the call to StartNewMessage().
216 	/// @param internalQueue If true, specifies that this message was submitted from the network worker thread and not the application
217 	///                 thread. Pass in the value 'false' here in the client application, or there is a chance of a race condition.
218 	void EndAndQueueMessage(NetworkMessage *msg, size_t numBytes = (size_t)(-1), bool internalQueue = false); // [main and worker thread]
219 
220 	/// This is a conveniency function to access the above StartNewMessage/EndAndQueueMessage pair. The performance of this
221 	/// function call is not as good, since a memcpy of the message will need to be made. For performance-critical messages,
222 	/// it is better to craft the message directly into the buffer area provided by StartNewMessage.
223 	void SendMessage(unsigned long id, bool reliable, bool inOrder, unsigned long priority, unsigned long contentID,
224 	                 const char *data, size_t numBytes); // [main thread]
225 
226 	/// Sends a message using a serializable structure.
227 	template<typename SerializableData>
228 	void SendStruct(const SerializableData &data, unsigned long id, bool inOrder,
229 		bool reliable, unsigned long priority, unsigned long contentID = 0); // [main thread]
230 
231 	/// Sends a message using a compiled message structure.
232 	template<typename SerializableMessage>
233 	void Send(const SerializableMessage &data, unsigned long contentID = 0); // [main thread]
234 
235 	/// Stops all outbound sends until ResumeOutboundSends is called. Use if you need to guarantee that some messages be sent in the same datagram.
236 	/// Do not stop outbound sends for long periods, or the other end may time out the connection.
237 	void PauseOutboundSends(); // [main thread]
238 
239 	/// Resumes sending of outbound messages.
240 	void ResumeOutboundSends(); // [main thread]
241 
242 	/// Returns the number of messages that have been received from the network but haven't been handled by the application yet.
NumInboundMessagesPending()243 	size_t NumInboundMessagesPending() const { return inboundMessageQueue.Size(); } // [main and worker thread]
244 
245 	/// Returns the total number of messages pending to be sent out.
NumOutboundMessagesPending()246 	size_t NumOutboundMessagesPending() const { return outboundQueue.Size() + outboundAcceptQueue.Size(); } // [main and worker thread]
247 
248 	/// Returns the number of outbound messages the main thread has queued for the worker thread to send out. (still unaccepted by the worker thread).
OutboundAcceptQueueSize()249 	size_t OutboundAcceptQueueSize() const { return outboundAcceptQueue.Size(); } // [main and worker thread]
250 
251 	/// Returns the number of outbound messages in the worker thread outbound message queue (already accepted and pending a send by the worker thread).
OutboundQueueSize()252 	size_t OutboundQueueSize() const { return outboundQueue.Size(); } // [main and worker thread]
253 
254 	/// Returns the underlying raw socket. [main and worker thread]
GetSocket()255 	Socket *GetSocket() { return socket; }
256 
257 	/// Returns an object that identifies the local endpoint (IP and port) this connection is connected to.
258 	EndPoint LocalEndPoint() const; // [main and worker thread]
259 
260 	/// Returns an object that identifies the remote endpoint (IP and port) this connection is connected to.
261 	EndPoint RemoteEndPoint() const; // [main and worker thread]
262 
263 	/// Sets an upper limit to the data send rate for this connection.
264 	/// The default is not to have an upper limit at all.
265 	/// @param numBytesPerSec The upper limit for the number of bytes to send per second. This limit includes the message header
266 	///                       bytes as well and not just the payload. Set to 0 to force no limit.
267 	/// @param numDatagramsPerSec The maximum number of datagrams (UDP packets) to send per second. Set to 0 to force no limit.
268 	///                       If the connection is operating on top of TCP, this field has no effect.
269 	///\todo Implement.
270 	void SetMaximumDataSendRate(int numBytesPerSec, int numDatagramsPerSec);
271 
272 	/// Registers a new listener object for the events of this connection.
273 	void RegisterInboundMessageHandler(IMessageHandler *handler); // [main thread]
274 
275 	/// Fetches all newly received messages waiting in the inbound queue, and passes each of these
276 	/// to the message handler registered using RegisterInboundMessageHandler.
277 	/// Call this function periodically to receive new data from the network if you are using the Observer pattern.
278 	/// Alternatively, use the immediate-mode ReceiveMessage function to receive messages directly one at a time.
279 	/// @param maxMessageToProcess If the inbound queue contains more than this amount of new messages,
280 	///                            the processing loop will return to give processing time to other parts of the application.
281 	///                            If 0 is passed, messages are processed until the queue is empty.
282 	/// \note It is important to have a non-zero limit in maxMessagesToProcess (unless you're sure what you are doing), since
283 	///       otherwise an attacker might affect the performance of the application main loop by sending messages so fast that
284 	///       the queue never has time to exhaust, thus giving an infinite loop in practice.
285 	void Process(int maxMessagesToProcess = 100); // [main thread]
286 
287 	/// Waits for at most the given amount of time until a new message is received for processing.
288 	/// @param maxMSecsToWait If 0, the call will wait indefinitely until a message is received or the connection transitions to
289 	///                       closing state.
290 	///                       If a positive value is passed, at most that many milliseconds is waited for a new message to be received.
291 	void WaitForMessage(int maxMSecsToWait); // [main thread]
292 
293 	/// Returns the next message in the inbound queue. This is an alternative API to RegisterInboundMessageHandler/Process.
294 	/// \note When using this function to receive messages, remember to call FreeMessage for each NetworkMessage returned, or you
295 	/// will have a major size memory leak, fast.
296 	/// @param maxMSecsToWait If a negative number, the call will not wait at all if there are no new messages to process, but
297 	///                       returns 0 immediately.
298 	///                       If 0, the call will wait indefinitely until a message is received or the connection transitions to
299 	///                       closing state.
300 	///                       If a positive value is passed, at most that many milliseconds is waited for a new message to be received.
301 	/// @return A newly allocated object to the received message, or 0 if the queue was empty and no messages were received during
302 	///         the wait period, or if the connection transitioned to closing state. When you are finished reading the message,
303 	///         call FreeMessage for the returned pointer.
304 	NetworkMessage *ReceiveMessage(int maxMSecsToWait = -1); // [main thread]
305 
306 	/// Frees up a NetworkMessage struct when it is no longer needed.
307 	/// You need to call this for each message that you received from a call to ReceiveMessage.
308 	void FreeMessage(NetworkMessage *msg); // [main and worker thread]
309 
310 	/// Returns a single-line message describing the connection state.
311 	std::string ToString() const; // [main and worker thread]
312 
313 	/// Dumps a long multi-line status message of this connection state to stdout.
314 	void DumpStatus() const; // [main thread]
315 
316 	// MessageConnection Statistics -related functions:
317 
318 	/// Returns the estimated RTT of the connection, in milliseconds. RTT is the time taken to communicate a message from client->host->client.
RoundTripTime()319 	float RoundTripTime() const { return rtt; } // [main and worker thread]
320 
321 	/// Returns the number of milliseconds since we last received data from the socket.
LastHeardTime()322 	float LastHeardTime() const { return Clock::TicksToMillisecondsF(Clock::TicksInBetween(Clock::Tick(), lastHeardTime)); } // [main and worker thread]
323 
PacketsInPerSec()324 	float PacketsInPerSec() const { return packetsInPerSec; } // [main and worker thread]
PacketsOutPerSec()325 	float PacketsOutPerSec() const { return packetsOutPerSec; } // [main and worker thread]
MsgsInPerSec()326 	float MsgsInPerSec() const { return msgsInPerSec; } // [main and worker thread]
MsgsOutPerSec()327 	float MsgsOutPerSec() const { return msgsOutPerSec; } // [main and worker thread]
BytesInPerSec()328 	float BytesInPerSec() const { return bytesInPerSec; } // [main and worker thread]
BytesOutPerSec()329 	float BytesOutPerSec() const { return bytesOutPerSec; } // [main and worker thread]
330 
331 	/// Returns the total number of bytes (excluding IP and TCP/UDP headers) that have been received from this connection.
BytesInTotal()332 	u64 BytesInTotal() const { return bytesInTotal; } // [main and worker thread]
333 
334 	/// Returns the total number of bytes (excluding IP and TCP/UDP headers) that have been sent from this connection.
BytesOutTotal()335 	u64 BytesOutTotal() const { return bytesOutTotal; } // [main and worker thread]
336 
337 	/// Returns the simulator object which can be used to apply network condition simulations to this connection.
NetworkSendSimulator()338 	NetworkSimulator &NetworkSendSimulator() { return networkSendSimulator; }
339 
340 	/// Stores all the statistics about the current connection. This data is periodically recomputed
341 	/// by the network worker thread and shared to the client through a lock.
342 	Lockable<ConnectionStatistics> statistics; // [main and worker thread]
343 
344 protected:
345 	friend class NetworkWorkerThread;
346 
347 	/// The Network object inside which this MessageConnection lives.
348 	Network *owner; // [set and read only by the main thread]
349 
350 	/// If this MessageConnection represents a client connection on the server side, this gives the owner.
351 	NetworkServer *ownerServer; // [set and read only by the main thread]
352 
353 	/// Stores the thread that manages the background processing of this connection. The same thread can manage multiple
354 	/// connections and servers, and not just this one.
355 	NetworkWorkerThread *workerThread; // [set and read only by worker thread]
356 
357 #ifdef KNET_THREAD_CHECKING_ENABLED
358 	/// In debug mode, we track and enforce thread safety constraints through this ID.
359 	ThreadId workerThreadId; // [set by worker thread on thread startup, read by both main and worker thread]
360 #endif
361 
362 	/// Performs a check that asserts that the current thread of execution is in the network worker thread.
363 	void AssertInWorkerThreadContext() const; // [main and worker thread]
364 
365 	/// Performs a check that asserts that the current thread of execution is not in the network worker thread (it is the main thread).
366 	void AssertInMainThreadContext() const; // [main and worker thread]
367 
368 	/// Returns true if this MessageConnection is associated with a NetworkWorkerThread to maintain.
IsWorkerThreadRunning()369 	bool IsWorkerThreadRunning() const { return workerThread != 0; } // [main and worker thread]
370 
371 	/// A queue populated by the main thread to give out messages to the MessageConnection work thread to process.
372 	WaitFreeQueue<NetworkMessage*> outboundAcceptQueue; // [produced by main thread, consumed by worker thread]
373 
374 	/// A queue populated by the networking thread to hold all the incoming messages until the application can process them.
375 	WaitFreeQueue<NetworkMessage*> inboundMessageQueue; // [produced by worker thread, consumed by main thread]
376 
377 	/// A priority queue that maintains in order all the messages that are going out the pipe.
378 	///\todo Make the choice of which of the following structures to use a runtime option.
379 #ifndef KNET_NO_MAXHEAP // If defined, disables message priorization feature to improve client-side CPU performance. By default disabled.
380 	MaxHeap<NetworkMessage*, NetworkMessagePriorityCmp> outboundQueue; // [worker thread]
381 #else
382 	WaitFreeQueue<NetworkMessage*> outboundQueue; // [worker thread]
383 #endif
384 
385 	/// Tracks all the message sends that are fragmented.
386 	Lockable<FragmentedSendManager> fragmentedSends; // [worker thread]
387 
388 	/// Tracks all the receives of fragmented messages and helps reconstruct the original messages from fragments.
389 	FragmentedReceiveManager fragmentedReceives; // [worker thread]
390 
391 	/// Allocations of NetworkMessage structures go through a pool to avoid dynamic new/delete calls when sending messages.
392 	/// This structure is shared between the main and worker thread through a lockfree construct.
393 	LockFreePoolAllocator<NetworkMessage> messagePool; // [main and worker thread]
394 
395 	/// Tracks when it is time to send the next PingRequest to the peer.
396 	PolledTimer pingTimer; // [worker thread]
397 
398 	/// Tracks when it is time to update the statistics structure.
399 	PolledTimer statsRefreshTimer; // [worker thread]
400 
401 	/// Specifies the return value for the functions that send out network packets.
402 	enum PacketSendResult
403 	{
404 		PacketSendOK, ///< The operating system signalled the packet was successfully sent.
405 		PacketSendSocketClosed, ///< The packet could not be sent, since the socket was closed.
406 		PacketSendSocketFull, ///< The packet could not be sent, since the OS outbound buffer was full.
407 		PacketSendNoMessages, ///< A packet could not be sent, since there was nothing to send.
408 		PacketSendThrottled   ///< The packet could not be sent right now, since a throttle timer is in effect.
409 	};
410 
411 	/// Serializes several messages into a single UDP/TCP packet and sends it out to the wire.
412 	virtual PacketSendResult SendOutPacket() = 0; // [worker thread]
413 
414 	/// Sends out as many packets at one go as is allowed by the current send rate of the connection.
415 	virtual void SendOutPackets() = 0; // [worker thread]
416 
417 	/// Returns how many milliseconds need to be waited before this socket can try sending data the next time.
418 	virtual unsigned long TimeUntilCanSendPacket() const = 0; // [worker thread]
419 
420 	/// Performs the internal work tick that updates this connection.
421 	void UpdateConnection(); // [worker thread]
422 
423 	/// Overridden by a subclass of MessageConnection to do protocol-specific updates (private implementation -pattern)
DoUpdateConnection()424 	virtual void DoUpdateConnection() {} // [worker thread]
425 
426 	/// Marks that the peer has closed the connection and will not send any more application-level data.
427 	void SetPeerClosed(); // [worker thread]
428 
DumpConnectionStatus()429 	virtual void DumpConnectionStatus() const {} // [main thread]
430 
431 	/// Posted when the application has pushed us some messages to handle.
432 	Event NewOutboundMessagesEvent() const; // [main and worker thread]
433 
434 	/// Specifies the result of a Socket read activity.
435 	enum SocketReadResult
436 	{
437 		SocketReadOK,        ///< All data was read from the socket and it is empty for now.
438 		SocketReadError,     ///< An error occurred - probably the connection is dead.
439 		SocketReadThrottled, ///< There was so much data to read that we need to pause and make room for sends as well.
440 	};
441 
442 	/// Reads all the new bytes available in the socket.
443 	/// This data will be read into the connection's internal data queue, where it will be
444 	/// parsed to messages.
445 	/// @param bytesRead [out] This field will get the number of bytes successfully read.
446 	/// @return The return code of the operation.
447 	virtual SocketReadResult ReadSocket(size_t &bytesRead) = 0; // [worker thread]
448 
449 	SocketReadResult ReadSocket(); // [worker thread]
450 
451 	/// Sets the worker thread object that will handle this connection.
452 	void SetWorkerThread(NetworkWorkerThread *thread); // [main thread]
453 
WorkerThread()454 	NetworkWorkerThread *WorkerThread() const { return workerThread; }
455 
456 	void HandleInboundMessage(packet_id_t packetID, const char *data, size_t numBytes); // [worker thread]
457 
458 	/// Allocates a new NetworkMessage struct. [both worker and main thread]
459 	NetworkMessage *AllocateNewMessage();
460 
461 	// Ping/RTT management operations:
462 	void SendPingRequestMessage(bool internalQueue); // [main or worker thread]
463 
464 	void HandlePingRequestMessage(const char *data, size_t numBytes); // [worker thread]
465 
466 	void HandlePingReplyMessage(const char *data, size_t numBytes); // [worker thread]
467 
468 	// Frees all internal dynamically allocated message data.
469 	void FreeMessageData(); // [main thread]
470 
471 	/// Checks if the connection has been silent too long and has now timed out.
472 	void DetectConnectionTimeOut(); // [worker thread]
473 
474 	/// Refreshes RTT and other connection related statistics.
475 	void ComputeStats(); // [worker thread]
476 
477 	/// Adds a new entry for outbound data statistics.
478 	void AddOutboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages); // [worker thread]
479 
480 	/// Adds a new entry for inbound data statistics.
481 	void AddInboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages); // [worker thread]
482 
483 	/// Pulls in all new messages from the main thread to the worker thread side and admits them to the send priority queue.
484 	void AcceptOutboundMessages(); // [worker thread]
485 
486 	/// Starts the socket-specific disconnection procedure.
487 	virtual void PerformDisconnection() = 0;
488 
489 	/// The object that receives notifications of all received data.
490 	IMessageHandler *inboundMessageHandler; // [main thread]
491 
492 	/// The underlying socket on top of which this connection operates.
493 	Socket *socket; // [set by main thread before the worker thread is running. Read-only when worker thread is running. Read by main and worker thread]
494 
495 	/// Specifies the current connection state.
496 	ConnectionState connectionState; // [main and worker thread]
497 
498 	/// If true, all sends to the socket are on hold, until ResumeOutboundSends() is called.
499 	bool bOutboundSendsPaused; // [set by main thread, read by worker thread]
500 
501 	friend class NetworkServer;
502 	friend class Network;
503 
504 	/// Posted when the application has pushed us some messages to handle.
505 	Event eventMsgsOutAvailable; // [main and worker thread]
506 
507 	float rtt; ///< The currently estimated round-trip time, in milliseconds. [main and worker thread]
508 	tick_t lastHeardTime; ///< The tick since last successful receive from the socket. [main and worker thread]
509 	float packetsInPerSec; ///< The average number of datagrams we are receiving/second. [main and worker thread]
510 	float packetsOutPerSec; ///< The average number of datagrams we are sending/second. [main and worker thread]
511 	float msgsInPerSec; ///< The average number of kNet messages we are receiving/second. [main and worker thread]
512 	float msgsOutPerSec; ///< The average number of kNet messages we are sending/second. [main and worker thread]
513 	float bytesInPerSec; ///< The average number of bytes we are receiving/second. This includes kNet headers. [main and worker thread]
514 	float bytesOutPerSec; ///< The average number of bytes we are sending/second. This includes kNet headers. [main and worker thread]
515 	u64 bytesInTotal;
516 	u64 bytesOutTotal;
517 
518 	/// Stores the current settigns related to network conditions testing.
519 	/// By default, the simulator is disabled.
520 	NetworkSimulator networkSendSimulator;
521 
522 	/// A running number attached to each outbound message (not present in network stream) to
523 	/// break ties when deducing which message should come before which.
524 	unsigned long outboundMessageNumberCounter; // [worker thread]
525 
526 	/// A running number that is assigned to each outbound reliable message. This is used to
527 	/// enforce proper ordering of ordered messages.
528 	unsigned long outboundReliableMessageNumberCounter; // [worker thread]
529 
530 	/// A (messageID, contentID) pair.
531 	typedef std::pair<u32, u32> MsgContentIDPair;
532 
533 	typedef std::map<MsgContentIDPair, std::pair<packet_id_t, tick_t> > ContentIDReceiveTrack;
534 
535 	/// Each (messageID, contentID) pair has a packetID "stamp" associated to them to track
536 	/// and decimate out-of-order received obsoleted messages.
537 	ContentIDReceiveTrack inboundContentIDStamps; // [worker thread]
538 
539 	typedef std::map<MsgContentIDPair, NetworkMessage*> ContentIDSendTrack;
540 
541 	ContentIDSendTrack outboundContentIDMessages; // [worker thread]
542 
543 	void CheckAndSaveOutboundMessageWithContentID(NetworkMessage *msg); // [worker thread]
544 
545 	void ClearOutboundMessageWithContentID(NetworkMessage *msg); // [worker thread]
546 
547 	/// Checks whether the given (messageID, contentID)-pair is already out-of-date and obsoleted
548 	/// by a newer packet and should not be processed.
549 	/// @return True if the packet should be processed (there was no superceding record), and
550 	///         false if the packet is old and should be discarded.
551 	bool CheckAndSaveContentIDStamp(message_id_t messageID, u32 contentID, packet_id_t packetID); // [worker thread]
552 
553 	void SplitAndQueueMessage(NetworkMessage *message, bool internalQueue, size_t maxFragmentSize); // [main and worker thread]
554 
555 	static const unsigned long MsgIdPingRequest = 1;
556 	static const unsigned long MsgIdPingReply = 2;
557 	static const unsigned long MsgIdFlowControlRequest = 3;
558 	static const unsigned long MsgIdPacketAck = 4;
559 	static const unsigned long MsgIdDisconnect = 0x3FFFFFFF;
560 	static const unsigned long MsgIdDisconnectAck = 0x3FFFFFFE;
561 
562 	/// Private ctor - MessageConnections are instantiated by Network and NetworkServer classes.
563 	explicit MessageConnection(Network *owner, NetworkServer *ownerServer, Socket *socket, ConnectionState startingState);
564 
HandleMessage(packet_id_t,message_id_t,const char *,size_t)565 	virtual bool HandleMessage(packet_id_t /*packetID*/, message_id_t /*messageID*/, const char * /*data*/, size_t /*numBytes*/) { return false; } // [main thread]
566 
567 private:
568 	void operator=(const MessageConnection &); ///< Noncopyable, N/I.
569 	MessageConnection(const MessageConnection &); ///< Noncopyable, N/I.
570 };
571 
572 template<typename SerializableData>
SendStruct(const SerializableData & data,unsigned long id,bool inOrder,bool reliable,unsigned long priority,unsigned long contentID)573 void MessageConnection::SendStruct(const SerializableData &data, unsigned long id, bool inOrder,
574 		bool reliable, unsigned long priority, unsigned long contentID)
575 {
576 	AssertInMainThreadContext();
577 
578 	const size_t dataSize = data.Size();
579 
580 	NetworkMessage *msg = StartNewMessage(id, dataSize);
581 
582 	if (dataSize > 0)
583 	{
584 		DataSerializer mb(msg->data, dataSize);
585 		data.SerializeTo(mb);
586 		assert(mb.BytesFilled() == dataSize); // The SerializableData::Size() estimate must be exact!
587 	}
588 
589 	msg->id = id;
590 	msg->contentID = contentID;
591 	msg->inOrder = inOrder;
592 	msg->priority = priority;
593 	msg->reliable = reliable;
594 #ifdef KNET_NETWORK_PROFILING
595 	char str[512];
596 	sprintf(str, "%s (%u)", SerializableData::Name(), (unsigned int)id);
597 	msg->profilerName = str;
598 #endif
599 
600 	EndAndQueueMessage(msg);
601 }
602 
603 template<typename SerializableMessage>
Send(const SerializableMessage & data,unsigned long contentID)604 void MessageConnection::Send(const SerializableMessage &data, unsigned long contentID)
605 {
606 	SendStruct(data, SerializableMessage::messageID, data.inOrder, data.reliable, data.priority, contentID);
607 }
608 
609 } // ~kNet
610