1 /* Copyright The kNet Project.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License. */
14 #pragma once
15
16 /** @file MessageConnection.h
17 @brief The MessageConnection and ConnectionStatistics classes. */
18
19 // Modified by Lasse Oorni for Urho3D
20
#include <cassert>
#include <cstdio>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
25
26 #include "kNetBuildConfig.h"
27 // Urho3D: include Socket.h first to make sure WS2Include.h is included before windows.h / winsock.h
28 #include "Socket.h"
29 #include "WaitFreeQueue.h"
30 #include "LockFreePoolAllocator.h"
31 #include "Lockable.h"
32 #include "NetworkSimulator.h"
33 #include "IMessageHandler.h"
34 #include "BasicSerializedDataTypes.h"
35 #include "Datagram.h"
36 #include "FragmentedTransferManager.h"
37 #include "NetworkMessage.h"
38 #include "Event.h"
39 #include "DataSerializer.h"
40 #include "DataDeserializer.h"
41
42 #include "MaxHeap.h"
43 #include "Clock.h"
44 #include "PolledTimer.h"
45 #include "Thread.h"
46 #include "Types.h"
47
48 namespace kNet
49 {
50
51 class MessageConnection;
52 class UDPMessageConnection;
53 class TCPMessageConnection;
54 class NetworkServer;
55 class Network;
56 class NetworkWorkerThread;
57 class FragmentedSendManager;
58
59 #ifdef _MSC_VER
60 struct FragmentedSendManager::FragmentedTransfer;
61 #endif
62
/// Stores information about an established MessageConnection.
/// The struct is a plain aggregate of three history vectors (pings, traffic events and
/// received datagram IDs); it has no member functions of its own.
struct ConnectionStatistics
{
	/// Remembers a ping request that was sent to the other end.
	struct PingTrack
	{
		tick_t pingSentTick;  ///< Timestamp of when the PingRequest was sent.
		tick_t pingReplyTick; ///< If replyReceived==true, contains the timestamp of when PingReply was received as a response.
		unsigned long pingID;      ///< ID of this ping message.
		bool replyReceived;   ///< True if PingReply has already been received for this ping.
	};
	/// Contains an entry for each recently performed Ping operation, sorted by age (oldest first).
	std::vector<PingTrack> ping;

	/// Remembers both in- and outbound traffic events on the socket.
	struct TrafficTrack
	{
		tick_t tick;          ///< Denotes when this event occurred.
		unsigned long packetsIn;   ///< The number of datagrams in when this event occurred.
		unsigned long packetsOut;  ///< The number of datagrams out when this event occurred.
		unsigned long messagesIn;  ///< The total number of messages the received datagrams contained.
		unsigned long messagesOut; ///< The total number of messages the sent datagrams contained.
		unsigned long bytesIn;     ///< The total number of bytes the received datagrams contained.
		unsigned long bytesOut;    ///< The total number of bytes the sent datagrams contained.
	};
	/// Contains an entry for each recent traffic event (data in/out) on the connection, sorted by age (oldest first).
	std::vector<TrafficTrack> traffic;

	/// Remembers the send/receive time of a datagram with a certain ID.
	struct DatagramIDTrack
	{
		tick_t tick;          ///< The send/receive timestamp recorded for the datagram.
		packet_id_t packetID; ///< The ID of the datagram this entry refers to.
	};
	/// Contains an entry for each recently received packet, sorted by age (oldest first).
	std::vector<DatagramIDTrack> recvPacketIDs;
};
100
101 /// Comparison object that sorts the two messages by their priority (higher priority/smaller number first).
102 class NetworkMessagePriorityCmp
103 {
104 public:
operator()105 int operator ()(const NetworkMessage *a, const NetworkMessage *b)
106 {
107 assert(a && b);
108 if (a->priority < b->priority) return -1;
109 if (b->priority < a->priority) return 1;
110
111 if (a->MessageNumber() < b->MessageNumber()) return 1;
112 if (b->MessageNumber() < a->MessageNumber()) return -1;
113
114 return 0;
115 }
116 };
117
/// Represents the current state of the connection.
/// The readOpen/writeOpen remarks on the enumerators describe which directions of the connection
/// are usable in that state.
enum ConnectionState
{
	ConnectionPending, ///< Waiting for the other end to send an acknowledgement packet to form the connection. No messages may yet be sent or received in this state.
	ConnectionOK, ///< The connection is bidirectionally open, for both reading and writing. (readOpen=true, writeOpen=true)
	ConnectionDisconnecting, ///< We are closing the connection down. Cannot send any more messages, but can still receive. (readOpen=true, writeOpen=false)
	ConnectionPeerClosed, ///< The other end has closed the connection. No new messages will be received, but can still send messages. (readOpen=false, writeOpen=true)
	ConnectionClosed ///< The socket is no longer open. A MessageConnection object in this state cannot be reused to open a new connection; a new connection object must be created instead.
};
127
128 /// Returns a textual representation of a ConnectionState.
129 std::string ConnectionStateToString(ConnectionState state);
130
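// The Win32 headers define SendMessage as a macro (expanding to SendMessageA/SendMessageW), which would
// silently rename MessageConnection::SendMessage below. Undefine it to prevent that confusion.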
132 #ifdef SendMessage
133 #undef SendMessage
134 #endif
135
/// Represents a single established network connection. Connection control, the scheduling and prioritization
/// of outbound messages, and the receiving of inbound messages are performed by a NetworkWorkerThread
/// (see workerThread; the same worker thread may serve several connections).
///
/// The trailing "[main thread]" / "[worker thread]" annotations on the members state from which thread
/// each member is intended to be used. This is an abstract base class: the protocol-specific parts are
/// the pure virtual functions declared in the protected section.
class MessageConnection : public RefCountable
{
public:
	virtual ~MessageConnection();

	/// Returns the current connection state.
	ConnectionState GetConnectionState() const; // [main and worker thread]

	/// Returns true if the connection is still open for reading, i.e. the peer has not signalled that it
	/// will stop sending data.
	/// NOTE(review): presumably this corresponds to the "readOpen" flag mentioned in the ConnectionState
	/// documentation - confirm against the implementation.
	bool IsReadOpen() const; // [main and worker thread]

	/// Returns true if the connection is still open for writing, i.e. we have not signalled that we
	/// will stop sending data.
	/// NOTE(review): presumably this corresponds to the "writeOpen" flag mentioned in the ConnectionState
	/// documentation - confirm against the implementation.
	bool IsWriteOpen() const; // [main and worker thread]

	/// Returns true if the connection is in the ConnectionPending state and waiting for the other end to resolve/establish the connection.
	/// When this function returns false, the connection may be half-open, bidirectionally open, timed out on ConnectionPending, or closed.
	bool IsPending() const; // [main and worker thread]

	/// Returns true if this socket is connected, i.e. at least half-open in one way
	/// (IsReadOpen() or IsWriteOpen() returns true).
	bool Connected() const { return IsReadOpen() || IsWriteOpen(); } // [main and worker thread]

	/// Runs a modal processing loop and produces events for all inbound received data. Returns when the connection is closed.
	/// This is an example function mostly useful only for very simple demo applications. In most cases,
	/// you do not want to call this.
	void RunModalClient(); // [main thread]

	/// Blocks for the given amount of time until the connection has transitioned away from ConnectionPending state.
	/// @param maxMSecsToWait A positive value that indicates the maximum time to wait until returning.
	/// @return If the connection was successfully opened, this function returns true. Otherwise returns false, and
	///         either timeout was encountered and the other end has not acknowledged the connection,
	///         or the connection is in ConnectionClosed state.
	bool WaitToEstablishConnection(int maxMSecsToWait = 500); // [main thread]

	/// Starts a benign disconnect procedure. Transitions ConnectionState to ConnectionDisconnecting. This
	/// function will block until the given period expires or the other end acknowledges and also closes
	/// down the connection. Currently no guarantee is given for whether previous reliable messages will
	/// safely reach the destination. To ensure this, do a manual wait to flush the outbound message queue
	/// before disconnecting.
	/// @param maxMSecsToWait A positive number that indicates the maximum time to wait for a disconnect
	///                       acknowledgement message until returning.
	///                       If 0 is passed, the function will send the Disconnect message and return immediately.
	/// When this function returns, the connection may either be in ConnectionDisconnecting or ConnectionClosed
	/// state, depending on whether the other end has already acknowledged the disconnection.
	/// \note You may not call this function in middle of StartNewMessage() - EndAndQueueMessage() function calls.
	void Disconnect(int maxMSecsToWait = 500); // [main thread]

	/// Starts a forceful disconnect procedure.
	/// @param maxMSecsToWait If a positive number, Disconnect message will be sent to the peer and if no response
	///                       is received in the given time period, the connection is forcefully closed.
	///                       If 0, no Disconnect message will be sent at all, but the connection is torn down
	///                       and the function returns immediately. The other end will remain hanging and will timeout.
	/// When this function returns, the connection is in ConnectionClosed state.
	/// \note You may not call this function in middle of StartNewMessage() - EndAndQueueMessage() function calls.
	void Close(int maxMSecsToWait = 500); // [main thread]

	// There are 3 ways to send messages through a MessageConnection:
	// StartNewMessage/EndAndQueueMessage, SendStruct, and Send. See below.
	// (SendMessage is a fourth, copy-based convenience wrapper around the first pair.)

	/// Start building a new message with the given ID.
	/// @param id The ID for the message you will be sending.
	/// @param numBytes The number of bytes the body of this message will be. This function will pre-allocate the
	///                 NetworkMessage::data field to hold at least that many bytes (Capacity() can also return a larger value).
	///                 This number only needs to be an estimate, since you can later on call NetworkMessage::Reserve()
	///                 to reallocate the message memory. If you pass in the default value 0, no pre-allocation will be performed.
	/// @return The NetworkMessage object that represents the new message to be built. This message is dynamically allocated
	///         from an internal pool of NetworkMessage blocks. For each NetworkMessage pointer obtained, call
	///         EndAndQueueMessage when you have finished building the message to commit the network send and to release the memory.
	///         Alternatively, if after calling StartNewMessage, you decide to abort the network send, free up the NetworkMessage
	///         by calling this->FreeMessage().
	NetworkMessage *StartNewMessage(unsigned long id, size_t numBytes = 0); // [main and worker thread]

	/// Finishes building the message and submits it to the outbound send queue.
	/// @param msg The message to send. After calling this function, this pointer should be considered freed and may not be
	///            dereferenced or passed to any other member function of this MessageConnection. Only pass in here
	///            NetworkMessage pointers obtained by a call to StartNewMessage() of the same MessageConnection instance.
	/// @param numBytes Specify here the number of actual bytes you filled in into the msg.data field. A size of 0
	///                 is valid, and can be used in cases the message ID itself is the whole message. Passing in the default
	///                 value of this parameter will use the size value that was specified in the call to StartNewMessage().
	/// @param internalQueue If true, specifies that this message was submitted from the network worker thread and not the application
	///                      thread. Pass in the value 'false' here in the client application, or there is a chance of a race condition.
	void EndAndQueueMessage(NetworkMessage *msg, size_t numBytes = (size_t)(-1), bool internalQueue = false); // [main and worker thread]

	/// This is a convenience function to access the above StartNewMessage/EndAndQueueMessage pair. The performance of this
	/// function call is not as good, since a memcpy of the message will need to be made. For performance-critical messages,
	/// it is better to craft the message directly into the buffer area provided by StartNewMessage.
	/// \note The flag order here is (reliable, inOrder), whereas SendStruct takes (inOrder, reliable).
	void SendMessage(unsigned long id, bool reliable, bool inOrder, unsigned long priority, unsigned long contentID,
	                 const char *data, size_t numBytes); // [main thread]

	/// Sends a message using a serializable structure. The structure must provide Size() and
	/// SerializeTo(DataSerializer&), which are used to build the message body.
	/// \note The flag order here is (inOrder, reliable), whereas SendMessage takes (reliable, inOrder).
	template<typename SerializableData>
	void SendStruct(const SerializableData &data, unsigned long id, bool inOrder,
	                bool reliable, unsigned long priority, unsigned long contentID = 0); // [main thread]

	/// Sends a message using a compiled message structure. The message ID and the inOrder, reliable and
	/// priority settings are taken from the structure itself; see the definition below.
	template<typename SerializableMessage>
	void Send(const SerializableMessage &data, unsigned long contentID = 0); // [main thread]

	/// Stops all outbound sends until ResumeOutboundSends is called. Use if you need to guarantee that some messages be sent in the same datagram.
	/// Do not stop outbound sends for long periods, or the other end may time out the connection.
	void PauseOutboundSends(); // [main thread]

	/// Resumes sending of outbound messages.
	void ResumeOutboundSends(); // [main thread]

	/// Returns the number of messages that have been received from the network but haven't been handled by the application yet.
	size_t NumInboundMessagesPending() const { return inboundMessageQueue.Size(); } // [main and worker thread]

	/// Returns the total number of messages pending to be sent out (the sum of OutboundQueueSize() and OutboundAcceptQueueSize()).
	size_t NumOutboundMessagesPending() const { return outboundQueue.Size() + outboundAcceptQueue.Size(); } // [main and worker thread]

	/// Returns the number of outbound messages the main thread has queued for the worker thread to send out. (still unaccepted by the worker thread).
	size_t OutboundAcceptQueueSize() const { return outboundAcceptQueue.Size(); } // [main and worker thread]

	/// Returns the number of outbound messages in the worker thread outbound message queue (already accepted and pending a send by the worker thread).
	size_t OutboundQueueSize() const { return outboundQueue.Size(); } // [main and worker thread]

	/// Returns the underlying raw socket. [main and worker thread]
	Socket *GetSocket() { return socket; }

	/// Returns an object that identifies the local endpoint (IP and port) this connection is connected to.
	EndPoint LocalEndPoint() const; // [main and worker thread]

	/// Returns an object that identifies the remote endpoint (IP and port) this connection is connected to.
	EndPoint RemoteEndPoint() const; // [main and worker thread]

	/// Sets an upper limit to the data send rate for this connection.
	/// The default is not to have an upper limit at all.
	/// @param numBytesPerSec The upper limit for the number of bytes to send per second. This limit includes the message header
	///                       bytes as well and not just the payload. Set to 0 to force no limit.
	/// @param numDatagramsPerSec The maximum number of datagrams (UDP packets) to send per second. Set to 0 to force no limit.
	///                           If the connection is operating on top of TCP, this field has no effect.
	///\todo Implement.
	void SetMaximumDataSendRate(int numBytesPerSec, int numDatagramsPerSec);

	/// Registers a new listener object for the events of this connection.
	void RegisterInboundMessageHandler(IMessageHandler *handler); // [main thread]

	/// Fetches all newly received messages waiting in the inbound queue, and passes each of these
	/// to the message handler registered using RegisterInboundMessageHandler.
	/// Call this function periodically to receive new data from the network if you are using the Observer pattern.
	/// Alternatively, use the immediate-mode ReceiveMessage function to receive messages directly one at a time.
	/// @param maxMessagesToProcess If the inbound queue contains more than this amount of new messages,
	///                             the processing loop will return to give processing time to other parts of the application.
	///                             If 0 is passed, messages are processed until the queue is empty.
	/// \note It is important to have a non-zero limit in maxMessagesToProcess (unless you're sure what you are doing), since
	///       otherwise an attacker might affect the performance of the application main loop by sending messages so fast that
	///       the queue never has time to exhaust, thus giving an infinite loop in practice.
	void Process(int maxMessagesToProcess = 100); // [main thread]

	/// Waits for at most the given amount of time until a new message is received for processing.
	/// @param maxMSecsToWait If 0, the call will wait indefinitely until a message is received or the connection transitions to
	///                       closing state.
	///                       If a positive value is passed, at most that many milliseconds is waited for a new message to be received.
	void WaitForMessage(int maxMSecsToWait); // [main thread]

	/// Returns the next message in the inbound queue. This is an alternative API to RegisterInboundMessageHandler/Process.
	/// \note When using this function to receive messages, remember to call FreeMessage for each NetworkMessage returned, or you
	///       will leak memory quickly.
	/// @param maxMSecsToWait If a negative number, the call will not wait at all if there are no new messages to process, but
	///                       returns 0 immediately.
	///                       If 0, the call will wait indefinitely until a message is received or the connection transitions to
	///                       closing state.
	///                       If a positive value is passed, at most that many milliseconds is waited for a new message to be received.
	/// @return A newly allocated object to the received message, or 0 if the queue was empty and no messages were received during
	///         the wait period, or if the connection transitioned to closing state. When you are finished reading the message,
	///         call FreeMessage for the returned pointer.
	NetworkMessage *ReceiveMessage(int maxMSecsToWait = -1); // [main thread]

	/// Frees up a NetworkMessage struct when it is no longer needed.
	/// You need to call this for each message that you received from a call to ReceiveMessage.
	void FreeMessage(NetworkMessage *msg); // [main and worker thread]

	/// Returns a single-line message describing the connection state.
	std::string ToString() const; // [main and worker thread]

	/// Dumps a long multi-line status message of this connection state to stdout.
	void DumpStatus() const; // [main thread]

	// MessageConnection Statistics -related functions:

	/// Returns the estimated RTT of the connection, in milliseconds. RTT is the time taken to communicate a message from client->host->client.
	float RoundTripTime() const { return rtt; } // [main and worker thread]

	/// Returns the number of milliseconds since we last received data from the socket.
	float LastHeardTime() const { return Clock::TicksToMillisecondsF(Clock::TicksInBetween(Clock::Tick(), lastHeardTime)); } // [main and worker thread]

	/// Returns the average number of datagrams we are receiving per second.
	float PacketsInPerSec() const { return packetsInPerSec; } // [main and worker thread]
	/// Returns the average number of datagrams we are sending per second.
	float PacketsOutPerSec() const { return packetsOutPerSec; } // [main and worker thread]
	/// Returns the average number of kNet messages we are receiving per second.
	float MsgsInPerSec() const { return msgsInPerSec; } // [main and worker thread]
	/// Returns the average number of kNet messages we are sending per second.
	float MsgsOutPerSec() const { return msgsOutPerSec; } // [main and worker thread]
	/// Returns the average number of bytes we are receiving per second. This includes kNet headers.
	float BytesInPerSec() const { return bytesInPerSec; } // [main and worker thread]
	/// Returns the average number of bytes we are sending per second. This includes kNet headers.
	float BytesOutPerSec() const { return bytesOutPerSec; } // [main and worker thread]

	/// Returns the total number of bytes (excluding IP and TCP/UDP headers) that have been received from this connection.
	u64 BytesInTotal() const { return bytesInTotal; } // [main and worker thread]

	/// Returns the total number of bytes (excluding IP and TCP/UDP headers) that have been sent from this connection.
	u64 BytesOutTotal() const { return bytesOutTotal; } // [main and worker thread]

	/// Returns the simulator object which can be used to apply network condition simulations to this connection.
	NetworkSimulator &NetworkSendSimulator() { return networkSendSimulator; }

	/// Stores all the statistics about the current connection. This data is periodically recomputed
	/// by the network worker thread and shared to the client through a lock.
	Lockable<ConnectionStatistics> statistics; // [main and worker thread]

protected:
	friend class NetworkWorkerThread;

	/// The Network object inside which this MessageConnection lives.
	Network *owner; // [set and read only by the main thread]

	/// If this MessageConnection represents a client connection on the server side, this gives the owner.
	NetworkServer *ownerServer; // [set and read only by the main thread]

	/// Stores the thread that manages the background processing of this connection. The same thread can manage multiple
	/// connections and servers, and not just this one.
	NetworkWorkerThread *workerThread; // [set and read only by worker thread]

#ifdef KNET_THREAD_CHECKING_ENABLED
	/// In debug mode, we track and enforce thread safety constraints through this ID.
	ThreadId workerThreadId; // [set by worker thread on thread startup, read by both main and worker thread]
#endif

	/// Performs a check that asserts that the current thread of execution is in the network worker thread.
	void AssertInWorkerThreadContext() const; // [main and worker thread]

	/// Performs a check that asserts that the current thread of execution is not in the network worker thread (it is the main thread).
	void AssertInMainThreadContext() const; // [main and worker thread]

	/// Returns true if this MessageConnection is associated with a NetworkWorkerThread to maintain
	/// (i.e. workerThread is non-null).
	bool IsWorkerThreadRunning() const { return workerThread != 0; } // [main and worker thread]

	/// A queue populated by the main thread to give out messages to the MessageConnection work thread to process.
	WaitFreeQueue<NetworkMessage*> outboundAcceptQueue; // [produced by main thread, consumed by worker thread]

	/// A queue populated by the networking thread to hold all the incoming messages until the application can process them.
	WaitFreeQueue<NetworkMessage*> inboundMessageQueue; // [produced by worker thread, consumed by main thread]

	/// A priority queue that maintains in order all the messages that are going out the pipe.
	///\todo Make the choice of which of the following structures to use a runtime option.
#ifndef KNET_NO_MAXHEAP // If defined, disables message priorization feature to improve client-side CPU performance. By default disabled.
	MaxHeap<NetworkMessage*, NetworkMessagePriorityCmp> outboundQueue; // [worker thread]
#else
	WaitFreeQueue<NetworkMessage*> outboundQueue; // [worker thread]
#endif

	/// Tracks all the message sends that are fragmented.
	Lockable<FragmentedSendManager> fragmentedSends; // [worker thread]

	/// Tracks all the receives of fragmented messages and helps reconstruct the original messages from fragments.
	FragmentedReceiveManager fragmentedReceives; // [worker thread]

	/// Allocations of NetworkMessage structures go through a pool to avoid dynamic new/delete calls when sending messages.
	/// This structure is shared between the main and worker thread through a lockfree construct.
	LockFreePoolAllocator<NetworkMessage> messagePool; // [main and worker thread]

	/// Tracks when it is time to send the next PingRequest to the peer.
	PolledTimer pingTimer; // [worker thread]

	/// Tracks when it is time to update the statistics structure.
	PolledTimer statsRefreshTimer; // [worker thread]

	/// Specifies the return value for the functions that send out network packets.
	enum PacketSendResult
	{
		PacketSendOK, ///< The operating system signalled the packet was successfully sent.
		PacketSendSocketClosed, ///< The packet could not be sent, since the socket was closed.
		PacketSendSocketFull, ///< The packet could not be sent, since the OS outbound buffer was full.
		PacketSendNoMessages, ///< A packet could not be sent, since there was nothing to send.
		PacketSendThrottled ///< The packet could not be sent right now, since a throttle timer is in effect.
	};

	/// Serializes several messages into a single UDP/TCP packet and sends it out to the wire.
	virtual PacketSendResult SendOutPacket() = 0; // [worker thread]

	/// Sends out as many packets at one go as is allowed by the current send rate of the connection.
	virtual void SendOutPackets() = 0; // [worker thread]

	/// Returns how many milliseconds need to be waited before this socket can try sending data the next time.
	virtual unsigned long TimeUntilCanSendPacket() const = 0; // [worker thread]

	/// Performs the internal work tick that updates this connection.
	void UpdateConnection(); // [worker thread]

	/// Overridden by a subclass of MessageConnection to do protocol-specific updates (private implementation -pattern).
	/// The default implementation does nothing.
	virtual void DoUpdateConnection() {} // [worker thread]

	/// Marks that the peer has closed the connection and will not send any more application-level data.
	void SetPeerClosed(); // [worker thread]

	/// Hook for subclasses to print protocol-specific status information. The default implementation does nothing.
	/// NOTE(review): presumably invoked from DumpStatus() - confirm against the implementation.
	virtual void DumpConnectionStatus() const {} // [main thread]

	/// Posted when the application has pushed us some messages to handle.
	Event NewOutboundMessagesEvent() const; // [main and worker thread]

	/// Specifies the result of a Socket read activity.
	enum SocketReadResult
	{
		SocketReadOK, ///< All data was read from the socket and it is empty for now.
		SocketReadError, ///< An error occurred - probably the connection is dead.
		SocketReadThrottled, ///< There was so much data to read that we need to pause and make room for sends as well.
	};

	/// Reads all the new bytes available in the socket.
	/// This data will be read into the connection's internal data queue, where it will be
	/// parsed to messages.
	/// @param bytesRead [out] This field will get the number of bytes successfully read.
	/// @return The return code of the operation.
	virtual SocketReadResult ReadSocket(size_t &bytesRead) = 0; // [worker thread]

	/// Convenience overload that takes no out-parameter.
	/// NOTE(review): presumably forwards to ReadSocket(size_t&) and discards or records the byte count - confirm against the implementation.
	SocketReadResult ReadSocket(); // [worker thread]

	/// Sets the worker thread object that will handle this connection.
	void SetWorkerThread(NetworkWorkerThread *thread); // [main thread]

	/// Returns the worker thread object currently assigned to this connection (may be null, see IsWorkerThreadRunning()).
	NetworkWorkerThread *WorkerThread() const { return workerThread; }

	/// Processes the raw bytes of a single received message.
	/// @param packetID The ID of the datagram the message arrived in.
	/// @param data Pointer to the message bytes.
	/// @param numBytes The number of bytes pointed to by data.
	void HandleInboundMessage(packet_id_t packetID, const char *data, size_t numBytes); // [worker thread]

	/// Allocates a new NetworkMessage struct. [both worker and main thread]
	NetworkMessage *AllocateNewMessage();

	// Ping/RTT management operations:

	/// Sends a PingRequest message to the peer.
	/// @param internalQueue Presumably has the same meaning as in EndAndQueueMessage (true when called from the worker thread) - confirm against the implementation.
	void SendPingRequestMessage(bool internalQueue); // [main or worker thread]

	/// Handles a PingRequest message received from the peer.
	void HandlePingRequestMessage(const char *data, size_t numBytes); // [worker thread]

	/// Handles a PingReply message received from the peer.
	void HandlePingReplyMessage(const char *data, size_t numBytes); // [worker thread]

	/// Frees all internal dynamically allocated message data.
	void FreeMessageData(); // [main thread]

	/// Checks if the connection has been silent too long and has now timed out.
	void DetectConnectionTimeOut(); // [worker thread]

	/// Refreshes RTT and other connection related statistics.
	void ComputeStats(); // [worker thread]

	/// Adds a new entry for outbound data statistics.
	void AddOutboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages); // [worker thread]

	/// Adds a new entry for inbound data statistics.
	void AddInboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages); // [worker thread]

	/// Pulls in all new messages from the main thread to the worker thread side and admits them to the send priority queue.
	void AcceptOutboundMessages(); // [worker thread]

	/// Starts the socket-specific disconnection procedure.
	virtual void PerformDisconnection() = 0;

	/// The object that receives notifications of all received data.
	IMessageHandler *inboundMessageHandler; // [main thread]

	/// The underlying socket on top of which this connection operates.
	Socket *socket; // [set by main thread before the worker thread is running. Read-only when worker thread is running. Read by main and worker thread]

	/// Specifies the current connection state.
	ConnectionState connectionState; // [main and worker thread]

	/// If true, all sends to the socket are on hold, until ResumeOutboundSends() is called.
	bool bOutboundSendsPaused; // [set by main thread, read by worker thread]

	friend class NetworkServer;
	friend class Network;

	/// Posted when the application has pushed us some messages to handle.
	Event eventMsgsOutAvailable; // [main and worker thread]

	float rtt; ///< The currently estimated round-trip time, in milliseconds. [main and worker thread]
	tick_t lastHeardTime; ///< The tick since last successful receive from the socket. [main and worker thread]
	float packetsInPerSec; ///< The average number of datagrams we are receiving/second. [main and worker thread]
	float packetsOutPerSec; ///< The average number of datagrams we are sending/second. [main and worker thread]
	float msgsInPerSec; ///< The average number of kNet messages we are receiving/second. [main and worker thread]
	float msgsOutPerSec; ///< The average number of kNet messages we are sending/second. [main and worker thread]
	float bytesInPerSec; ///< The average number of bytes we are receiving/second. This includes kNet headers. [main and worker thread]
	float bytesOutPerSec; ///< The average number of bytes we are sending/second. This includes kNet headers. [main and worker thread]
	u64 bytesInTotal; ///< Running total returned by BytesInTotal().
	u64 bytesOutTotal; ///< Running total returned by BytesOutTotal().

	/// Stores the current settings related to network conditions testing.
	/// By default, the simulator is disabled.
	NetworkSimulator networkSendSimulator;

	/// A running number attached to each outbound message (not present in network stream) to
	/// break ties when deducing which message should come before which.
	unsigned long outboundMessageNumberCounter; // [worker thread]

	/// A running number that is assigned to each outbound reliable message. This is used to
	/// enforce proper ordering of ordered messages.
	unsigned long outboundReliableMessageNumberCounter; // [worker thread]

	/// A (messageID, contentID) pair.
	typedef std::pair<u32, u32> MsgContentIDPair;

	/// Maps a (messageID, contentID) pair to a (packetID, tick) stamp.
	typedef std::map<MsgContentIDPair, std::pair<packet_id_t, tick_t> > ContentIDReceiveTrack;

	/// Each (messageID, contentID) pair has a packetID "stamp" associated to them to track
	/// and decimate out-of-order received obsoleted messages.
	ContentIDReceiveTrack inboundContentIDStamps; // [worker thread]

	/// Maps a (messageID, contentID) pair to an outbound message.
	typedef std::map<MsgContentIDPair, NetworkMessage*> ContentIDSendTrack;

	/// Outbound messages indexed by their (messageID, contentID) pair.
	/// NOTE(review): presumably used to replace a queued message when a newer one with the same content ID is sent - confirm against the implementation.
	ContentIDSendTrack outboundContentIDMessages; // [worker thread]

	/// NOTE(review): judging by the name, records msg in outboundContentIDMessages after checking for an existing entry - confirm against the implementation.
	void CheckAndSaveOutboundMessageWithContentID(NetworkMessage *msg); // [worker thread]

	/// NOTE(review): judging by the name, removes the outboundContentIDMessages entry that refers to msg - confirm against the implementation.
	void ClearOutboundMessageWithContentID(NetworkMessage *msg); // [worker thread]

	/// Checks whether the given (messageID, contentID)-pair is already out-of-date and obsoleted
	/// by a newer packet and should not be processed.
	/// @return True if the packet should be processed (there was no superseding record), and
	///         false if the packet is old and should be discarded.
	bool CheckAndSaveContentIDStamp(message_id_t messageID, u32 contentID, packet_id_t packetID); // [worker thread]

	/// Splits the given message into fragments and queues them for sending.
	/// @param maxFragmentSize NOTE(review): presumably the maximum size in bytes of a single fragment - confirm against the implementation.
	void SplitAndQueueMessage(NetworkMessage *message, bool internalQueue, size_t maxFragmentSize); // [main and worker thread]

	// Message IDs reserved for the connection's own control messages.
	static const unsigned long MsgIdPingRequest = 1;
	static const unsigned long MsgIdPingReply = 2;
	static const unsigned long MsgIdFlowControlRequest = 3;
	static const unsigned long MsgIdPacketAck = 4;
	static const unsigned long MsgIdDisconnect = 0x3FFFFFFF;
	static const unsigned long MsgIdDisconnectAck = 0x3FFFFFFE;

	/// Protected ctor - MessageConnections are instantiated by Network and NetworkServer classes.
	explicit MessageConnection(Network *owner, NetworkServer *ownerServer, Socket *socket, ConnectionState startingState);

	/// Hook for subclasses to handle a received message themselves. The default implementation returns false.
	/// NOTE(review): presumably a return value of true means the message was consumed by the subclass - confirm against the caller.
	virtual bool HandleMessage(packet_id_t /*packetID*/, message_id_t /*messageID*/, const char * /*data*/, size_t /*numBytes*/) { return false; } // [main thread]

private:
	void operator=(const MessageConnection &); ///< Noncopyable, N/I.
	MessageConnection(const MessageConnection &); ///< Noncopyable, N/I.
};
571
572 template<typename SerializableData>
SendStruct(const SerializableData & data,unsigned long id,bool inOrder,bool reliable,unsigned long priority,unsigned long contentID)573 void MessageConnection::SendStruct(const SerializableData &data, unsigned long id, bool inOrder,
574 bool reliable, unsigned long priority, unsigned long contentID)
575 {
576 AssertInMainThreadContext();
577
578 const size_t dataSize = data.Size();
579
580 NetworkMessage *msg = StartNewMessage(id, dataSize);
581
582 if (dataSize > 0)
583 {
584 DataSerializer mb(msg->data, dataSize);
585 data.SerializeTo(mb);
586 assert(mb.BytesFilled() == dataSize); // The SerializableData::Size() estimate must be exact!
587 }
588
589 msg->id = id;
590 msg->contentID = contentID;
591 msg->inOrder = inOrder;
592 msg->priority = priority;
593 msg->reliable = reliable;
594 #ifdef KNET_NETWORK_PROFILING
595 char str[512];
596 sprintf(str, "%s (%u)", SerializableData::Name(), (unsigned int)id);
597 msg->profilerName = str;
598 #endif
599
600 EndAndQueueMessage(msg);
601 }
602
603 template<typename SerializableMessage>
Send(const SerializableMessage & data,unsigned long contentID)604 void MessageConnection::Send(const SerializableMessage &data, unsigned long contentID)
605 {
606 SendStruct(data, SerializableMessage::messageID, data.inOrder, data.reliable, data.priority, contentID);
607 }
608
609 } // ~kNet
610