1 //====== Copyright Valve Corporation, All rights reserved. ====================
2
3 #include "steamnetworkingsockets_snp.h"
4 #include "steamnetworkingsockets_connections.h"
5 #include "crypto.h"
6
7 // memdbgon must be the last include file in a .cpp file!!!
8 #include "tier0/memdbgon.h"
9
10 namespace SteamNetworkingSocketsLib {
11
12 struct SNPAckSerializerHelper
13 {
14 struct Block
15 {
16 // Acks and nacks count to serialize
17 uint32 m_nAck;
18 uint32 m_nNack;
19
20 // What to put in the header if we use this as the
21 // highest numbered block
22 uint32 m_nLatestPktNum; // Lower 32-bits. We might send even fewer bits
23 uint16 m_nEncodedTimeSinceLatestPktNum;
24
25 // Total size of ack data up to this point:
26 // header, all previous blocks, and this block
27 int16 m_cbTotalEncodedSize;
28 };
29
30 enum { k_cbHeaderSize = 5 };
31 enum { k_nMaxBlocks = 64 };
32 int m_nBlocks;
33 int m_nBlocksNeedToAck; // Number of blocks we really need to send now.
34 Block m_arBlocks[ k_nMaxBlocks ];
35
EncodeTimeSinceSteamNetworkingSocketsLib::SNPAckSerializerHelper36 static uint16 EncodeTimeSince( SteamNetworkingMicroseconds usecNow, SteamNetworkingMicroseconds usecWhenSentLast )
37 {
38
39 // Encode time since last
40 SteamNetworkingMicroseconds usecElapsedSinceLast = usecNow - usecWhenSentLast;
41 Assert( usecElapsedSinceLast >= 0 );
42 Assert( usecNow > 0x20000*k_usecAckDelayPrecision ); // We should never have small timestamp values. A timestamp of zero should always be "a long time ago"
43 if ( usecElapsedSinceLast > 0xfffell<<k_nAckDelayPrecisionShift )
44 return 0xffff;
45 return uint16( usecElapsedSinceLast >> k_nAckDelayPrecisionShift );
46 }
47
48 };
49
50 // Fetch ping, and handle two edge cases:
51 // - if we don't have an estimate, just be relatively conservative
52 // - clamp to minimum
GetUsecPingWithFallback(CSteamNetworkConnectionBase * pConnection)53 inline SteamNetworkingMicroseconds GetUsecPingWithFallback( CSteamNetworkConnectionBase *pConnection )
54 {
55 int nPingMS = pConnection->m_statsEndToEnd.m_ping.m_nSmoothedPing;
56 if ( nPingMS < 0 )
57 return 200*1000; // no estimate, just be conservative
58 if ( nPingMS < 1 )
59 return 500; // less than 1ms. Make sure we don't blow up, though, since they are asking for microsecond resolution. We should just keep our pings with microsecond resolution!
60 return nPingMS*1000;
61 }
62
63
Shutdown()64 void SSNPSenderState::Shutdown()
65 {
66 m_unackedReliableMessages.PurgeMessages();
67 m_messagesQueued.PurgeMessages();
68 m_mapInFlightPacketsByPktNum.clear();
69 m_listInFlightReliableRange.clear();
70 m_cbPendingUnreliable = 0;
71 m_cbPendingReliable = 0;
72 m_cbSentUnackedReliable = 0;
73 }
74
75 //-----------------------------------------------------------------------------
RemoveAckedReliableMessageFromUnackedList()76 void SSNPSenderState::RemoveAckedReliableMessageFromUnackedList()
77 {
78
79 // Trim messages from the head that have been acked.
80 // Note that in theory we could have a message in the middle that
81 // has been acked. But it's not worth the time to go looking for them,
82 // just to free up a bit of memory early. We'll get to it once the earlier
83 // messages have been acked.
84 while ( !m_unackedReliableMessages.empty() )
85 {
86 CSteamNetworkingMessage *pMsg = m_unackedReliableMessages.m_pFirst;
87 Assert( pMsg->SNPSend_ReliableStreamPos() > 0 );
88 int64 nReliableEnd = pMsg->SNPSend_ReliableStreamPos() + pMsg->m_cbSize;
89
90 // Are we backing a range that is in flight (and thus we might need
91 // to resend?)
92 if ( !m_listInFlightReliableRange.empty() )
93 {
94 auto head = m_listInFlightReliableRange.begin();
95 Assert( head->first.m_nBegin >= pMsg->SNPSend_ReliableStreamPos() );
96 if ( head->second == pMsg )
97 {
98 Assert( head->first.m_nBegin < nReliableEnd );
99 return;
100 }
101 Assert( head->first.m_nBegin >= nReliableEnd );
102 }
103
104 // Are we backing the next range that is ready for resend now?
105 if ( !m_listReadyRetryReliableRange.empty() )
106 {
107 auto head = m_listReadyRetryReliableRange.begin();
108 Assert( head->first.m_nBegin >= pMsg->SNPSend_ReliableStreamPos() );
109 if ( head->second == pMsg )
110 {
111 Assert( head->first.m_nBegin < nReliableEnd );
112 return;
113 }
114 Assert( head->first.m_nBegin >= nReliableEnd );
115 }
116
117 // We're all done!
118 DbgVerify( m_unackedReliableMessages.pop_front() == pMsg );
119 pMsg->Release();
120 }
121 }
122
123 //-----------------------------------------------------------------------------
SSNPSenderState()124 SSNPSenderState::SSNPSenderState()
125 {
126 // Setup the table of inflight packets with a sentinel.
127 m_mapInFlightPacketsByPktNum.clear();
128 SNPInFlightPacket_t &sentinel = m_mapInFlightPacketsByPktNum[INT64_MIN];
129 sentinel.m_bNack = false;
130 sentinel.m_pTransport = nullptr;
131 sentinel.m_usecWhenSent = 0;
132 m_itNextInFlightPacketToTimeout = m_mapInFlightPacketsByPktNum.end();
133 DebugCheckInFlightPacketMap();
134 }
135
#if STEAMNETWORKINGSOCKETS_SNP_PARANOIA > 0
// Paranoid self-check of the in-flight packet table invariants:
// - the sentinel (INT64_MIN) is always present and always first
// - packet numbers strictly increase; send timestamps never decrease
// - m_itNextInFlightPacketToTimeout is either a real (non-sentinel)
//   entry of the map, or end()
void SSNPSenderState::DebugCheckInFlightPacketMap() const
{
	Assert( !m_mapInFlightPacketsByPktNum.empty() );
	auto iter = m_mapInFlightPacketsByPktNum.begin();
	Assert( iter->first == INT64_MIN ); // sentinel must be first
	Assert( m_itNextInFlightPacketToTimeout != iter ); // never points at the sentinel

	bool bSawTimeoutIter = false;
	int64 nPrevPktNum = iter->first;
	SteamNetworkingMicroseconds usecPrevSent = iter->second.m_usecWhenSent;
	while ( ++iter != m_mapInFlightPacketsByPktNum.end() )
	{
		// Ordered by packet number, and sent in order
		Assert( nPrevPktNum < iter->first );
		Assert( usecPrevSent <= iter->second.m_usecWhenSent );
		if ( iter == m_itNextInFlightPacketToTimeout )
		{
			Assert( !bSawTimeoutIter );
			bSawTimeoutIter = true;
		}
		nPrevPktNum = iter->first;
		usecPrevSent = iter->second.m_usecWhenSent;
	}

	// If we never walked past it, it must be the end iterator
	if ( !bSawTimeoutIter )
	{
		Assert( m_itNextInFlightPacketToTimeout == m_mapInFlightPacketsByPktNum.end() );
	}
}
#endif
164
165 //-----------------------------------------------------------------------------
SSNPReceiverState()166 SSNPReceiverState::SSNPReceiverState()
167 {
168 // Init packet gaps with a sentinel
169 SSNPPacketGap &sentinel = m_mapPacketGaps[INT64_MAX];
170 sentinel.m_nEnd = INT64_MAX; // Fixed value
171 sentinel.m_usecWhenOKToNack = INT64_MAX; // Fixed value, for when there is nothing left to nack
172 sentinel.m_usecWhenAckPrior = INT64_MAX; // Time when we need to flush a report on all lower-numbered packets
173
174 // Point at the sentinel
175 m_itPendingAck = m_mapPacketGaps.end();
176 --m_itPendingAck;
177 m_itPendingNack = m_itPendingAck;
178 }
179
180 //-----------------------------------------------------------------------------
Shutdown()181 void SSNPReceiverState::Shutdown()
182 {
183 m_mapUnreliableSegments.clear();
184 m_bufReliableStream.clear();
185 m_mapReliableStreamGaps.clear();
186 m_mapPacketGaps.clear();
187 }
188
189 //-----------------------------------------------------------------------------
SNP_InitializeConnection(SteamNetworkingMicroseconds usecNow)190 void CSteamNetworkConnectionBase::SNP_InitializeConnection( SteamNetworkingMicroseconds usecNow )
191 {
192 m_sendRateData.m_usecTokenBucketTime = usecNow;
193 m_sendRateData.m_flTokenBucket = k_flSendRateBurstOverageAllowance;
194
195 SteamNetworkingMicroseconds usecPing = GetUsecPingWithFallback( this );
196
197 /*
198 * Compute the initial sending rate X_init in the manner of RFC 3390:
199 *
200 * X_init = min(4 * s, max(2 * s, 4380 bytes)) / RTT
201 *
202 * Note that RFC 3390 uses MSS, RFC 4342 refers to RFC 3390, and rfc3448bis
203 * (rev-02) clarifies the use of RFC 3390 with regard to the above formula.
204 */
205 Assert( usecPing > 0 );
206 int64 w_init = Clamp( 4380, 2 * k_cbSteamNetworkingSocketsMaxEncryptedPayloadSend, 4 * k_cbSteamNetworkingSocketsMaxEncryptedPayloadSend );
207 m_sendRateData.m_nCurrentSendRateEstimate = int( k_nMillion * w_init / usecPing );
208
209 // Go ahead and clamp it now
210 SNP_ClampSendRate();
211 }
212
213 //-----------------------------------------------------------------------------
SNP_ShutdownConnection()214 void CSteamNetworkConnectionBase::SNP_ShutdownConnection()
215 {
216 m_senderState.Shutdown();
217 m_receiverState.Shutdown();
218 }
219
220 //-----------------------------------------------------------------------------
SNP_SendMessage(CSteamNetworkingMessage * pSendMessage,SteamNetworkingMicroseconds usecNow,bool * pbThinkImmediately)221 int64 CSteamNetworkConnectionBase::SNP_SendMessage( CSteamNetworkingMessage *pSendMessage, SteamNetworkingMicroseconds usecNow, bool *pbThinkImmediately )
222 {
223 // Connection must be locked, but we don't require the global lock here!
224 m_pLock->AssertHeldByCurrentThread();
225
226 int cbData = (int)pSendMessage->m_cbSize;
227
228 // Assume we won't want to wake up immediately
229 if ( pbThinkImmediately )
230 *pbThinkImmediately = false;
231
232 // Check if we're full
233 if ( m_senderState.PendingBytesTotal() + cbData > m_connectionConfig.m_SendBufferSize.Get() )
234 {
235 SpewWarningRateLimited( usecNow, "Connection already has %u bytes pending, cannot queue any more messages\n", m_senderState.PendingBytesTotal() );
236 pSendMessage->Release();
237 return -k_EResultLimitExceeded;
238 }
239
240 // Check if they try to send a really large message
241 if ( cbData > k_cbMaxUnreliableMsgSizeSend && !( pSendMessage->m_nFlags & k_nSteamNetworkingSend_Reliable ) )
242 {
243 SpewWarningRateLimited( usecNow, "Trying to send a very large (%d bytes) unreliable message. Sending as reliable instead.\n", cbData );
244 pSendMessage->m_nFlags |= k_nSteamNetworkingSend_Reliable;
245 }
246
247 if ( pSendMessage->m_nFlags & k_nSteamNetworkingSend_NoDelay )
248 {
249 // FIXME - need to check how much data is currently pending, and return
250 // k_EResultIgnored if we think it's going to be a while before this
251 // packet goes on the wire.
252 }
253
254 // First, accumulate tokens, and also limit to reasonable burst
255 // if we weren't already waiting to send
256 SNP_ClampSendRate();
257 SNP_TokenBucket_Accumulate( usecNow );
258
259 // Assign a message number
260 pSendMessage->m_nMessageNumber = ++m_senderState.m_nLastSentMsgNum;
261
262 // Reliable, or unreliable?
263 if ( pSendMessage->m_nFlags & k_nSteamNetworkingSend_Reliable )
264 {
265 pSendMessage->SNPSend_SetReliableStreamPos( m_senderState.m_nReliableStreamPos );
266
267 // Generate the header
268 byte *hdr = pSendMessage->SNPSend_ReliableHeader();
269 hdr[0] = 0;
270 byte *hdrEnd = hdr+1;
271 int64 nMsgNumGap = pSendMessage->m_nMessageNumber - m_senderState.m_nLastSendMsgNumReliable;
272 Assert( nMsgNumGap >= 1 );
273 if ( nMsgNumGap > 1 )
274 {
275 hdrEnd = SerializeVarInt( hdrEnd, (uint64)nMsgNumGap );
276 hdr[0] |= 0x40;
277 }
278 if ( cbData < 0x20 )
279 {
280 hdr[0] |= (byte)cbData;
281 }
282 else
283 {
284 hdr[0] |= (byte)( 0x20 | ( cbData & 0x1f ) );
285 hdrEnd = SerializeVarInt( hdrEnd, cbData>>5U );
286 }
287 pSendMessage->m_cbSNPSendReliableHeader = hdrEnd - hdr;
288
289 // Grow the total size of the message by the header
290 pSendMessage->m_cbSize += pSendMessage->m_cbSNPSendReliableHeader;
291
292 // Advance stream pointer
293 m_senderState.m_nReliableStreamPos += pSendMessage->m_cbSize;
294
295 // Update stats
296 ++m_senderState.m_nMessagesSentReliable;
297 m_senderState.m_cbPendingReliable += pSendMessage->m_cbSize;
298
299 // Remember last sent reliable message number, so we can know how to
300 // encode the next one
301 m_senderState.m_nLastSendMsgNumReliable = pSendMessage->m_nMessageNumber;
302
303 Assert( pSendMessage->SNPSend_IsReliable() );
304 }
305 else
306 {
307 pSendMessage->SNPSend_SetReliableStreamPos( 0 );
308 pSendMessage->m_cbSNPSendReliableHeader = 0;
309
310 ++m_senderState.m_nMessagesSentUnreliable;
311 m_senderState.m_cbPendingUnreliable += pSendMessage->m_cbSize;
312
313 Assert( !pSendMessage->SNPSend_IsReliable() );
314 }
315
316 // Add to pending list
317 m_senderState.m_messagesQueued.push_back( pSendMessage );
318 SpewVerboseGroup( m_connectionConfig.m_LogLevel_Message.Get(), "[%s] SendMessage %s: MsgNum=%lld sz=%d\n",
319 GetDescription(),
320 pSendMessage->SNPSend_IsReliable() ? "RELIABLE" : "UNRELIABLE",
321 (long long)pSendMessage->m_nMessageNumber,
322 pSendMessage->m_cbSize );
323
324 // Use Nagle?
325 // We always set the Nagle timer, even if we immediately clear it. This makes our clearing code simpler,
326 // since we can always safely assume that once we find a message with the nagle timer cleared, all messages
327 // queued earlier than this also have it cleared.
328 // FIXME - Don't think this works if the configuration value is changing. Since changing the
329 // config value could violate the assumption that nagle times are increasing. Probably not worth
330 // fixing.
331 pSendMessage->SNPSend_SetUsecNagle( usecNow + m_connectionConfig.m_NagleTime.Get() );
332 if ( pSendMessage->m_nFlags & k_nSteamNetworkingSend_NoNagle )
333 m_senderState.ClearNagleTimers();
334
335 // Save the message number. The code below might end up deleting the message we just queued
336 int64 result = pSendMessage->m_nMessageNumber;
337
338 // Schedule wakeup at the appropriate time. (E.g. right now, if we're ready to send,
339 // or at the Nagle time, if Nagle is active.)
340 //
341 // NOTE: Right now we might not actually be capable of sending end to end data.
342 // But that case is relatively rare, and nothing will break if we try to right now.
343 // On the other hand, just asking the question involved a virtual function call,
344 // and it will return success most of the time, so let's not make the check here.
345 if ( GetState() == k_ESteamNetworkingConnectionState_Connected )
346 {
347 SteamNetworkingMicroseconds usecNextThink = SNP_GetNextThinkTime( usecNow );
348
349 // Ready to send now?
350 if ( usecNextThink > usecNow )
351 {
352
353 // Not ready to send yet. Is it because Nagle, or because we have previous
354 // data queued and are rate limited?
355 if ( usecNextThink > m_senderState.m_messagesQueued.m_pFirst->SNPSend_UsecNagle() )
356 {
357 // It's because of the rate limit
358 SpewVerbose( "[%s] Send RATELIM. QueueTime is %.1fms, SendRate=%.1fk, BytesQueued=%d, ping=%dms\n",
359 GetDescription(),
360 m_sendRateData.CalcTimeUntilNextSend() * 1e-3,
361 m_sendRateData.m_nCurrentSendRateEstimate * ( 1.0/1024.0),
362 m_senderState.PendingBytesTotal(),
363 m_statsEndToEnd.m_ping.m_nSmoothedPing
364 );
365 }
366 else
367 {
368 // Waiting on nagle
369 SpewVerbose( "[%s] Send Nagle %.1fms. QueueTime is %.1fms, SendRate=%.1fk, BytesQueued=%d, ping=%dms\n",
370 GetDescription(),
371 ( m_senderState.m_messagesQueued.m_pFirst->SNPSend_UsecNagle() - usecNow ) * 1e-3,
372 m_sendRateData.CalcTimeUntilNextSend() * 1e-3,
373 m_sendRateData.m_nCurrentSendRateEstimate * ( 1.0/1024.0),
374 m_senderState.PendingBytesTotal(),
375 m_statsEndToEnd.m_ping.m_nSmoothedPing
376 );
377 }
378
379 // Set a wakeup call.
380 EnsureMinThinkTime( usecNextThink );
381 }
382 else
383 {
384
385 // We're ready to send right now. Check if we should!
386 if ( pSendMessage->m_nFlags & k_nSteamNetworkingSend_UseCurrentThread )
387 {
388
389 // We should send in this thread, before the API entry point
390 // that the app used returns. Is the caller gonna handle this?
391 if ( pbThinkImmediately )
392 {
393 // Caller says they will handle it
394 *pbThinkImmediately = true;
395 }
396 else
397 {
398 // Caller wants us to just do it here, if we can
399 CheckConnectionStateOrScheduleWakeUp( usecNow );
400 }
401 }
402 else
403 {
404 // Wake up the service thread ASAP to send this in the background thread
405 SetNextThinkTimeASAP();
406 }
407 }
408 }
409
410 return result;
411 }
412
SNP_FlushMessage(SteamNetworkingMicroseconds usecNow)413 EResult CSteamNetworkConnectionBase::SNP_FlushMessage( SteamNetworkingMicroseconds usecNow )
414 {
415 // Connection must be locked, but we don't require the global lock here!
416 m_pLock->AssertHeldByCurrentThread();
417
418 // If we're not connected, then go ahead and mark the messages ready to send
419 // once we connect, but otherwise don't take any action
420 if ( GetState() != k_ESteamNetworkingConnectionState_Connected )
421 {
422 m_senderState.ClearNagleTimers();
423 return k_EResultIgnored;
424 }
425
426 if ( m_senderState.m_messagesQueued.empty() )
427 return k_EResultOK;
428
429 // If no Nagle timer was set, then there's nothing to do, we should already
430 // be properly scheduled. Don't do work to re-discover that fact.
431 if ( m_senderState.m_messagesQueued.m_pLast->SNPSend_UsecNagle() == 0 )
432 return k_EResultOK;
433
434 // Accumulate tokens, and also limit to reasonable burst
435 // if we weren't already waiting to send before this.
436 // (Clearing the Nagle timers might very well make us want to
437 // send so we want to do this first.)
438 SNP_ClampSendRate();
439 SNP_TokenBucket_Accumulate( usecNow );
440
441 // Clear all Nagle timers
442 m_senderState.ClearNagleTimers();
443
444 // Schedule wakeup at the appropriate time. (E.g. right now, if we're ready to send.)
445 SteamNetworkingMicroseconds usecNextThink = SNP_GetNextThinkTime( usecNow );
446 EnsureMinThinkTime( usecNextThink );
447 return k_EResultOK;
448 }
449
ProcessPlainTextDataChunk(int usecTimeSinceLast,RecvPacketContext_t & ctx)450 bool CSteamNetworkConnectionBase::ProcessPlainTextDataChunk( int usecTimeSinceLast, RecvPacketContext_t &ctx )
451 {
452 #define DECODE_ERROR( ... ) do { \
453 ConnectionState_ProblemDetectedLocally( k_ESteamNetConnectionEnd_Misc_InternalError, __VA_ARGS__ ); \
454 return false; } while(false)
455
456 #define EXPECT_BYTES(n,pszWhatFor) \
457 do { \
458 if ( pDecode + (n) > pEnd ) \
459 DECODE_ERROR( "SNP decode overrun, %d bytes for %s", (n), pszWhatFor ); \
460 } while (false)
461
462 #define READ_8BITU( var, pszWhatFor ) \
463 do { EXPECT_BYTES(1,pszWhatFor); var = *(uint8 *)pDecode; pDecode += 1; } while(false)
464
465 #define READ_16BITU( var, pszWhatFor ) \
466 do { EXPECT_BYTES(2,pszWhatFor); var = LittleWord(*(uint16 *)pDecode); pDecode += 2; } while(false)
467
468 #define READ_24BITU( var, pszWhatFor ) \
469 do { EXPECT_BYTES(3,pszWhatFor); \
470 var = *(uint8 *)pDecode; pDecode += 1; \
471 var |= uint32( LittleWord(*(uint16 *)pDecode) ) << 8U; pDecode += 2; \
472 } while(false)
473
474 #define READ_32BITU( var, pszWhatFor ) \
475 do { EXPECT_BYTES(4,pszWhatFor); var = LittleDWord(*(uint32 *)pDecode); pDecode += 4; } while(false)
476
477 #define READ_48BITU( var, pszWhatFor ) \
478 do { EXPECT_BYTES(6,pszWhatFor); \
479 var = LittleWord( *(uint16 *)pDecode ); pDecode += 2; \
480 var |= uint64( LittleDWord(*(uint32 *)pDecode) ) << 16U; pDecode += 4; \
481 } while(false)
482
483 #define READ_64BITU( var, pszWhatFor ) \
484 do { EXPECT_BYTES(8,pszWhatFor); var = LittleQWord(*(uint64 *)pDecode); pDecode += 8; } while(false)
485
486 #define READ_VARINT( var, pszWhatFor ) \
487 do { pDecode = DeserializeVarInt( pDecode, pEnd, var ); if ( !pDecode ) { DECODE_ERROR( "SNP data chunk decode overflow, varint for %s", pszWhatFor ); } } while(false)
488
489 #define READ_SEGMENT_DATA_SIZE( is_reliable ) \
490 int cbSegmentSize; \
491 { \
492 int sizeFlags = nFrameType & 7; \
493 if ( sizeFlags <= 4 ) \
494 { \
495 uint8 lowerSizeBits; \
496 READ_8BITU( lowerSizeBits, #is_reliable " size lower bits" ); \
497 cbSegmentSize = (sizeFlags<<8) + lowerSizeBits; \
498 if ( pDecode + cbSegmentSize > pEnd ) \
499 { \
500 DECODE_ERROR( "SNP decode overrun %d bytes for %s segment data.", cbSegmentSize, #is_reliable ); \
501 } \
502 } \
503 else if ( sizeFlags == 7 ) \
504 { \
505 cbSegmentSize = pEnd - pDecode; \
506 } \
507 else \
508 { \
509 DECODE_ERROR( "Invalid SNP frame lead byte 0x%02x. (size bits)", nFrameType ); \
510 } \
511 } \
512 const uint8 *pSegmentData = pDecode; \
513 pDecode += cbSegmentSize;
514
515 // Make sure we have initialized the connection
516 Assert( BStateIsActive() );
517
518 const SteamNetworkingMicroseconds usecNow = ctx.m_usecNow;
519 const int64 nPktNum = ctx.m_nPktNum;
520 bool bInhibitMarkReceived = false;
521
522 const int nLogLevelPacketDecode = m_connectionConfig.m_LogLevel_PacketDecode.Get();
523 SpewVerboseGroup( nLogLevelPacketDecode, "[%s] decode pkt %lld\n", GetDescription(), (long long)nPktNum );
524
525 // Decode frames until we get to the end of the payload
526 const byte *pDecode = (const byte *)ctx.m_pPlainText;
527 const byte *pEnd = pDecode + ctx.m_cbPlainText;
528 int64 nCurMsgNum = 0;
529 int64 nDecodeReliablePos = 0;
530 while ( pDecode < pEnd )
531 {
532
533 uint8 nFrameType = *pDecode;
534 ++pDecode;
535 if ( ( nFrameType & 0xc0 ) == 0x00 )
536 {
537
538 //
539 // Unreliable segment
540 //
541
542 // Decode message number
543 if ( nCurMsgNum == 0 )
544 {
545 // First unreliable frame. Message number is absolute, but only bottom N bits are sent
546 static const char szUnreliableMsgNumOffset[] = "unreliable msgnum";
547 int64 nLowerBits, nMask;
548 if ( nFrameType & 0x10 )
549 {
550 READ_32BITU( nLowerBits, szUnreliableMsgNumOffset );
551 nMask = 0xffffffff;
552 nCurMsgNum = NearestWithSameLowerBits( (int32)nLowerBits, m_receiverState.m_nHighestSeenMsgNum );
553 }
554 else
555 {
556 READ_16BITU( nLowerBits, szUnreliableMsgNumOffset );
557 nMask = 0xffff;
558 nCurMsgNum = NearestWithSameLowerBits( (int16)nLowerBits, m_receiverState.m_nHighestSeenMsgNum );
559 }
560 Assert( ( nCurMsgNum & nMask ) == nLowerBits );
561
562 if ( nCurMsgNum <= 0 )
563 {
564 DECODE_ERROR( "SNP decode unreliable msgnum underflow. %llx mod %llx, highest seen %llx",
565 (unsigned long long)nLowerBits, (unsigned long long)( nMask+1 ), (unsigned long long)m_receiverState.m_nHighestSeenMsgNum );
566 }
567 if ( std::abs( nCurMsgNum - m_receiverState.m_nHighestSeenMsgNum ) > (nMask>>2) )
568 {
569 // We really should never get close to this boundary.
570 SpewWarningRateLimited( usecNow, "Sender sent abs unreliable message number using %llx mod %llx, highest seen %llx\n",
571 (unsigned long long)nLowerBits, (unsigned long long)( nMask+1 ), (unsigned long long)m_receiverState.m_nHighestSeenMsgNum );
572 }
573
574 }
575 else
576 {
577 if ( nFrameType & 0x10 )
578 {
579 uint64 nMsgNumOffset;
580 READ_VARINT( nMsgNumOffset, "unreliable msgnum offset" );
581 nCurMsgNum += nMsgNumOffset;
582 }
583 else
584 {
585 ++nCurMsgNum;
586 }
587 }
588 if ( nCurMsgNum > m_receiverState.m_nHighestSeenMsgNum )
589 m_receiverState.m_nHighestSeenMsgNum = nCurMsgNum;
590
591 //
592 // Decode segment offset in message
593 //
594 uint32 nOffset = 0;
595 if ( nFrameType & 0x08 )
596 READ_VARINT( nOffset, "unreliable data offset" );
597
598 //
599 // Decode size, locate segment data
600 //
601 READ_SEGMENT_DATA_SIZE( unreliable )
602
603 // Check if offset+size indicates a message larger than what we support. (Also,
604 // protect against malicious sender sending *extremely* large offset causing overflow.)
605 if ( (int64)nOffset + cbSegmentSize > k_cbMaxUnreliableMsgSizeRecv || cbSegmentSize > k_cbMaxUnreliableSegmentSizeRecv )
606 {
607
608 // Since this is unreliable data, we can just ignore the segment.
609 SpewWarningRateLimited( usecNow, "[%s] Ignoring unreliable segment with invalid offset %u size %d\n",
610 GetDescription(), nOffset, cbSegmentSize );
611 }
612 else
613 {
614
615 // Receive the segment
616 bool bLastSegmentInMessage = ( nFrameType & 0x20 ) != 0;
617 SNP_ReceiveUnreliableSegment( nCurMsgNum, nOffset, pSegmentData, cbSegmentSize, bLastSegmentInMessage, usecNow );
618 }
619 }
620 else if ( ( nFrameType & 0xe0 ) == 0x40 )
621 {
622
623 //
624 // Reliable segment
625 //
626
627 // First reliable segment?
628 if ( nDecodeReliablePos == 0 )
629 {
630
631 // Stream position is absolute. How many bits?
632 static const char szFirstReliableStreamPos[] = "first reliable streampos";
633 int64 nOffset, nMask;
634 switch ( nFrameType & (3<<3) )
635 {
636 case 0<<3: READ_24BITU( nOffset, szFirstReliableStreamPos ); nMask = (1ll<<24)-1; break;
637 case 1<<3: READ_32BITU( nOffset, szFirstReliableStreamPos ); nMask = (1ll<<32)-1; break;
638 case 2<<3: READ_48BITU( nOffset, szFirstReliableStreamPos ); nMask = (1ll<<48)-1; break;
639 default: DECODE_ERROR( "Reserved reliable stream pos size" );
640 }
641
642 // What do we expect to receive next?
643 int64 nExpectNextStreamPos = m_receiverState.m_nReliableStreamPos + len( m_receiverState.m_bufReliableStream );
644
645 // Find the stream offset closest to that
646 nDecodeReliablePos = ( nExpectNextStreamPos & ~nMask ) + nOffset;
647 if ( nDecodeReliablePos + (nMask>>1) < nExpectNextStreamPos )
648 {
649 nDecodeReliablePos += nMask+1;
650 Assert( ( nDecodeReliablePos & nMask ) == nOffset );
651 Assert( nExpectNextStreamPos < nDecodeReliablePos );
652 Assert( nExpectNextStreamPos + (nMask>>1) >= nDecodeReliablePos );
653 }
654 if ( nDecodeReliablePos <= 0 )
655 {
656 DECODE_ERROR( "SNP decode first reliable stream pos underflow. %llx mod %llx, expected next %llx",
657 (unsigned long long)nOffset, (unsigned long long)( nMask+1 ), (unsigned long long)nExpectNextStreamPos );
658 }
659 if ( std::abs( nDecodeReliablePos - nExpectNextStreamPos ) > (nMask>>2) )
660 {
661 // We really should never get close to this boundary.
662 SpewWarningRateLimited( usecNow, "Sender sent reliable stream pos using %llx mod %llx, expected next %llx\n",
663 (unsigned long long)nOffset, (unsigned long long)( nMask+1 ), (unsigned long long)nExpectNextStreamPos );
664 }
665 }
666 else
667 {
668 // Subsequent reliable message encode the position as an offset from previous.
669 static const char szOtherReliableStreamPos[] = "reliable streampos offset";
670 int64 nOffset;
671 switch ( nFrameType & (3<<3) )
672 {
673 case 0<<3: nOffset = 0; break;
674 case 1<<3: READ_8BITU( nOffset, szOtherReliableStreamPos ); break;
675 case 2<<3: READ_16BITU( nOffset, szOtherReliableStreamPos ); break;
676 default: READ_32BITU( nOffset, szOtherReliableStreamPos ); break;
677 }
678 nDecodeReliablePos += nOffset;
679 }
680
681 //
682 // Decode size, locate segment data
683 //
684 READ_SEGMENT_DATA_SIZE( reliable )
685
686 // Ingest the segment.
687 if ( !SNP_ReceiveReliableSegment( nPktNum, nDecodeReliablePos, pSegmentData, cbSegmentSize, usecNow ) )
688 {
689 if ( !BStateIsActive() )
690 return false; // we decided to nuke the connection - abort packet processing
691
692 // We're not able to ingest this reliable segment at the moment,
693 // but we didn't terminate the connection. So do not ack this packet
694 // to the peer. We need them to retransmit
695 bInhibitMarkReceived = true;
696 }
697
698 // Advance pointer for the next reliable segment, if any.
699 nDecodeReliablePos += cbSegmentSize;
700
701 // Decoding rules state that if we have established a message number,
702 // (from an earlier unreliable message), then we advance it.
703 if ( nCurMsgNum > 0 )
704 ++nCurMsgNum;
705 }
706 else if ( ( nFrameType & 0xfc ) == 0x80 )
707 {
708 //
709 // Stop waiting
710 //
711
712 int64 nOffset = 0;
713 static const char szStopWaitingOffset[] = "stop_waiting offset";
714 switch ( nFrameType & 3 )
715 {
716 case 0: READ_8BITU( nOffset, szStopWaitingOffset ); break;
717 case 1: READ_16BITU( nOffset, szStopWaitingOffset ); break;
718 case 2: READ_24BITU( nOffset, szStopWaitingOffset ); break;
719 case 3: READ_64BITU( nOffset, szStopWaitingOffset ); break;
720 }
721 if ( nOffset >= nPktNum )
722 {
723 DECODE_ERROR( "stop_waiting pktNum %llu offset %llu", nPktNum, nOffset );
724 }
725 ++nOffset;
726 int64 nMinPktNumToSendAcks = nPktNum-nOffset;
727 if ( nMinPktNumToSendAcks == m_receiverState.m_nMinPktNumToSendAcks )
728 continue;
729 if ( nMinPktNumToSendAcks < m_receiverState.m_nMinPktNumToSendAcks )
730 {
731 // Sender must never reduce this number! Check for bugs or bogus sender
732 if ( nPktNum >= m_receiverState.m_nPktNumUpdatedMinPktNumToSendAcks )
733 {
734 DECODE_ERROR( "SNP stop waiting reduced %lld (pkt %lld) -> %lld (pkt %lld)",
735 (long long)m_receiverState.m_nMinPktNumToSendAcks,
736 (long long)m_receiverState.m_nPktNumUpdatedMinPktNumToSendAcks,
737 (long long)nMinPktNumToSendAcks,
738 (long long)nPktNum
739 );
740 }
741 continue;
742 }
743 SpewDebugGroup( nLogLevelPacketDecode, "[%s] decode pkt %lld stop waiting: %lld (was %lld)",
744 GetDescription(),
745 (long long)nPktNum,
746 (long long)nMinPktNumToSendAcks, (long long)m_receiverState.m_nMinPktNumToSendAcks );
747 m_receiverState.m_nMinPktNumToSendAcks = nMinPktNumToSendAcks;
748 m_receiverState.m_nPktNumUpdatedMinPktNumToSendAcks = nPktNum;
749
750 // Trim from the front of the packet gap list,
751 // we can stop reporting these losses to the sender
752 auto h = m_receiverState.m_mapPacketGaps.begin();
753 while ( h->first <= m_receiverState.m_nMinPktNumToSendAcks )
754 {
755 if ( h->second.m_nEnd > m_receiverState.m_nMinPktNumToSendAcks )
756 {
757 // Ug. You're not supposed to modify the key in a map.
758 // I suppose that's legit, since you could violate the ordering.
759 // but in this case I know that this change is OK.
760 const_cast<int64 &>( h->first ) = m_receiverState.m_nMinPktNumToSendAcks;
761 break;
762 }
763
764 // Were we pending an ack on this?
765 if ( m_receiverState.m_itPendingAck == h )
766 ++m_receiverState.m_itPendingAck;
767
768 // Were we pending a nack on this?
769 if ( m_receiverState.m_itPendingNack == h )
770 {
771 // I am not sure this is even possible.
772 AssertMsg( false, "Expiring packet gap, which had pending NACK" );
773
774 // But just in case, this would be the proper action
775 ++m_receiverState.m_itPendingNack;
776 }
777
778 // Packet loss is in the past. Forget about it and move on
779 h = m_receiverState.m_mapPacketGaps.erase(h);
780 }
781 }
782 else if ( ( nFrameType & 0xf0 ) == 0x90 )
783 {
784
785 //
786 // Ack
787 //
788
789 #if STEAMNETWORKINGSOCKETS_SNP_PARANOIA > 0
790 m_senderState.DebugCheckInFlightPacketMap();
791 #if STEAMNETWORKINGSOCKETS_SNP_PARANOIA == 1
792 if ( ( nPktNum & 255 ) == 0 ) // only do it periodically
793 #endif
794 {
795 m_senderState.DebugCheckInFlightPacketMap();
796 }
797 #endif
798
799 // Parse latest received sequence number
800 int64 nLatestRecvSeqNum;
801 {
802 static const char szAckLatestPktNum[] = "ack latest pktnum";
803 int64 nLowerBits, nMask;
804 if ( nFrameType & 0x40 )
805 {
806 READ_32BITU( nLowerBits, szAckLatestPktNum );
807 nMask = 0xffffffff;
808 nLatestRecvSeqNum = NearestWithSameLowerBits( (int32)nLowerBits, m_statsEndToEnd.m_nNextSendSequenceNumber );
809 }
810 else
811 {
812 READ_16BITU( nLowerBits, szAckLatestPktNum );
813 nMask = 0xffff;
814 nLatestRecvSeqNum = NearestWithSameLowerBits( (int16)nLowerBits, m_statsEndToEnd.m_nNextSendSequenceNumber );
815 }
816 Assert( ( nLatestRecvSeqNum & nMask ) == nLowerBits );
817
818 // Find the message number that is closes to
819 if ( nLatestRecvSeqNum < 0 )
820 {
821 DECODE_ERROR( "SNP decode ack latest pktnum underflow. %llx mod %llx, next send %llx",
822 (unsigned long long)nLowerBits, (unsigned long long)( nMask+1 ), (unsigned long long)m_statsEndToEnd.m_nNextSendSequenceNumber );
823 }
824 if ( std::abs( nLatestRecvSeqNum - m_statsEndToEnd.m_nNextSendSequenceNumber ) > (nMask>>2) )
825 {
826 // We really should never get close to this boundary.
827 SpewWarningRateLimited( usecNow, "Sender sent abs latest recv pkt number using %llx mod %llx, next send %llx\n",
828 (unsigned long long)nLowerBits, (unsigned long long)( nMask+1 ), (unsigned long long)m_statsEndToEnd.m_nNextSendSequenceNumber );
829 }
830 if ( nLatestRecvSeqNum >= m_statsEndToEnd.m_nNextSendSequenceNumber )
831 {
832 DECODE_ERROR( "SNP decode ack latest pktnum %lld (%llx mod %llx), but next outoing packet is %lld (%llx).",
833 (long long)nLatestRecvSeqNum, (unsigned long long)nLowerBits, (unsigned long long)( nMask+1 ),
834 (long long)m_statsEndToEnd.m_nNextSendSequenceNumber, (unsigned long long)m_statsEndToEnd.m_nNextSendSequenceNumber
835 );
836 }
837 }
838
839 // Locate our bookkeeping for this packet, or the latest one before it
840 // Remember, we have a sentinel with a low, invalid packet number
841 Assert( !m_senderState.m_mapInFlightPacketsByPktNum.empty() );
842 auto inFlightPkt = m_senderState.m_mapInFlightPacketsByPktNum.upper_bound( nLatestRecvSeqNum );
843 --inFlightPkt;
844 Assert( inFlightPkt->first <= nLatestRecvSeqNum );
845
846 SpewDebugGroup( nLogLevelPacketDecode, "[%s] decode pkt %lld latest recv %lld, inflight=%lld\n",
847 GetDescription(),
848 (long long)nPktNum, (long long)nLatestRecvSeqNum, (long long)inFlightPkt->first
849 );
850
851 // Parse out delay, and process the ping
852 {
853 uint16 nPackedDelay;
854 READ_16BITU( nPackedDelay, "ack delay" );
855 if ( nPackedDelay != 0xffff && inFlightPkt->first == nLatestRecvSeqNum && inFlightPkt->second.m_pTransport == ctx.m_pTransport )
856 {
857 SteamNetworkingMicroseconds usecDelay = SteamNetworkingMicroseconds( nPackedDelay ) << k_nAckDelayPrecisionShift;
858 SteamNetworkingMicroseconds usecElapsed = usecNow - inFlightPkt->second.m_usecWhenSent;
859 Assert( usecElapsed >= 0 );
860
861 // Account for their reported delay, and calculate ping, in MS
862 int msPing = ( usecElapsed - usecDelay ) / 1000;
863
864 // Does this seem bogus? (We allow a small amount of slop.)
865 // NOTE: A malicious sender could lie about this delay, tricking us
866 // into thinking that the real network latency is low, they are just
867 // delaying their replies. This actually matters, since the ping time
868 // is an input into the rate calculation. So we might need to
869 // occasionally send pings that require an immediately reply, and
870 // if those ping times seem way out of whack with the ones where they are
871 // allowed to send a delay, take action against them.
872 if ( msPing < -1 || msPing > 2000 )
873 {
874 // Either they are lying or some weird timer stuff is happening.
875 // Either way, discard it.
876
877 SpewMsgGroup( m_connectionConfig.m_LogLevel_AckRTT.Get(), "[%s] decode pkt %lld latest recv %lld delay %lluusec INVALID ping %lldusec\n",
878 GetDescription(),
879 (long long)nPktNum, (long long)nLatestRecvSeqNum,
880 (unsigned long long)usecDelay,
881 (long long)usecElapsed
882 );
883 }
884 else
885 {
886 // Clamp, if we have slop
887 if ( msPing < 0 )
888 msPing = 0;
889 ProcessSNPPing( msPing, ctx );
890
891 // Spew
892 SpewVerboseGroup( m_connectionConfig.m_LogLevel_AckRTT.Get(), "[%s] decode pkt %lld latest recv %lld delay %.1fms elapsed %.1fms ping %dms\n",
893 GetDescription(),
894 (long long)nPktNum, (long long)nLatestRecvSeqNum,
895 (float)(usecDelay * 1e-3 ),
896 (float)(usecElapsed * 1e-3 ),
897 msPing
898 );
899 }
900 }
901 }
902
903 // Parse number of blocks
904 int nBlocks = nFrameType&7;
905 if ( nBlocks == 7 )
906 READ_8BITU( nBlocks, "ack num blocks" );
907
908 // If they actually sent us any blocks, that means they are fragmented.
909 // We should make sure and tell them to stop sending us these nacks
910 // and move forward.
911 if ( nBlocks > 0 )
912 {
913 // Decrease flush delay the more blocks they send us.
914 // FIXME - This is not an optimal way to do this. Forcing us to
915 // ack everything is not what we want to do. Instead, we should
916 // use a separate timer for when we need to flush out a stop_waiting
917 // packet!
918 SteamNetworkingMicroseconds usecDelay = 250*1000 / nBlocks;
919 QueueFlushAllAcks( usecNow + usecDelay );
920 }
921
922 // Process ack blocks, working backwards from the latest received sequence number.
923 // Note that we have to parse all this stuff out, even if it's old news (packets older
924 // than the stop_aiting value we sent), because we need to do that to get to the rest
925 // of the packet.
926 bool bAckedReliableRange = false;
927 int64 nPktNumAckEnd = nLatestRecvSeqNum+1;
928 while ( nBlocks >= 0 )
929 {
930
931 // Parse out number of acks/nacks.
932 // Have we parsed all the real blocks?
933 int64 nPktNumAckBegin, nPktNumNackBegin;
934 if ( nBlocks == 0 )
935 {
936 // Implicit block. Everything earlier between the last
937 // NACK and the stop_waiting value is implicitly acked!
938 if ( nPktNumAckEnd <= m_senderState.m_nMinPktWaitingOnAck )
939 break;
940
941 nPktNumAckBegin = m_senderState.m_nMinPktWaitingOnAck;
942 nPktNumNackBegin = nPktNumAckBegin;
943 SpewDebugGroup( nLogLevelPacketDecode, "[%s] decode pkt %lld ack last block ack begin %lld\n",
944 GetDescription(),
945 (long long)nPktNum, (long long)nPktNumAckBegin );
946 }
947 else
948 {
949 uint8 nBlockHeader;
950 READ_8BITU( nBlockHeader, "ack block header" );
951
952 // Ack count?
953 int64 numAcks = ( nBlockHeader>> 4 ) & 7;
954 if ( nBlockHeader & 0x80 )
955 {
956 uint64 nUpperBits;
957 READ_VARINT( nUpperBits, "ack count upper bits" );
958 if ( nUpperBits > 100000 )
959 DECODE_ERROR( "Ack count of %llu<<3 is crazy", (unsigned long long)nUpperBits );
960 numAcks |= nUpperBits<<3;
961 }
962 nPktNumAckBegin = nPktNumAckEnd - numAcks;
963 if ( nPktNumAckBegin < 0 )
964 DECODE_ERROR( "Ack range underflow, end=%lld, num=%lld", (long long)nPktNumAckEnd, (long long)numAcks );
965
966 // Extended nack count?
967 int64 numNacks = nBlockHeader & 7;
968 if ( nBlockHeader & 0x08)
969 {
970 uint64 nUpperBits;
971 READ_VARINT( nUpperBits, "nack count upper bits" );
972 if ( nUpperBits > 100000 )
973 DECODE_ERROR( "Nack count of %llu<<3 is crazy", nUpperBits );
974 numNacks |= nUpperBits<<3;
975 }
976 nPktNumNackBegin = nPktNumAckBegin - numNacks;
977 if ( nPktNumNackBegin < 0 )
978 DECODE_ERROR( "Nack range underflow, end=%lld, num=%lld", (long long)nPktNumAckBegin, (long long)numAcks );
979
980 SpewDebugGroup( nLogLevelPacketDecode, "[%s] decode pkt %lld nack [%lld,%lld) ack [%lld,%lld)\n",
981 GetDescription(),
982 (long long)nPktNum,
983 (long long)nPktNumNackBegin, (long long)( nPktNumNackBegin + numNacks ),
984 (long long)nPktNumAckBegin, (long long)( nPktNumAckBegin + numAcks )
985 );
986 }
987
988 // Process acks first.
989 Assert( nPktNumAckBegin >= 0 );
990 while ( inFlightPkt->first >= nPktNumAckBegin )
991 {
992 Assert( inFlightPkt->first < nPktNumAckEnd );
993
994 // Scan reliable segments, and see if any are marked for retry or are in flight
995 for ( const SNPRange_t &relRange: inFlightPkt->second.m_vecReliableSegments )
996 {
997 int l = int( relRange.length() );
998
999 // If range is present, it should be in only one of these two tables.
1000 if ( m_senderState.m_listInFlightReliableRange.erase( relRange ) == 0 )
1001 {
1002 if ( m_senderState.m_listReadyRetryReliableRange.erase( relRange ) > 0 )
1003 {
1004
1005 // When we put stuff into the reliable retry list, we mark it as pending again.
1006 // But now it's acked, so it's no longer pending, even though we didn't send it.
1007 Assert( m_senderState.m_cbPendingReliable >= l );
1008 m_senderState.m_cbPendingReliable -= l;
1009
1010 bAckedReliableRange = true;
1011 }
1012 }
1013 else
1014 {
1015 bAckedReliableRange = true;
1016 Assert( m_senderState.m_listReadyRetryReliableRange.count( relRange ) == 0 );
1017
1018 // Less data waiting to be acked
1019 Assert( m_senderState.m_cbSentUnackedReliable >= l );
1020 m_senderState.m_cbSentUnackedReliable -= l;
1021 }
1022 }
1023
1024 // Check if this was the next packet we were going to timeout, then advance
1025 // pointer. This guy didn't timeout.
1026 if ( inFlightPkt == m_senderState.m_itNextInFlightPacketToTimeout )
1027 ++m_senderState.m_itNextInFlightPacketToTimeout;
1028
1029 // No need to track this anymore, remove from our table
1030 inFlightPkt = m_senderState.m_mapInFlightPacketsByPktNum.erase( inFlightPkt );
1031 --inFlightPkt;
1032 m_senderState.MaybeCheckInFlightPacketMap();
1033 }
1034
1035 // Ack of in-flight end-to-end stats?
1036 if ( nPktNumAckBegin <= m_statsEndToEnd.m_pktNumInFlight && m_statsEndToEnd.m_pktNumInFlight < nPktNumAckEnd )
1037 m_statsEndToEnd.InFlightPktAck( usecNow );
1038
1039 // Process nacks.
1040 Assert( nPktNumNackBegin >= 0 );
1041 while ( inFlightPkt->first >= nPktNumNackBegin )
1042 {
1043 Assert( inFlightPkt->first < nPktNumAckEnd );
1044 SNP_SenderProcessPacketNack( inFlightPkt->first, inFlightPkt->second, "NACK" );
1045
1046 // We'll keep the record on hand, though, in case an ACK comes in
1047 --inFlightPkt;
1048 }
1049
1050 // Continue on to the the next older block
1051 nPktNumAckEnd = nPktNumNackBegin;
1052 --nBlocks;
1053 }
1054
1055 // Should we check for discarding reliable messages we are keeping around in case
1056 // of retransmission, since we know now that they were delivered?
1057 if ( bAckedReliableRange )
1058 {
1059 m_senderState.RemoveAckedReliableMessageFromUnackedList();
1060
1061 // Spew where we think the peer is decoding the reliable stream
1062 if ( nLogLevelPacketDecode >= k_ESteamNetworkingSocketsDebugOutputType_Debug )
1063 {
1064
1065 int64 nPeerReliablePos = m_senderState.m_nReliableStreamPos;
1066 if ( !m_senderState.m_listInFlightReliableRange.empty() )
1067 nPeerReliablePos = std::min( nPeerReliablePos, m_senderState.m_listInFlightReliableRange.begin()->first.m_nBegin );
1068 if ( !m_senderState.m_listReadyRetryReliableRange.empty() )
1069 nPeerReliablePos = std::min( nPeerReliablePos, m_senderState.m_listReadyRetryReliableRange.begin()->first.m_nBegin );
1070
1071 SpewDebugGroup( nLogLevelPacketDecode, "[%s] decode pkt %lld peer reliable pos = %lld\n",
1072 GetDescription(),
1073 (long long)nPktNum, (long long)nPeerReliablePos );
1074 }
1075 }
1076
1077 // Check if any of this was new info, then advance our stop_waiting value.
1078 if ( nLatestRecvSeqNum > m_senderState.m_nMinPktWaitingOnAck )
1079 {
1080 SpewVerboseGroup( nLogLevelPacketDecode, "[%s] updating min_waiting_on_ack %lld -> %lld\n",
1081 GetDescription(),
1082 (long long)m_senderState.m_nMinPktWaitingOnAck, (long long)nLatestRecvSeqNum );
1083 m_senderState.m_nMinPktWaitingOnAck = nLatestRecvSeqNum;
1084 }
1085 }
1086 else
1087 {
1088 DECODE_ERROR( "Invalid SNP frame lead byte 0x%02x", nFrameType );
1089 }
1090 }
1091
1092 // Should we record that we received it?
1093 if ( bInhibitMarkReceived )
1094 {
1095 // Something really odd. High packet loss / fragmentation.
1096 // Potentially the peer is being abusive and we need
1097 // to protect ourselves.
1098 //
1099 // Act as if the packet was dropped. This will cause the
1100 // peer's sender logic to interpret this as additional packet
1101 // loss and back off. That's a feature, not a bug.
1102 }
1103 else
1104 {
1105
1106 // Update structures needed to populate our ACKs.
1107 // If we received reliable data now, then schedule an ack
1108 bool bScheduleAck = nDecodeReliablePos > 0;
1109 SNP_RecordReceivedPktNum( nPktNum, usecNow, bScheduleAck );
1110 }
1111
1112 // Track end-to-end flow. Even if we decided to tell our peer that
1113 // we did not receive this, we want our own stats to reflect
1114 // that we did. (And we want to be able to quickly reject a
1115 // packet with this same number.)
1116 //
1117 // Also, note that order of operations is important. This call must
1118 // happen after the SNP_RecordReceivedPktNum call above
1119 m_statsEndToEnd.TrackProcessSequencedPacket( nPktNum, usecNow, usecTimeSinceLast );
1120
1121 // Packet can be processed further
1122 return true;
1123
1124 // Make sure these don't get used beyond where we intended them to get used
1125 #undef DECODE_ERROR
1126 #undef EXPECT_BYTES
1127 #undef READ_8BITU
1128 #undef READ_16BITU
1129 #undef READ_24BITU
1130 #undef READ_32BITU
1131 #undef READ_64BITU
1132 #undef READ_VARINT
1133 #undef READ_SEGMENT_DATA_SIZE
1134 }
1135
SNP_SenderProcessPacketNack(int64 nPktNum,SNPInFlightPacket_t & pkt,const char * pszDebug)1136 void CSteamNetworkConnectionBase::SNP_SenderProcessPacketNack( int64 nPktNum, SNPInFlightPacket_t &pkt, const char *pszDebug )
1137 {
1138
1139 // Did we already treat the packet as dropped (implicitly or explicitly)?
1140 if ( pkt.m_bNack )
1141 return;
1142
1143 // Mark as dropped
1144 pkt.m_bNack = true;
1145
1146 // Is this in-flight stats we were expecting an ack for?
1147 if ( m_statsEndToEnd.m_pktNumInFlight == nPktNum )
1148 m_statsEndToEnd.InFlightPktTimeout();
1149
1150 // Scan reliable segments
1151 for ( const SNPRange_t &relRange: pkt.m_vecReliableSegments )
1152 {
1153
1154 // Marked as in-flight?
1155 auto inFlightRange = m_senderState.m_listInFlightReliableRange.find( relRange );
1156 if ( inFlightRange == m_senderState.m_listInFlightReliableRange.end() )
1157 continue;
1158
1159 SpewMsgGroup( m_connectionConfig.m_LogLevel_PacketDecode.Get(), "[%s] pkt %lld %s, queueing retry of reliable range [%lld,%lld)\n",
1160 GetDescription(),
1161 nPktNum,
1162 pszDebug,
1163 relRange.m_nBegin, relRange.m_nEnd );
1164
1165 // The ready-to-retry list counts towards the "pending" stat
1166 int l = int( relRange.length() );
1167 Assert( m_senderState.m_cbSentUnackedReliable >= l );
1168 m_senderState.m_cbSentUnackedReliable -= l;
1169 m_senderState.m_cbPendingReliable += l;
1170
1171 // Move it to the ready for retry list!
1172 // if shouldn't already be there!
1173 Assert( m_senderState.m_listReadyRetryReliableRange.count( relRange ) == 0 );
1174 m_senderState.m_listReadyRetryReliableRange[ inFlightRange->first ] = inFlightRange->second;
1175 m_senderState.m_listInFlightReliableRange.erase( inFlightRange );
1176 }
1177 }
1178
// Scan the in-flight packet list: nack any packets that have exceeded the
// retry timeout (queueing their reliable data for retry), and discard
// bookkeeping for packets nacked long enough ago that a late ack is no
// longer plausible.  Returns the next time we need to be woken up to do
// this again, or k_nThinkTime_Never if nothing is in flight.
SteamNetworkingMicroseconds CSteamNetworkConnectionBase::SNP_SenderCheckInFlightPackets( SteamNetworkingMicroseconds usecNow )
{
	// Connection must be locked, but we don't require the global lock here!
	m_pLock->AssertHeldByCurrentThread();

	// Fast path for nothing in flight.  (The map always contains a sentinel
	// entry with a negative packet number, so size()<=1 means "empty".)
	m_senderState.MaybeCheckInFlightPacketMap();
	if ( m_senderState.m_mapInFlightPacketsByPktNum.size() <= 1 )
	{
		Assert( m_senderState.m_itNextInFlightPacketToTimeout == m_senderState.m_mapInFlightPacketsByPktNum.end() );
		return k_nThinkTime_Never;
	}
	Assert( m_senderState.m_mapInFlightPacketsByPktNum.begin()->first < 0 );

	SteamNetworkingMicroseconds usecNextRetry = k_nThinkTime_Never;

	// Process retry timeout. Here we use a shorter timeout to trigger retry
	// than we do to totally forget about the packet, in case an ack comes in late,
	// we can take advantage of it.
	SteamNetworkingMicroseconds usecRTO = m_statsEndToEnd.CalcSenderRetryTimeout();
	while ( m_senderState.m_itNextInFlightPacketToTimeout != m_senderState.m_mapInFlightPacketsByPktNum.end() )
	{
		// The timeout cursor should never point at the sentinel entry
		Assert( m_senderState.m_itNextInFlightPacketToTimeout->first > 0 );

		// If already nacked, then no use waiting on it, just skip it
		if ( !m_senderState.m_itNextInFlightPacketToTimeout->second.m_bNack )
		{

			// Not yet time to give up?  Packets are sent (and hence stored)
			// in time order, so the first non-timed-out packet stops the scan.
			SteamNetworkingMicroseconds usecRetryPkt = m_senderState.m_itNextInFlightPacketToTimeout->second.m_usecWhenSent + usecRTO;
			if ( usecRetryPkt > usecNow )
			{
				usecNextRetry = usecRetryPkt;
				break;
			}

			// Mark as dropped, and move any reliable contents into the
			// retry list.
			SNP_SenderProcessPacketNack( m_senderState.m_itNextInFlightPacketToTimeout->first, m_senderState.m_itNextInFlightPacketToTimeout->second, "AckTimeout" );
		}

		// Advance to next packet waiting to timeout
		++m_senderState.m_itNextInFlightPacketToTimeout;
	}

	// Skip the sentinel
	auto inFlightPkt = m_senderState.m_mapInFlightPacketsByPktNum.begin();
	Assert( inFlightPkt->first < 0 );
	++inFlightPkt;

	// Expire old packets (all of these should have been marked as nacked)
	// Here we need to be careful when selecting an expiry. If the actual RTT
	// time suddenly increases, using the current RTT estimate may expire them
	// too quickly. This can actually be catastrophic, since if we forgot
	// about these packets now, and then they later are acked, we won't be able
	// to update our RTT, and so we will be stuck with an RTT estimate that
	// is too small
	SteamNetworkingMicroseconds usecExpiry = usecRTO*2;
	if ( m_statsEndToEnd.m_ping.m_nValidPings < 1 )
	{
		// No valid ping estimate yet -- be extra generous before discarding
		usecExpiry += k_nMillion;
	}
	else
	{
		// Keep records around at least as long as it's been since we last
		// got a ping measurement (clamped to 3 seconds)
		SteamNetworkingMicroseconds usecMostRecentPingAge = usecNow - m_statsEndToEnd.m_ping.TimeRecvMostRecentPing();
		usecMostRecentPingAge = std::min( usecMostRecentPingAge, k_nMillion*3 );
		if ( usecMostRecentPingAge > usecExpiry )
			usecExpiry = usecMostRecentPingAge;
	}

	// Anything sent at or before this time is eligible for disposal
	SteamNetworkingMicroseconds usecWhenExpiry = usecNow - usecExpiry;
	for (;;)
	{
		if ( inFlightPkt->second.m_usecWhenSent > usecWhenExpiry )
			break;

		// Should have already been timed out by the code above
		Assert( inFlightPkt->second.m_bNack );
		Assert( inFlightPkt != m_senderState.m_itNextInFlightPacketToTimeout );

		// Expire it, advance to the next one.  (erase() returns the
		// following element; the sentinel guarantees the map stays non-empty.)
		inFlightPkt = m_senderState.m_mapInFlightPacketsByPktNum.erase( inFlightPkt );
		Assert( !m_senderState.m_mapInFlightPacketsByPktNum.empty() );

		// Bail if we've hit the end of the nacks
		if ( inFlightPkt == m_senderState.m_mapInFlightPacketsByPktNum.end() )
			break;
	}

	// Make sure we didn't hose data structures
	m_senderState.MaybeCheckInFlightPacketMap();

	// Return time when we really need to check back in again.
	// We don't wake up early just to expire old nacked packets,
	// there is no urgency or value in doing that, we can clean
	// those up whenever. We only make sure and wake up when we
	// need to retry. (And we need to make sure we don't let
	// our list of old packets grow unnecessarily long.)
	return usecNextRetry;
}
1279
// A single segment (a slice of one message, reliable or unreliable) staged
// for serialization into an outgoing packet: a pre-built header, plus a
// reference to the message payload it covers.
struct EncodedSegment
{
	// Maximum bytes the pre-built header can occupy
	static constexpr int k_cbMaxHdr = 16;
	uint8 m_hdr[ k_cbMaxHdr ];       // Serialized header bytes
	int m_cbHdr; // Number of valid bytes in m_hdr.  Doesn't include any size byte
	CSteamNetworkingMessage *m_pMsg; // Message this segment slices into
	int m_cbSegSize;                 // Payload bytes this segment carries
	int m_nOffset;                   // Offset of the slice within the message

	// Build the header for a reliable segment covering reliable stream
	// positions [nBegin,nEnd) of pMsg.  nLastReliableStreamPosEnd is the end
	// position of the previous reliable segment already placed in this same
	// packet, or 0 if this is the first one.
	inline void SetupReliable( CSteamNetworkingMessage *pMsg, int64 nBegin, int64 nEnd, int64 nLastReliableStreamPosEnd )
	{
		Assert( nBegin < nEnd );
		//Assert( nBegin + k_cbSteamNetworkingSocketsMaxReliableMessageSegment >= nEnd ); // Make sure we don't exceed max segment size
		Assert( pMsg->SNPSend_IsReliable() );

		// Start filling out the header with the top three bits = 010,
		// identifying this as a reliable segment
		uint8 *pHdr = m_hdr;
		*(pHdr++) = 0x40;

		// First reliable segment in this packet?
		if ( nLastReliableStreamPosEnd == 0 )
		{
			// Always use 48-bit (2+4 byte, little-endian) absolute stream
			// offsets, to make sure we are exercising the worst case.
			// Later we should optimize this
			m_hdr[0] |= 0x10;
			*(uint16*)pHdr = LittleWord( uint16( nBegin ) ); pHdr += 2;
			*(uint32*)pHdr = LittleDWord( uint32( nBegin>>16 ) ); pHdr += 4;
		}
		else
		{
			// Offset from end of previous reliable segment in the same packet.
			// Bits 3-4 of the lead byte encode the width of the offset field:
			// 0 = none, 1 = 8 bits, 2 = 16 bits, 3 = 32 bits.
			Assert( nBegin >= nLastReliableStreamPosEnd );
			int64 nOffset = nBegin - nLastReliableStreamPosEnd;
			if ( nOffset == 0)
			{
				// Adjacent to the previous segment -- nothing to encode
			}
			else if ( nOffset < 0x100 )
			{
				m_hdr[0] |= (1<<3);
				*pHdr = uint8( nOffset ); pHdr += 1;
			}
			else if ( nOffset < 0x10000 )
			{
				m_hdr[0] |= (2<<3);
				*(uint16*)pHdr = LittleWord( uint16( nOffset ) ); pHdr += 2;
			}
			else
			{
				m_hdr[0] |= (3<<3);
				*(uint32*)pHdr = LittleDWord( uint32( nOffset ) ); pHdr += 4;
			}
		}

		m_cbHdr = pHdr-m_hdr;

		// Size of the segment. We assume that the whole thing fits for now,
		// even though it might need to get truncated
		int cbSegData = nEnd - nBegin;
		Assert( cbSegData > 0 );
		Assert( nBegin >= pMsg->SNPSend_ReliableStreamPos() );
		Assert( nEnd <= pMsg->SNPSend_ReliableStreamPos() + pMsg->m_cbSize );

		m_pMsg = pMsg;
		m_nOffset = nBegin - pMsg->SNPSend_ReliableStreamPos();
		m_cbSegSize = cbSegData;
	}

	// Build the header for an unreliable segment starting at byte nOffset of
	// pMsg.  nLastMsgNum is the message number of the previous unreliable
	// segment already placed in this packet, or 0 if this is the first one.
	inline void SetupUnreliable( CSteamNetworkingMessage *pMsg, int nOffset, int64 nLastMsgNum )
	{

		// Start filling out the header with the top two bits = 00,
		// identifying this as an unreliable segment
		uint8 *pHdr = m_hdr;
		*(pHdr++) = 0x00;

		// Encode message number. First unreliable message in this packet?
		if ( nLastMsgNum == 0 )
		{

			// Just always encode message number with 32 bits for now,
			// to make sure we are hitting the worst case. We can optimize this later
			*(uint32*)pHdr = LittleDWord( (uint32)pMsg->m_nMessageNumber ); pHdr += 4;
			m_hdr[0] |= 0x10;
		}
		else
		{
			// Subsequent unreliable message: encode delta from the previous one
			Assert( pMsg->m_nMessageNumber > nLastMsgNum );
			uint64 nDelta = pMsg->m_nMessageNumber - nLastMsgNum;
			if ( nDelta == 1 )
			{
				// Common case of sequential messages. Don't encode any offset
			}
			else
			{
				pHdr = SerializeVarInt( pHdr, nDelta, m_hdr+k_cbMaxHdr );
				Assert( pHdr ); // Overflow shouldn't be possible
				m_hdr[0] |= 0x10;
			}
		}

		// Encode segment offset within message, except in the special common case of the first segment
		if ( nOffset > 0 )
		{
			pHdr = SerializeVarInt( pHdr, (uint32)( nOffset ), m_hdr+k_cbMaxHdr );
			Assert( pHdr ); // Overflow shouldn't be possible
			m_hdr[0] |= 0x08;
		}

		m_cbHdr = pHdr-m_hdr;

		// Size of the segment. We assume that the whole thing fits for now, even though it might need to get truncated
		int cbSegData = pMsg->m_cbSize - nOffset;
		Assert( cbSegData > 0 || ( cbSegData == 0 && pMsg->m_cbSize == 0 ) ); // We should only send zero-byte segments if the message itself is zero bytes. (Which is legitimate!)

		m_pMsg = pMsg;
		m_cbSegSize = cbSegData;
		m_nOffset = nOffset;
	}

};
1403
1404 template <typename T, typename L>
HasOverlappingRange(const SNPRange_t & range,const std_map<SNPRange_t,T,L> & map)1405 inline bool HasOverlappingRange( const SNPRange_t &range, const std_map<SNPRange_t,T,L> &map )
1406 {
1407 auto l = map.lower_bound( range );
1408 if ( l != map.end() )
1409 {
1410 Assert( l->first.m_nBegin >= range.m_nBegin );
1411 if ( l->first.m_nBegin < range.m_nEnd )
1412 return true;
1413 }
1414 auto u = map.upper_bound( range );
1415 if ( u != map.end() )
1416 {
1417 Assert( range.m_nBegin < u->first.m_nBegin );
1418 if ( range.m_nEnd > l->first.m_nBegin )
1419 return true;
1420 }
1421
1422 return false;
1423 }
1424
SNP_SendPacket(CConnectionTransport * pTransport,SendPacketContext_t & ctx)1425 bool CSteamNetworkConnectionBase::SNP_SendPacket( CConnectionTransport *pTransport, SendPacketContext_t &ctx )
1426 {
1427 // To send packets we need both the global lock and the connection lock
1428 AssertLocksHeldByCurrentThread( "SNP_SendPacket" );
1429
1430 // Check calling conditions, and don't crash
1431 if ( !BStateIsActive() || m_senderState.m_mapInFlightPacketsByPktNum.empty() || !pTransport )
1432 {
1433 Assert( BStateIsActive() );
1434 Assert( !m_senderState.m_mapInFlightPacketsByPktNum.empty() );
1435 Assert( pTransport );
1436 return false;
1437 }
1438
1439 SteamNetworkingMicroseconds usecNow = ctx.m_usecNow;
1440
1441 // Get max size of plaintext we could send.
1442 // AES-GCM has a fixed size overhead, for the tag.
1443 // FIXME - but what we if we aren't using AES-GCM!
1444 int cbMaxPlaintextPayload = std::max( 0, ctx.m_cbMaxEncryptedPayload-k_cbSteamNetwokingSocketsEncrytionTagSize );
1445 cbMaxPlaintextPayload = std::min( cbMaxPlaintextPayload, m_cbMaxPlaintextPayloadSend );
1446
1447 uint8 payload[ k_cbSteamNetworkingSocketsMaxPlaintextPayloadSend ];
1448 uint8 *pPayloadEnd = payload + cbMaxPlaintextPayload;
1449 uint8 *pPayloadPtr = payload;
1450
1451 int nLogLevelPacketDecode = m_connectionConfig.m_LogLevel_PacketDecode.Get();
1452 SpewVerboseGroup( nLogLevelPacketDecode, "[%s] encode pkt %lld",
1453 GetDescription(),
1454 (long long)m_statsEndToEnd.m_nNextSendSequenceNumber );
1455
1456 // Stop waiting frame
1457 pPayloadPtr = SNP_SerializeStopWaitingFrame( pPayloadPtr, pPayloadEnd, usecNow );
1458 if ( pPayloadPtr == nullptr )
1459 return false;
1460
1461 // Get list of ack blocks we might want to serialize, and which
1462 // of those acks we really want to flush out right now.
1463 SNPAckSerializerHelper ackHelper;
1464 SNP_GatherAckBlocks( ackHelper, usecNow );
1465
1466 #ifdef SNP_ENABLE_PACKETSENDLOG
1467 PacketSendLog *pLog = push_back_get_ptr( m_vecSendLog );
1468 pLog->m_usecTime = usecNow;
1469 pLog->m_cbPendingReliable = m_senderState.m_cbPendingReliable;
1470 pLog->m_cbPendingUnreliable = m_senderState.m_cbPendingUnreliable;
1471 pLog->m_nPacketGaps = len( m_receiverState.m_mapPacketGaps )-1;
1472 pLog->m_nAckBlocksNeeded = ackHelper.m_nBlocksNeedToAck;
1473 pLog->m_nPktNumNextPendingAck = m_receiverState.m_itPendingAck->first;
1474 pLog->m_usecNextPendingAckTime = m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior;
1475 pLog->m_fltokens = m_sendRateData.m_flTokenBucket;
1476 pLog->m_nMaxPktRecv = m_statsEndToEnd.m_nMaxRecvPktNum;
1477 pLog->m_nMinPktNumToSendAcks = m_receiverState.m_nMinPktNumToSendAcks;
1478 pLog->m_nReliableSegmentsRetry = 0;
1479 pLog->m_nSegmentsSent = 0;
1480 #endif
1481
1482 // How much space do we need to reserve for acks?
1483 int cbReserveForAcks = 0;
1484 if ( m_statsEndToEnd.m_nMaxRecvPktNum > 0 )
1485 {
1486 int cbPayloadRemainingForAcks = pPayloadEnd - pPayloadPtr;
1487 if ( cbPayloadRemainingForAcks >= SNPAckSerializerHelper::k_cbHeaderSize )
1488 {
1489 cbReserveForAcks = SNPAckSerializerHelper::k_cbHeaderSize;
1490 int n = 3; // Assume we want to send a handful
1491 n = std::max( n, ackHelper.m_nBlocksNeedToAck ); // But if we have blocks that need to be flushed now, try to fit all of them
1492 n = std::min( n, ackHelper.m_nBlocks ); // Cannot send more than we actually have
1493 while ( n > 0 )
1494 {
1495 --n;
1496 if ( ackHelper.m_arBlocks[n].m_cbTotalEncodedSize <= cbPayloadRemainingForAcks )
1497 {
1498 cbReserveForAcks = ackHelper.m_arBlocks[n].m_cbTotalEncodedSize;
1499 break;
1500 }
1501 }
1502 }
1503 }
1504
1505 // Check if we are actually going to send data in this packet
1506 if (
1507 m_sendRateData.m_flTokenBucket < 0.0 // No bandwidth available. (Presumably this is a relatively rare out-of-band connectivity check, etc) FIXME should we use a different token bucket per transport?
1508 || !BStateIsConnectedForWirePurposes() // not actually in a connection stats where we should be sending real data yet
1509 || pTransport != m_pTransport // transport is not the selected transport
1510 ) {
1511
1512 // Serialize some acks, if we want to
1513 if ( cbReserveForAcks > 0 )
1514 {
1515 // But if we're going to send any acks, then try to send as many
1516 // as possible, not just the bare minimum.
1517 pPayloadPtr = SNP_SerializeAckBlocks( ackHelper, pPayloadPtr, pPayloadEnd, usecNow );
1518 if ( pPayloadPtr == nullptr )
1519 return false; // bug! Abort
1520
1521 // We don't need to serialize any more acks
1522 cbReserveForAcks = 0;
1523 }
1524
1525 // Truncate the buffer, don't try to fit any data
1526 // !SPEED! - instead of doing this, we could just put all of the segment code below
1527 // in an else() block.
1528 pPayloadEnd = pPayloadPtr;
1529 }
1530
1531 int64 nLastReliableStreamPosEnd = 0;
1532 int cbBytesRemainingForSegments = pPayloadEnd - pPayloadPtr - cbReserveForAcks;
1533 vstd::small_vector<EncodedSegment,8> vecSegments;
1534
1535 // If we need to retry any reliable data, then try to put that in first.
1536 // Bail if we only have a tiny sliver of data left
1537 while ( !m_senderState.m_listReadyRetryReliableRange.empty() && cbBytesRemainingForSegments > 2 )
1538 {
1539 auto h = m_senderState.m_listReadyRetryReliableRange.begin();
1540
1541 // Start a reliable segment
1542 EncodedSegment &seg = *push_back_get_ptr( vecSegments );
1543 seg.SetupReliable( h->second, h->first.m_nBegin, h->first.m_nEnd, nLastReliableStreamPosEnd );
1544 int cbSegTotalWithoutSizeField = seg.m_cbHdr + seg.m_cbSegSize;
1545 if ( cbSegTotalWithoutSizeField > cbBytesRemainingForSegments )
1546 {
1547 // This one won't fit.
1548 vecSegments.pop_back();
1549
1550 // FIXME If there's a decent amount of space left in this packet, it might
1551 // be worthwhile to send what we can. Right now, once we send a reliable range,
1552 // we always retry exactly that range. The only complication would be when we
1553 // receive an ack, we would need to be aware that the acked ranges might not
1554 // exactly match up with the ranges that we sent. Actually this shouldn't
1555 // be that big of a deal. But for now let's always retry the exact ranges that
1556 // things got chopped up during the initial send.
1557
1558 // This should only happen if we have already fit some data in, or
1559 // the caller asked us to see what we could squeeze into a smaller
1560 // packet, or we need to serialized a bunch of acks. If this is an
1561 // opportunity to fill a normal packet and we fail on the first segment,
1562 // we will never make progress and we are hosed!
1563 AssertMsg2(
1564 nLastReliableStreamPosEnd > 0
1565 || cbMaxPlaintextPayload < m_cbMaxPlaintextPayloadSend
1566 || ( cbReserveForAcks > 15 && ackHelper.m_nBlocksNeedToAck > 8 ),
1567 "We cannot fit reliable segment, need %d bytes, only %d remaining", cbSegTotalWithoutSizeField, cbBytesRemainingForSegments
1568 );
1569
1570 // Don't try to put more stuff in the packet, even if we have room. We're
1571 // already having to retry, so this data is already delayed. If we skip ahead
1572 // and put more into this packet, that's just extending the time until we can send
1573 // the next packet.
1574 break;
1575 }
1576
1577 // If we only have a sliver left, then don't try to fit any more.
1578 cbBytesRemainingForSegments -= cbSegTotalWithoutSizeField;
1579 nLastReliableStreamPosEnd = h->first.m_nEnd;
1580
1581 // Assume for now this won't be the last segment, in which case we will also need
1582 // the byte for the size field.
1583 // NOTE: This might cause cbPayloadBytesRemaining to go negative by one! I know
1584 // that seems weird, but it actually keeps the logic below simpler.
1585 cbBytesRemainingForSegments -= 1;
1586
1587 // Remove from retry list. (We'll add to the in-flight list later)
1588 m_senderState.m_listReadyRetryReliableRange.erase( h );
1589
1590 #ifdef SNP_ENABLE_PACKETSENDLOG
1591 ++pLog->m_nReliableSegmentsRetry;
1592 #endif
1593 }
1594
1595 // Did we retry everything we needed to? If not, then don't try to send new stuff,
1596 // before we send those retries.
1597 if ( m_senderState.m_listReadyRetryReliableRange.empty() )
1598 {
1599
1600 // OK, check the outgoing messages, and send as much stuff as we can cram in there
1601 int64 nLastMsgNum = 0;
1602 while ( cbBytesRemainingForSegments > 4 )
1603 {
1604 if ( m_senderState.m_messagesQueued.empty() )
1605 {
1606 m_senderState.m_cbCurrentSendMessageSent = 0;
1607 break;
1608 }
1609 CSteamNetworkingMessage *pSendMsg = m_senderState.m_messagesQueued.m_pFirst;
1610 Assert( m_senderState.m_cbCurrentSendMessageSent < pSendMsg->m_cbSize );
1611
1612 // Start a new segment
1613 EncodedSegment &seg = *push_back_get_ptr( vecSegments );
1614
1615 // Reliable?
1616 bool bLastSegment = false;
1617 if ( pSendMsg->SNPSend_IsReliable() )
1618 {
1619
1620 // FIXME - Coalesce adjacent reliable messages ranges
1621
1622 int64 nBegin = pSendMsg->SNPSend_ReliableStreamPos() + m_senderState.m_cbCurrentSendMessageSent;
1623
1624 // How large would we like this segment to be,
1625 // ignoring how much space is left in the packet.
1626 // We limit the size of reliable segments, to make
1627 // sure that we don't make an excessively large
1628 // one and then have a hard time retrying it later.
1629 int cbDesiredSegSize = pSendMsg->m_cbSize - m_senderState.m_cbCurrentSendMessageSent;
1630 if ( cbDesiredSegSize > m_cbMaxReliableMessageSegment )
1631 {
1632 cbDesiredSegSize = m_cbMaxReliableMessageSegment;
1633 bLastSegment = true;
1634 }
1635
1636 int64 nEnd = nBegin + cbDesiredSegSize;
1637 seg.SetupReliable( pSendMsg, nBegin, nEnd, nLastReliableStreamPosEnd );
1638
1639 // If we encode subsequent
1640 nLastReliableStreamPosEnd = nEnd;
1641 }
1642 else
1643 {
1644 seg.SetupUnreliable( pSendMsg, m_senderState.m_cbCurrentSendMessageSent, nLastMsgNum );
1645 }
1646
1647 // Can't fit the whole thing?
1648 if ( bLastSegment || seg.m_cbHdr + seg.m_cbSegSize > cbBytesRemainingForSegments )
1649 {
1650
1651 // Check if we have enough room to send anything worthwhile.
1652 // Don't send really tiny silver segments at the very end of a packet. That sort of fragmentation
1653 // just makes it more likely for something to drop. Our goal is to reduce the number of packets
1654 // just as much as the total number of bytes, so if we're going to have to send another packet
1655 // anyway, don't send a little sliver of a message at the beginning of a packet
1656 // We need to finish the header by this point if we're going to send anything
1657 int cbMinSegDataSizeToSend = std::min( 16, seg.m_cbSegSize );
1658 if ( seg.m_cbHdr + cbMinSegDataSizeToSend > cbBytesRemainingForSegments )
1659 {
1660 // Don't send this segment now.
1661 vecSegments.pop_back();
1662 break;
1663 }
1664
1665 #ifdef SNP_ENABLE_PACKETSENDLOG
1666 ++pLog->m_nSegmentsSent;
1667 #endif
1668
1669 // Truncate, and leave the message in the queue
1670 seg.m_cbSegSize = std::min( seg.m_cbSegSize, cbBytesRemainingForSegments - seg.m_cbHdr );
1671 m_senderState.m_cbCurrentSendMessageSent += seg.m_cbSegSize;
1672 Assert( m_senderState.m_cbCurrentSendMessageSent < pSendMsg->m_cbSize );
1673 cbBytesRemainingForSegments -= seg.m_cbHdr + seg.m_cbSegSize;
1674 break;
1675 }
1676
1677 // The whole message fit (perhaps exactly, without the size byte)
1678 // Reset send pointer for the next message
1679 Assert( m_senderState.m_cbCurrentSendMessageSent + seg.m_cbSegSize == pSendMsg->m_cbSize );
1680 m_senderState.m_cbCurrentSendMessageSent = 0;
1681
1682 // Remove message from queue,w e have transfered ownership to the segment and will
1683 // dispose of the message when we serialize the segments
1684 m_senderState.m_messagesQueued.pop_front();
1685
1686 // Consume payload bytes
1687 cbBytesRemainingForSegments -= seg.m_cbHdr + seg.m_cbSegSize;
1688
1689 // Assume for now this won't be the last segment, in which case we will also need the byte for the size field.
1690 // NOTE: This might cause cbPayloadBytesRemaining to go negative by one! I know that seems weird, but it actually
1691 // keeps the logic below simpler.
1692 cbBytesRemainingForSegments -= 1;
1693
1694 // Update various accounting, depending on reliable or unreliable
1695 if ( pSendMsg->SNPSend_IsReliable() )
1696 {
1697 // Reliable segments advance the current message number.
1698 // NOTE: If we coalesce adjacent reliable segments, this will probably need to be adjusted
1699 if ( nLastMsgNum > 0 )
1700 ++nLastMsgNum;
1701
1702 // Go ahead and add us to the end of the list of unacked messages
1703 m_senderState.m_unackedReliableMessages.push_back( seg.m_pMsg );
1704 }
1705 else
1706 {
1707 nLastMsgNum = pSendMsg->m_nMessageNumber;
1708
1709 // Set the "This is the last segment in this message" header bit
1710 seg.m_hdr[0] |= 0x20;
1711 }
1712 }
1713 }
1714
1715 // Now we know how much space we need for the segments. If we asked to reserve
1716 // space for acks, we should have at least that much. But we might have more.
1717 // Serialize acks, as much as will fit. If we are badly fragmented and we have
1718 // the space, it's better to keep sending acks over and over to try to clear
1719 // it out as fast as possible.
1720 if ( cbReserveForAcks > 0 )
1721 {
1722
1723 // If we didn't use all the space for data, that's more we could use for acks
1724 int cbAvailForAcks = cbReserveForAcks;
1725 if ( cbBytesRemainingForSegments > 0 )
1726 cbAvailForAcks += cbBytesRemainingForSegments;
1727 uint8 *pAckEnd = pPayloadPtr + cbAvailForAcks;
1728 Assert( pAckEnd <= pPayloadEnd );
1729
1730 uint8 *pAfterAcks = SNP_SerializeAckBlocks( ackHelper, pPayloadPtr, pAckEnd, usecNow );
1731 if ( pAfterAcks == nullptr )
1732 return false; // bug! Abort
1733
1734 int cbAckBytesWritten = pAfterAcks - pPayloadPtr;
1735 if ( cbAckBytesWritten > cbReserveForAcks )
1736 {
1737 // We used more space for acks than was strictly reserved.
1738 // Update space remaining for data segments. We should have the room!
1739 cbBytesRemainingForSegments -= ( cbAckBytesWritten - cbReserveForAcks );
1740 Assert( cbBytesRemainingForSegments >= -1 ); // remember we might go over by one byte
1741 }
1742 else
1743 {
1744 Assert( cbAckBytesWritten == cbReserveForAcks ); // The code above reserves space very carefuly. So if we reserve it, we should fill it!
1745 }
1746
1747 pPayloadPtr = pAfterAcks;
1748 }
1749
1750 // We are gonna send a packet. Start filling out an entry so that when it's acked (or nacked)
1751 // we can know what to do.
1752 Assert( m_senderState.m_mapInFlightPacketsByPktNum.lower_bound( m_statsEndToEnd.m_nNextSendSequenceNumber ) == m_senderState.m_mapInFlightPacketsByPktNum.end() );
1753 std::pair<int64,SNPInFlightPacket_t> pairInsert( m_statsEndToEnd.m_nNextSendSequenceNumber, SNPInFlightPacket_t{ usecNow, false, pTransport, {} } );
1754 SNPInFlightPacket_t &inFlightPkt = pairInsert.second;
1755
1756 // We might have gone over exactly one byte, because we counted the size byte of the last
1757 // segment, which doesn't actually need to be sent
1758 Assert( cbBytesRemainingForSegments >= 0 || ( cbBytesRemainingForSegments == -1 && vecSegments.size() > 0 ) );
1759
1760 // OK, now go through and actually serialize the segments
1761 int nSegments = len( vecSegments );
1762 for ( int idx = 0 ; idx < nSegments ; ++idx )
1763 {
1764 EncodedSegment &seg = vecSegments[ idx ];
1765
1766 // Check if this message is still sitting in the queue. (If so, it has to be the first one!)
1767 bool bStillInQueue = ( seg.m_pMsg == m_senderState.m_messagesQueued.m_pFirst );
1768
1769 // Finish the segment size byte
1770 if ( idx < nSegments-1 )
1771 {
1772 // Stash upper 3 bits into the header
1773 int nUpper3Bits = ( seg.m_cbSegSize>>8 );
1774 Assert( nUpper3Bits <= 4 ); // The values 5 and 6 are reserved and shouldn't be needed due to the MTU we support
1775 seg.m_hdr[0] |= nUpper3Bits;
1776
1777 // And the lower 8 bits follow the other fields
1778 seg.m_hdr[ seg.m_cbHdr++ ] = uint8( seg.m_cbSegSize );
1779 }
1780 else
1781 {
1782 // Set "no explicit size field included, segment extends to end of packet"
1783 seg.m_hdr[0] |= 7;
1784 }
1785
1786 // Double-check that we didn't overflow
1787 Assert( seg.m_cbHdr <= seg.k_cbMaxHdr );
1788
1789 // Copy the header
1790 memcpy( pPayloadPtr, seg.m_hdr, seg.m_cbHdr ); pPayloadPtr += seg.m_cbHdr;
1791 Assert( pPayloadPtr+seg.m_cbSegSize <= pPayloadEnd );
1792
1793 // Reliable?
1794 if ( seg.m_pMsg->SNPSend_IsReliable() )
1795 {
1796 // We should never encode an empty range of the stream, that is worthless.
1797 // (Even an empty reliable message requires some framing in the stream.)
1798 Assert( seg.m_cbSegSize > 0 );
1799
1800 // Copy the unreliable segment into the packet. Does the portion we are serializing
1801 // begin in the header?
1802 if ( seg.m_nOffset < seg.m_pMsg->m_cbSNPSendReliableHeader )
1803 {
1804 int cbCopyHdr = std::min( seg.m_cbSegSize, seg.m_pMsg->m_cbSNPSendReliableHeader - seg.m_nOffset );
1805
1806 memcpy( pPayloadPtr, seg.m_pMsg->SNPSend_ReliableHeader() + seg.m_nOffset, cbCopyHdr );
1807 pPayloadPtr += cbCopyHdr;
1808
1809 int cbCopyBody = seg.m_cbSegSize - cbCopyHdr;
1810 if ( cbCopyBody > 0 )
1811 {
1812 memcpy( pPayloadPtr, seg.m_pMsg->m_pData, cbCopyBody );
1813 pPayloadPtr += cbCopyBody;
1814 }
1815 }
1816 else
1817 {
1818 // This segment is entirely from the message body
1819 memcpy( pPayloadPtr, (char*)seg.m_pMsg->m_pData + seg.m_nOffset - seg.m_pMsg->m_cbSNPSendReliableHeader, seg.m_cbSegSize );
1820 pPayloadPtr += seg.m_cbSegSize;
1821 }
1822
1823
1824 // Remember that this range is in-flight
1825 SNPRange_t range;
1826 range.m_nBegin = seg.m_pMsg->SNPSend_ReliableStreamPos() + seg.m_nOffset;
1827 range.m_nEnd = range.m_nBegin + seg.m_cbSegSize;
1828
1829 // Ranges of the reliable stream that have not been acked should either be
1830 // in flight, or queued for retry. Make sure this range is not already in
1831 // either state.
1832 Assert( !HasOverlappingRange( range, m_senderState.m_listInFlightReliableRange ) );
1833 Assert( !HasOverlappingRange( range, m_senderState.m_listReadyRetryReliableRange ) );
1834
1835 // Spew
1836 SpewDebugGroup( nLogLevelPacketDecode, "[%s] encode pkt %lld reliable msg %lld offset %d+%d=%d range [%lld,%lld)\n",
1837 GetDescription(), (long long)m_statsEndToEnd.m_nNextSendSequenceNumber, (long long)seg.m_pMsg->m_nMessageNumber,
1838 seg.m_nOffset, seg.m_cbSegSize, seg.m_nOffset+seg.m_cbSegSize,
1839 (long long)range.m_nBegin, (long long)range.m_nEnd );
1840
1841 // Add to table of in-flight reliable ranges
1842 m_senderState.m_listInFlightReliableRange[ range ] = seg.m_pMsg;
1843
1844 // Remember that this packet contained that range
1845 inFlightPkt.m_vecReliableSegments.push_back( range );
1846
1847 // Less reliable data pending
1848 m_senderState.m_cbPendingReliable -= seg.m_cbSegSize;
1849 Assert( m_senderState.m_cbPendingReliable >= 0 );
1850
1851 // More data waiting to be acked
1852 m_senderState.m_cbSentUnackedReliable += seg.m_cbSegSize;
1853 }
1854 else
1855 {
1856 // We should only encode an empty segment if the message itself is empty
1857 Assert( seg.m_cbSegSize > 0 || ( seg.m_cbSegSize == 0 && seg.m_pMsg->m_cbSize == 0 ) );
1858
1859 // Check some stuff
1860 Assert( bStillInQueue == ( seg.m_nOffset + seg.m_cbSegSize < seg.m_pMsg->m_cbSize ) ); // If we ended the message, we should have removed it from the queue
1861 Assert( bStillInQueue == ( ( seg.m_hdr[0] & 0x20 ) == 0 ) );
1862 Assert( bStillInQueue || seg.m_pMsg->m_links.m_pNext == nullptr ); // If not in the queue, we should be detached
1863 Assert( seg.m_pMsg->m_links.m_pPrev == nullptr ); // We should either be at the head of the queue, or detached
1864
1865 // Copy the unreliable segment into the packet
1866 memcpy( pPayloadPtr, (char*)seg.m_pMsg->m_pData + seg.m_nOffset, seg.m_cbSegSize );
1867 pPayloadPtr += seg.m_cbSegSize;
1868
1869 // Spew
1870 SpewDebugGroup( nLogLevelPacketDecode, "[%s] encode pkt %lld unreliable msg %lld offset %d+%d=%d\n",
1871 GetDescription(), (long long)m_statsEndToEnd.m_nNextSendSequenceNumber, (long long)seg.m_pMsg->m_nMessageNumber,
1872 seg.m_nOffset, seg.m_cbSegSize, seg.m_nOffset+seg.m_cbSegSize );
1873
1874 // Less unreliable data pending
1875 m_senderState.m_cbPendingUnreliable -= seg.m_cbSegSize;
1876 Assert( m_senderState.m_cbPendingUnreliable >= 0 );
1877
1878 // Done with this message? Clean up
1879 if ( !bStillInQueue )
1880 seg.m_pMsg->Release();
1881 }
1882 }
1883
1884 // One last check for overflow
1885 Assert( pPayloadPtr <= pPayloadEnd );
1886 int cbPlainText = pPayloadPtr - payload;
1887 if ( cbPlainText > cbMaxPlaintextPayload )
1888 {
1889 AssertMsg1( false, "Payload exceeded max size of %d\n", cbMaxPlaintextPayload );
1890 return 0;
1891 }
1892
1893 // OK, we have a plaintext payload. Encrypt and send it.
1894 // What cipher are we using?
1895 int nBytesSent = 0;
1896 switch ( m_eNegotiatedCipher )
1897 {
1898 default:
1899 AssertMsg1( false, "Bogus cipher %d", m_eNegotiatedCipher );
1900 break;
1901
1902 case k_ESteamNetworkingSocketsCipher_NULL:
1903 {
1904
1905 // No encryption!
1906 // Ask current transport to deliver it
1907 nBytesSent = pTransport->SendEncryptedDataChunk( payload, cbPlainText, ctx );
1908 }
1909 break;
1910
1911 case k_ESteamNetworkingSocketsCipher_AES_256_GCM:
1912 {
1913
1914 Assert( m_bCryptKeysValid );
1915
1916 // Adjust the IV by the packet number
1917 *(uint64 *)&m_cryptIVSend.m_buf += LittleQWord( m_statsEndToEnd.m_nNextSendSequenceNumber );
1918
1919 // Encrypt the chunk
1920 uint8 arEncryptedChunk[ k_cbSteamNetworkingSocketsMaxEncryptedPayloadSend + 64 ]; // Should not need pad
1921 uint32 cbEncrypted = sizeof(arEncryptedChunk);
1922 DbgVerify( m_cryptContextSend.Encrypt(
1923 payload, cbPlainText, // plaintext
1924 m_cryptIVSend.m_buf, // IV
1925 arEncryptedChunk, &cbEncrypted, // output
1926 nullptr, 0 // no AAD
1927 ) );
1928
1929 //SpewMsg( "Send encrypt IV %llu + %02x%02x%02x%02x encrypted %d %02x%02x%02x%02x\n",
1930 // *(uint64 *)&m_cryptIVSend.m_buf,
1931 // m_cryptIVSend.m_buf[8], m_cryptIVSend.m_buf[9], m_cryptIVSend.m_buf[10], m_cryptIVSend.m_buf[11],
1932 // cbEncrypted,
1933 // arEncryptedChunk[0], arEncryptedChunk[1], arEncryptedChunk[2],arEncryptedChunk[3]
1934 //);
1935
1936 // Restore the IV to the base value
1937 *(uint64 *)&m_cryptIVSend.m_buf -= LittleQWord( m_statsEndToEnd.m_nNextSendSequenceNumber );
1938
1939 Assert( (int)cbEncrypted >= cbPlainText );
1940 Assert( (int)cbEncrypted <= k_cbSteamNetworkingSocketsMaxEncryptedPayloadSend ); // confirm that pad above was not necessary and we never exceed k_nMaxSteamDatagramTransportPayload, even after encrypting
1941
1942 // Ask current transport to deliver it
1943 nBytesSent = pTransport->SendEncryptedDataChunk( arEncryptedChunk, cbEncrypted, ctx );
1944 }
1945 }
1946 if ( nBytesSent <= 0 )
1947 return false;
1948
1949 // We sent a packet. Track it
1950 auto pairInsertResult = m_senderState.m_mapInFlightPacketsByPktNum.insert( pairInsert );
1951 Assert( pairInsertResult.second ); // We should have inserted a new element, not updated an existing element
1952
1953 // If we sent any reliable data, we should expect a reply
1954 if ( !inFlightPkt.m_vecReliableSegments.empty() )
1955 {
1956 m_statsEndToEnd.TrackSentMessageExpectingSeqNumAck( usecNow, true );
1957 // FIXME - should let transport know
1958 }
1959
1960 // If we aren't already tracking anything to timeout, then this is the next one.
1961 if ( m_senderState.m_itNextInFlightPacketToTimeout == m_senderState.m_mapInFlightPacketsByPktNum.end() )
1962 m_senderState.m_itNextInFlightPacketToTimeout = pairInsertResult.first;
1963
1964 #ifdef SNP_ENABLE_PACKETSENDLOG
1965 pLog->m_cbSent = nBytesSent;
1966 #endif
1967
1968 // We spent some tokens
1969 m_sendRateData.m_flTokenBucket -= (float)nBytesSent;
1970 return true;
1971 }
1972
SNP_SentNonDataPacket(CConnectionTransport * pTransport,int cbPkt,SteamNetworkingMicroseconds usecNow)1973 void CSteamNetworkConnectionBase::SNP_SentNonDataPacket( CConnectionTransport *pTransport, int cbPkt, SteamNetworkingMicroseconds usecNow )
1974 {
1975 std::pair<int64,SNPInFlightPacket_t> pairInsert( m_statsEndToEnd.m_nNextSendSequenceNumber-1, SNPInFlightPacket_t{ usecNow, false, pTransport, {} } );
1976 auto pairInsertResult = m_senderState.m_mapInFlightPacketsByPktNum.insert( pairInsert );
1977 Assert( pairInsertResult.second ); // We should have inserted a new element, not updated an existing element. Probably an order ofoperations bug with m_nNextSendSequenceNumber
1978
1979 // Spend tokens from the bucket
1980 m_sendRateData.m_flTokenBucket -= (float)cbPkt;
1981 }
1982
void CSteamNetworkConnectionBase::SNP_GatherAckBlocks( SNPAckSerializerHelper &helper, SteamNetworkingMicroseconds usecNow )
{
	// Scan the receiver-side packet gap map and fill 'helper' with up to
	// k_nMaxBlocks ack/nack blocks (oldest first), along with a running total
	// of the encoded size, so that the serializer can later decide how many
	// of the blocks actually fit in a packet.
	helper.m_nBlocks = 0;
	helper.m_nBlocksNeedToAck = 0;

	// Fast case for no packet loss we need to ack, which will (hopefully!) be a common case
	// NOTE(review): the map appears to always contain a sentinel entry keyed at
	// INT64_MAX (see the assert below), hence the -1 here.
	int n = len( m_receiverState.m_mapPacketGaps ) - 1;
	if ( n <= 0 )
		return;

	// Let's not just flush the acks that are due right now. Let's flush all of them
	// that will be due any time before we have the bandwidth to send the next packet.
	// (Assuming that we send the max packet size here.)
	SteamNetworkingMicroseconds usecSendAcksDueBefore = usecNow;
	// Time until the token bucket refills enough for one full MTU packet.
	// (Bucket deficit divided by send rate, converted to microseconds; the
	// -1e6 factor flips the sign since the deficit expression is negative.)
	SteamNetworkingMicroseconds usecTimeUntilNextPacket = SteamNetworkingMicroseconds( ( m_sendRateData.m_flTokenBucket - (float)m_cbMTUPacketSize ) / m_sendRateData.m_flCurrentSendRateUsed * -1e6 );
	if ( usecTimeUntilNextPacket > 0 )
		usecSendAcksDueBefore += usecTimeUntilNextPacket;

	// Debug-build sanity check of the gap map invariants
	m_receiverState.DebugCheckPackGapMap();

	// Never gather more blocks than the helper can hold
	n = std::min( (int)helper.k_nMaxBlocks, n );
	auto itNext = m_receiverState.m_mapPacketGaps.begin();

	// Running encoded size, starting with the fixed frame header
	int cbEncodedSize = helper.k_cbHeaderSize;
	while ( n > 0 )
	{
		--n;
		auto itCur = itNext;
		++itNext;

		// Each gap entry must span at least one packet
		Assert( itCur->first < itCur->second.m_nEnd );

		// Do we need to report on this block now?
		bool bNeedToReport = ( itNext->second.m_usecWhenAckPrior <= usecSendAcksDueBefore );

		// Should we wait to NACK this?
		if ( itCur == m_receiverState.m_itPendingNack )
		{

			// Wait to NACK this?
			if ( !bNeedToReport )
			{
				// Too early to declare the gap lost; stop gathering here
				// so we don't nack it prematurely.
				if ( usecNow < itCur->second.m_usecWhenOKToNack )
					break;
				bNeedToReport = true;
			}

			// Go ahead and NACK it.  If the packet arrives, we will use it.
			// But our NACK may cause the sender to retransmit.
			++m_receiverState.m_itPendingNack;
		}

		SNPAckSerializerHelper::Block &block = helper.m_arBlocks[ helper.m_nBlocks ];
		// Nack count = number of packets in this gap
		block.m_nNack = uint32( itCur->second.m_nEnd - itCur->first );

		// Determine the end of the acked run that follows this gap, and when
		// the newest packet in that run was received.
		int64 nAckEnd;
		SteamNetworkingMicroseconds usecWhenSentLast;
		if ( n == 0 )
		{
			// itNext should be the sentinel
			Assert( itNext->first == INT64_MAX );
			nAckEnd = m_statsEndToEnd.m_nMaxRecvPktNum+1;
			usecWhenSentLast = m_statsEndToEnd.m_usecTimeLastRecvSeq;
		}
		else
		{
			nAckEnd = itNext->first;
			usecWhenSentLast = itNext->second.m_usecWhenReceivedPktBefore;
		}
		Assert( itCur->second.m_nEnd < nAckEnd );
		// Ack count = packets received between the end of this gap and the next gap
		block.m_nAck = uint32( nAckEnd - itCur->second.m_nEnd );

		// Remember what to put in the frame header if this ends up being the
		// newest block we serialize
		block.m_nLatestPktNum = uint32( nAckEnd-1 );
		block.m_nEncodedTimeSinceLatestPktNum = SNPAckSerializerHelper::EncodeTimeSince( usecNow, usecWhenSentLast );

		// When we encode 7+ blocks, the header grows by one byte
		// to store an explicit count
		if ( helper.m_nBlocks == 6 )
			++cbEncodedSize;

		// This block
		++cbEncodedSize;
		// Counts up to 7 fit directly in the per-block header byte; larger
		// counts spill the high bits into a varint.
		if ( block.m_nAck > 7 )
			cbEncodedSize += VarIntSerializedSize( block.m_nAck>>3 );
		if ( block.m_nNack > 7 )
			cbEncodedSize += VarIntSerializedSize( block.m_nNack>>3 );
		block.m_cbTotalEncodedSize = cbEncodedSize;

		// FIXME Here if the caller knows they are working with limited space,
		// they could tell us how much space they have and we could bail
		// if we already know we're over

		++helper.m_nBlocks;

		// Do we really need to try to flush the ack/nack for that block out now?
		if ( bNeedToReport )
			helper.m_nBlocksNeedToAck = helper.m_nBlocks;
	}
}
2082
uint8 *CSteamNetworkConnectionBase::SNP_SerializeAckBlocks( const SNPAckSerializerHelper &helper, uint8 *pOut, const uint8 *pOutEnd, SteamNetworkingMicroseconds usecNow )
{
	// Serialize an ack frame into [ pOut, pOutEnd ), using the blocks gathered
	// earlier by SNP_GatherAckBlocks.  Returns the advanced output pointer,
	// pOut unchanged if there wasn't room for anything, or nullptr on an
	// internal serialization bug.

	// We shouldn't be called if we never received anything
	Assert( m_statsEndToEnd.m_nMaxRecvPktNum > 0 );

	// No room even for the header?
	if ( pOut + SNPAckSerializerHelper::k_cbHeaderSize > pOutEnd )
		return pOut;

	// !KLUDGE! For now limit number of blocks, and always use 16-bit ID.
	// Later we might want to make this code smarter.
	// Header layout: 1 designator byte, 2 bytes latest packet number (low 16
	// bits), 2 bytes encoded time since that packet was received.
	COMPILE_TIME_ASSERT( SNPAckSerializerHelper::k_cbHeaderSize == 5 );
	uint8 *pAckHeaderByte = pOut;
	++pOut;
	uint16 *pLatestPktNum = (uint16 *)pOut;
	pOut += 2;
	uint16 *pTimeSinceLatestPktNum = (uint16 *)pOut;
	pOut += 2;

	// 10011000 - ack frame designator, with 16-bit last-received sequence number, and no ack blocks
	*pAckHeaderByte = 0x98;

	int nLogLevelPacketDecode = m_connectionConfig.m_LogLevel_PacketDecode.Get();

#ifdef SNP_ENABLE_PACKETSENDLOG
	PacketSendLog *pLog = &m_vecSendLog[ m_vecSendLog.size()-1 ];
#endif

	// Fast case for no packet loss we need to ack, which will (hopefully!) be a common case
	// (A map containing only the sentinel means there are no gaps at all.)
	if ( m_receiverState.m_mapPacketGaps.size() == 1 )
	{
		int64 nLastRecvPktNum = m_statsEndToEnd.m_nMaxRecvPktNum;
		*pLatestPktNum = LittleWord( (uint16)nLastRecvPktNum );
		*pTimeSinceLatestPktNum = LittleWord( (uint16)SNPAckSerializerHelper::EncodeTimeSince( usecNow, m_statsEndToEnd.m_usecTimeLastRecvSeq ) );

		SpewDebugGroup( nLogLevelPacketDecode, "[%s] encode pkt %lld last recv %lld (no loss)\n",
			GetDescription(),
			(long long)m_statsEndToEnd.m_nNextSendSequenceNumber, (long long)nLastRecvPktNum
		);
		m_receiverState.m_mapPacketGaps.rbegin()->second.m_usecWhenAckPrior = INT64_MAX; // Clear timer, we wrote everything we needed to

#ifdef SNP_ENABLE_PACKETSENDLOG
		pLog->m_nAckBlocksSent = 0;
		pLog->m_nAckEnd = nLastRecvPktNum;
#endif

		return pOut;
	}

	// Fit as many blocks as possible.
	// (Unless we are badly fragmented and are trying to squeeze in what
	// we can at the end of a packet, this won't ever iterate
	int nBlocks = helper.m_nBlocks;
	uint8 *pExpectedOutEnd;
	for (;;)
	{

		// Not sending any blocks at all? (Either they don't fit, or we are waiting because we don't
		// want to nack yet.)  Just fill in the header with the oldest ack
		if ( nBlocks == 0 )
		{
			auto itOldestGap = m_receiverState.m_mapPacketGaps.begin();
			int64 nLastRecvPktNum = itOldestGap->first-1;
			*pLatestPktNum = LittleWord( uint16( nLastRecvPktNum ) );
			*pTimeSinceLatestPktNum = LittleWord( (uint16)SNPAckSerializerHelper::EncodeTimeSince( usecNow, itOldestGap->second.m_usecWhenReceivedPktBefore ) );

			SpewDebugGroup( nLogLevelPacketDecode, "[%s] encode pkt %lld last recv %lld (no blocks, actual last recv=%lld)\n",
				GetDescription(),
				(long long)m_statsEndToEnd.m_nNextSendSequenceNumber, (long long)nLastRecvPktNum, (long long)m_statsEndToEnd.m_nMaxRecvPktNum
			);

#ifdef SNP_ENABLE_PACKETSENDLOG
			pLog->m_nAckBlocksSent = 0;
			pLog->m_nAckEnd = nLastRecvPktNum;
#endif

			// Acked packets before this gap.  Were we waiting to flush them?
			if ( itOldestGap == m_receiverState.m_itPendingAck )
			{
				// Mark it as sent
				m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior = INT64_MAX;
				++m_receiverState.m_itPendingAck;
			}

			// NOTE: We did NOT nack anything just now
			return pOut;
		}

		// The gather step precomputed cumulative encoded sizes, so we can
		// check the fit of the newest N blocks with a single comparison.
		int cbTotalEncoded = helper.m_arBlocks[nBlocks-1].m_cbTotalEncodedSize;
		pExpectedOutEnd = pAckHeaderByte + cbTotalEncoded; // Save for debugging below
		if ( pExpectedOutEnd <= pOutEnd )
			break;

		// Won't fit, peel off the newest one, see if the earlier ones will fit
		--nBlocks;
	}

	// OK, we know how many blocks we are going to write.  Finish the header byte.
	// Counts up to 6 go directly in the low bits; 7 means "explicit count byte follows".
	Assert( nBlocks == uint8(nBlocks) );
	if ( nBlocks > 6 )
	{
		*pAckHeaderByte |= 7;
		*(pOut++) = uint8( nBlocks );
	}
	else
	{
		*pAckHeaderByte |= uint8( nBlocks );
	}

	// Locate the first one we will serialize.
	// (It's the newest one, which is the last one in the list).
	const SNPAckSerializerHelper::Block *pBlock = &helper.m_arBlocks[nBlocks-1];

	// Latest packet number and time
	*pLatestPktNum = LittleWord( uint16( pBlock->m_nLatestPktNum ) );
	*pTimeSinceLatestPktNum = LittleWord( pBlock->m_nEncodedTimeSinceLatestPktNum );

	// Full packet number, for spew.  (Recombine the upper 32 bits of the
	// highest received packet number with the block's lower 32 bits.)
	int64 nAckEnd = ( m_statsEndToEnd.m_nMaxRecvPktNum & ~(int64)(~(uint32)0) ) | pBlock->m_nLatestPktNum;
	++nAckEnd;

#ifdef SNP_ENABLE_PACKETSENDLOG
	pLog->m_nAckBlocksSent = nBlocks;
	pLog->m_nAckEnd = nAckEnd;
#endif

	SpewDebugGroup( nLogLevelPacketDecode, "[%s] encode pkt %lld last recv %lld (%d blocks, actual last recv=%lld)\n",
		GetDescription(),
		(long long)m_statsEndToEnd.m_nNextSendSequenceNumber, (long long)(nAckEnd-1), nBlocks, (long long)m_statsEndToEnd.m_nMaxRecvPktNum
	);

	// Check for a common case where we report on everything
	if ( nAckEnd > m_statsEndToEnd.m_nMaxRecvPktNum )
	{
		Assert( nAckEnd == m_statsEndToEnd.m_nMaxRecvPktNum+1 );
		// Everything is being reported; clear all pending-ack timers up
		// through the sentinel, then sync the pending-nack pointer.
		for (;;)
		{
			m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior = INT64_MAX;
			if ( m_receiverState.m_itPendingAck->first == INT64_MAX )
				break;
			++m_receiverState.m_itPendingAck;
		}
		m_receiverState.m_itPendingNack = m_receiverState.m_itPendingAck;
	}
	else
	{

		// Advance pointer to next block that needs to be acked,
		// past the ones we are about to ack.
		if ( m_receiverState.m_itPendingAck->first <= nAckEnd )
		{
			do
			{
				m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior = INT64_MAX;
				++m_receiverState.m_itPendingAck;
			} while ( m_receiverState.m_itPendingAck->first <= nAckEnd );
		}

		// Advance pointer to next block that needs to be nacked, past the ones
		// we are about to nack.
		while ( m_receiverState.m_itPendingNack->first < nAckEnd )
			++m_receiverState.m_itPendingNack;
	}

	// Serialize the blocks into the packet, from newest to oldest
	while ( pBlock >= helper.m_arBlocks )
	{
		// Per-block header byte: ack count info in the high nibble, nack
		// count info in the low nibble.
		uint8 *pAckBlockHeaderByte = pOut;
		++pOut;

		// Encode ACK (number of packets successfully received)
		{
			if ( pBlock->m_nAck < 8 )
			{
				// Small block of packets.  Encode directly in the header.
				*pAckBlockHeaderByte = uint8(pBlock->m_nAck << 4);
			}
			else
			{
				// Larger block of received packets.  Put lowest bits in the header,
				// and overflow using varint.  This is probably going to be pretty
				// common.
				*pAckBlockHeaderByte = 0x80 | ( uint8(pBlock->m_nAck & 7) << 4 );
				pOut = SerializeVarInt( pOut, pBlock->m_nAck>>3, pOutEnd );
				if ( pOut == nullptr )
				{
					AssertMsg( false, "Overflow serializing packet ack varint count" );
					return nullptr;
				}
			}
		}

		// Encode NACK (number of packets dropped)
		{
			if ( pBlock->m_nNack < 8 )
			{
				// Small block of packets.  Encode directly in the header.
				*pAckBlockHeaderByte |= uint8(pBlock->m_nNack);
			}
			else
			{
				// Larger block of dropped packets.  Put lowest bits in the header,
				// and overflow using varint.  This is probably going to be less common than
				// large ACK runs, but not totally uncommon.  Losing one or two packets is
				// really common, but loss events often involve a lost of many packets in a run.
				*pAckBlockHeaderByte |= 0x08 | uint8(pBlock->m_nNack & 7);
				pOut = SerializeVarInt( pOut, pBlock->m_nNack >> 3, pOutEnd );
				if ( pOut == nullptr )
				{
					AssertMsg( false, "Overflow serializing packet nack varint count" );
					return nullptr;
				}
			}
		}

		// Debug
		int64 nAckBegin = nAckEnd - pBlock->m_nAck;
		int64 nNackBegin = nAckBegin - pBlock->m_nNack;
		SpewDebugGroup( nLogLevelPacketDecode, "[%s] encode pkt %lld nack [%lld,%lld) ack [%lld,%lld) \n",
			GetDescription(),
			(long long)m_statsEndToEnd.m_nNextSendSequenceNumber,
			(long long)nNackBegin, (long long)nAckBegin,
			(long long)nAckBegin, (long long)nAckEnd
		);
		nAckEnd = nNackBegin;
		Assert( nAckEnd > 0 ); // Make sure we don't try to ack packet 0 or below

		// Move backwards in time
		--pBlock;
	}

	// Make sure when we were checking what would fit, we correctly calculated serialized size
	Assert( pOut == pExpectedOutEnd );

	return pOut;
}
2320
SNP_SerializeStopWaitingFrame(uint8 * pOut,const uint8 * pOutEnd,SteamNetworkingMicroseconds usecNow)2321 uint8 *CSteamNetworkConnectionBase::SNP_SerializeStopWaitingFrame( uint8 *pOut, const uint8 *pOutEnd, SteamNetworkingMicroseconds usecNow )
2322 {
2323 // For now, we will always write this. We should optimize this and try to be
2324 // smart about when to send it (probably maybe once per RTT, or when N packets
2325 // have been received or N blocks accumulate?)
2326
2327 // Calculate offset from the current sequence number
2328 int64 nOffset = m_statsEndToEnd.m_nNextSendSequenceNumber - m_senderState.m_nMinPktWaitingOnAck;
2329 AssertMsg2( nOffset > 0, "Told peer to stop acking up to %lld, but latest packet we have sent is %lld", (long long)m_senderState.m_nMinPktWaitingOnAck, (long long)m_statsEndToEnd.m_nNextSendSequenceNumber );
2330 SpewVerboseGroup( m_connectionConfig.m_LogLevel_PacketDecode.Get(), "[%s] encode pkt %lld stop_waiting offset %lld = %lld",
2331 GetDescription(),
2332 (long long)m_statsEndToEnd.m_nNextSendSequenceNumber, (long long)nOffset, (long long)m_senderState.m_nMinPktWaitingOnAck );
2333
2334 // Subtract one, as a *tiny* optimization, since they cannot possible have
2335 // acknowledged this packet we are serializing already
2336 --nOffset;
2337
2338 // Now encode based on number of bits needed
2339 if ( nOffset < 0x100 )
2340 {
2341 if ( pOut + 2 > pOutEnd )
2342 return pOut;
2343 *pOut = 0x80;
2344 ++pOut;
2345 *pOut = uint8( nOffset );
2346 ++pOut;
2347 }
2348 else if ( nOffset < 0x10000 )
2349 {
2350 if ( pOut + 3 > pOutEnd )
2351 return pOut;
2352 *pOut = 0x81;
2353 ++pOut;
2354 *(uint16*)pOut = LittleWord( uint16( nOffset ) );
2355 pOut += 2;
2356 }
2357 else if ( nOffset < 0x1000000 )
2358 {
2359 if ( pOut + 4 > pOutEnd )
2360 return pOut;
2361 *pOut = 0x82;
2362 ++pOut;
2363 *pOut = uint8( nOffset ); // Wire format is little endian, so lowest 8 bits first
2364 ++pOut;
2365 *(uint16*)pOut = LittleWord( uint16( nOffset>>8 ) );
2366 pOut += 2;
2367 }
2368 else
2369 {
2370 if ( pOut + 9 > pOutEnd )
2371 return pOut;
2372 *pOut = 0x83;
2373 ++pOut;
2374 *(uint64*)pOut = LittleQWord( nOffset );
2375 pOut += 8;
2376 }
2377
2378 Assert( pOut <= pOutEnd );
2379 return pOut;
2380 }
2381
// Receive one segment of an unreliable message, reassembling fragmented
// messages as needed.
//
// pSegmentData/cbSegmentSize is the payload that belongs at byte offset
// nOffset of message nMsgNum; bLastSegmentInMessage marks the final segment
// of the message.  A complete message is handed off via ReceivedMessage();
// incomplete messages are buffered in m_receiverState.m_mapUnreliableSegments
// (keyed by message number + offset) until all of their pieces arrive.
// Segments of messages that never complete are evicted when the buffer limit
// is exceeded; there is no time-based expiry.
void CSteamNetworkConnectionBase::SNP_ReceiveUnreliableSegment( int64 nMsgNum, int nOffset, const void *pSegmentData, int cbSegmentSize, bool bLastSegmentInMessage, SteamNetworkingMicroseconds usecNow )
{
	// NOTE(review): this spew dereferences the first and last payload bytes,
	// so it assumes cbSegmentSize > 0 -- confirm callers never pass an empty segment.
	SpewDebugGroup( m_connectionConfig.m_LogLevel_PacketDecode.Get(), "[%s] RX msg %lld offset %d+%d=%d %02x ... %02x\n", GetDescription(), nMsgNum, nOffset, cbSegmentSize, nOffset+cbSegmentSize, ((byte*)pSegmentData)[0], ((byte*)pSegmentData)[cbSegmentSize-1] );

	// Ignore data segments when we are not going to process them (e.g. linger)
	if ( GetState() != k_ESteamNetworkingConnectionState_Connected )
	{
		SpewDebugGroup( m_connectionConfig.m_LogLevel_PacketDecode.Get(), "[%s] discarding msg %lld [%d,%d) as connection is in state %d\n",
			GetDescription(),
			nMsgNum,
			nOffset, nOffset+cbSegmentSize,
			(int)GetState() );
		return;
	}

	// Check for a common special case: non-fragmented message.
	if ( nOffset == 0 && bLastSegmentInMessage )
	{

		// Deliver it immediately, don't go through the fragmentation assembly process below.
		// (Although that would work.)
		ReceivedMessage( pSegmentData, cbSegmentSize, nMsgNum, k_nSteamNetworkingSend_Unreliable, usecNow );
		return;
	}

	// Limit number of unreliable segments we store.  We just use a fixed
	// limit, rather than trying to be smart by expiring based on time or whatever.
	if ( len( m_receiverState.m_mapUnreliableSegments ) > k_nMaxBufferedUnreliableSegments )
	{
		// Evict the oldest (lowest message number / offset) entry.
		auto itDelete = m_receiverState.m_mapUnreliableSegments.begin();

		// If we're going to delete some, go ahead and delete all of them for this
		// message, since a partial message can never be delivered.
		int64 nDeleteMsgNum = itDelete->first.m_nMsgNum;
		do {
			itDelete = m_receiverState.m_mapUnreliableSegments.erase( itDelete );
		} while ( itDelete != m_receiverState.m_mapUnreliableSegments.end() && itDelete->first.m_nMsgNum == nDeleteMsgNum );

		// Warn if the message we are receiving is older (or the same) than the one
		// we are deleting.  If the sender is legit, then it probably means that we
		// have something tuned badly.
		if ( nDeleteMsgNum >= nMsgNum )
		{
			// Spew, but rate limit in case of malicious sender
			SpewWarningRateLimited( usecNow, "[%s] SNP expiring unreliable segments for msg %lld, while receiving unreliable segments for msg %lld\n",
				GetDescription(), (long long)nDeleteMsgNum, (long long)nMsgNum );
		}
	}

	// Message fragment.  Find/insert the entry in our reassembly queue.
	// (operator[] value-initializes a fresh entry when the key is new.)
	SSNPRecvUnreliableSegmentKey key;
	key.m_nMsgNum = nMsgNum;
	key.m_nOffset = nOffset;
	SSNPRecvUnreliableSegmentData &data = m_receiverState.m_mapUnreliableSegments[ key ];
	if ( data.m_cbSegSize >= 0 )
	{
		// We got another segment starting at the same offset.  This is weird,
		// since senders shouldn't do that.  But remember that we're working on
		// top of UDP, which could deliver packets multiple times.  We'll spew
		// about it, just in case it indicates a bug in this code or the sender.
		SpewWarningRateLimited( usecNow, "[%s] Received unreliable msg %lld segment offset %d twice. Sizes %d,%d, last=%d,%d\n",
			GetDescription(), nMsgNum, nOffset, data.m_cbSegSize, cbSegmentSize, (int)data.m_bLast, (int)bLastSegmentInMessage );

		// Just drop the segment.  Note that the sender might have sent a longer segment than the previous
		// one, in which case this segment contains new data, and is not therefore redundant.  That seems
		// "legal", but very weird, and not worth handling.  If senders do retransmit unreliable segments
		// (perhaps FEC?) then they need to retransmit the exact same segments.
		return;
	}

	// Segment in the map either just got inserted, or is a subset of the segment
	// we just received.  Replace it.
	data.m_cbSegSize = cbSegmentSize;
	Assert( !data.m_bLast );
	data.m_bLast = bLastSegmentInMessage;
	memcpy( data.m_buf, pSegmentData, cbSegmentSize );

	// Now check if that completed the message: walk this message's segments
	// starting at offset 0, verifying there are no gaps up to the final segment.
	key.m_nOffset = 0;
	auto itMsgStart = m_receiverState.m_mapUnreliableSegments.lower_bound( key );
	auto end = m_receiverState.m_mapUnreliableSegments.end();
	Assert( itMsgStart != end );
	auto itMsgLast = itMsgStart;
	int cbMessageSize = 0;
	for (;;)
	{
		// Is this the thing we expected?
		if ( itMsgLast->first.m_nMsgNum != nMsgNum || itMsgLast->first.m_nOffset > cbMessageSize )
			return; // We've got a gap.

		// Update.  This code looks more complicated than strictly necessary, but it works
		// if we have overlapping segments.
		cbMessageSize = std::max( cbMessageSize, itMsgLast->first.m_nOffset + itMsgLast->second.m_cbSegSize );

		// Is that the end?
		if ( itMsgLast->second.m_bLast )
			break;

		// Still looking for the end
		++itMsgLast;
		if ( itMsgLast == end )
			return;
	}

	// Allocate a message object to hold the reassembled payload.
	// On failure we just drop the message (unreliable delivery).
	CSteamNetworkingMessage *pMsg = CSteamNetworkingMessage::New( this, cbMessageSize, nMsgNum, k_nSteamNetworkingSend_Unreliable, usecNow );
	if ( !pMsg )
		return;

	// OK, we have the complete message!  Gather the
	// segments into a contiguous buffer
	for (;;)
	{
		Assert( itMsgStart->first.m_nMsgNum == nMsgNum );
		memcpy( (char *)pMsg->m_pData + itMsgStart->first.m_nOffset, itMsgStart->second.m_buf, itMsgStart->second.m_cbSegSize );

		// Done?
		if ( itMsgStart->second.m_bLast )
			break;

		// Remove entry from list, and move onto the next entry
		itMsgStart = m_receiverState.m_mapUnreliableSegments.erase( itMsgStart );
	}

	// Erase the last segment, and anything else we might have hanging around
	// for this message (presumably overlapping segments past the last one --
	// original note was uncertain here)
	do {
		itMsgStart = m_receiverState.m_mapUnreliableSegments.erase( itMsgStart );
	} while ( itMsgStart != end && itMsgStart->first.m_nMsgNum == nMsgNum );

	// Deliver the message.
	ReceivedMessage( pMsg );
}
2514
// Process one reliable-stream segment that arrived in packet nPktNum.
//
// The segment covers stream positions [nSegBegin, nSegBegin+cbSegmentSize).
// Data is copied into the stream reassembly buffer
// (m_receiverState.m_bufReliableStream, whose first byte corresponds to
// stream position m_receiverState.m_nReliableStreamPos).  Holes created by
// out-of-order arrival are tracked in m_receiverState.m_mapReliableStreamGaps
// (key = gap begin, value = gap end).  Once contiguous data is available at
// the head of the stream, complete reliable messages are decoded and
// dispatched via ReceivedMessage().
//
// Returns true if the packet can be acked (including the case where the data
// was redundant or deliberately discarded), false if the packet must NOT be
// acked -- either because we refused to buffer the data (resource limits /
// too many gaps) or because a decode error aborted the connection.
bool CSteamNetworkConnectionBase::SNP_ReceiveReliableSegment( int64 nPktNum, int64 nSegBegin, const uint8 *pSegmentData, int cbSegmentSize, SteamNetworkingMicroseconds usecNow )
{
	int nLogLevelPacketDecode = m_connectionConfig.m_LogLevel_PacketDecode.Get();

	// Calculate segment end stream position
	int64 nSegEnd = nSegBegin + cbSegmentSize;

	// Spew
	SpewVerboseGroup( nLogLevelPacketDecode, "[%s] decode pkt %lld reliable range [%lld,%lld)\n",
		GetDescription(),
		(long long)nPktNum,
		(long long)nSegBegin, (long long)nSegEnd );

	// No segment data?  Seems fishy, but if it happens, just skip it.
	Assert( cbSegmentSize >= 0 );
	if ( cbSegmentSize <= 0 )
	{
		// Spew but rate limit in case of malicious sender
		SpewWarningRateLimited( usecNow, "[%s] decode pkt %lld empty reliable segment?\n",
			GetDescription(),
			(long long)nPktNum );
		return true;
	}

	// Ignore data segments when we are not going to process them (e.g. linger)
	switch ( GetState() )
	{
		case k_ESteamNetworkingConnectionState_Connected:
		case k_ESteamNetworkingConnectionState_FindingRoute: // Go ahead and process it here.  The higher level code should change the state soon enough.
			break;

		case k_ESteamNetworkingConnectionState_Linger:
		case k_ESteamNetworkingConnectionState_FinWait:
			// Discard data, but continue processing packet
			SpewVerboseGroup( nLogLevelPacketDecode, "[%s] discarding pkt %lld [%lld,%lld) as connection is in state %d\n",
				GetDescription(),
				(long long)nPktNum,
				(long long)nSegBegin, (long long)nSegEnd,
				(int)GetState() );
			return true;

		default:
			// Higher level code should probably not call this in these states
			AssertMsg( false, "Unexpected state %d", GetState() );
			return false;
	}

	// Check if the entire thing is stuff we have already received, then
	// we can discard it
	if ( nSegEnd <= m_receiverState.m_nReliableStreamPos )
		return true;

	// !SPEED! Should we have a fast path here for small messages
	// where we have nothing buffered, and avoid all the copying into the
	// stream buffer and decode directly.

	// What do we expect to receive next?  (Stream position just past the
	// end of the current reassembly buffer.)
	const int64 nExpectNextStreamPos = m_receiverState.m_nReliableStreamPos + len( m_receiverState.m_bufReliableStream );

	// Check if we need to grow the reliable buffer to hold the data
	if ( nSegEnd > nExpectNextStreamPos )
	{
		int64 cbNewSize = nSegEnd - m_receiverState.m_nReliableStreamPos;
		Assert( cbNewSize > len( m_receiverState.m_bufReliableStream ) );

		// Check if we have too much data buffered, just stop processing
		// this packet, and forget we ever received it.  We need to protect
		// against a malicious sender trying to create big gaps.  If they
		// are legit, they will notice that we go back and fill in the gaps
		// and we will get caught up.
		if ( cbNewSize > k_cbMaxBufferedReceiveReliableData )
		{
			// Stop processing the packet, and don't ack it.
			// This indicates the connection is in pretty bad shape,
			// so spew about it.  But rate limit in case of malicious sender
			SpewWarningRateLimited( usecNow, "[%s] decode pkt %lld abort. %lld bytes reliable data buffered [%lld-%lld), new size would be %lld to %lld\n",
				GetDescription(),
				(long long)nPktNum,
				(long long)m_receiverState.m_bufReliableStream.size(),
				(long long)m_receiverState.m_nReliableStreamPos,
				(long long)( m_receiverState.m_nReliableStreamPos + m_receiverState.m_bufReliableStream.size() ),
				(long long)cbNewSize, (long long)nSegEnd
			);
			return false; // DO NOT ACK THIS PACKET
		}

		// Check if this is going to make a new gap
		if ( nSegBegin > nExpectNextStreamPos )
		{
			if ( !m_receiverState.m_mapReliableStreamGaps.empty() )
			{

				// We should never have a gap at the very end of the buffer.
				// (Why would we extend the buffer, unless we needed to to
				// store some data?)
				Assert( m_receiverState.m_mapReliableStreamGaps.rbegin()->second < nExpectNextStreamPos );

				// We need to add a new gap.  See if we're already too fragmented.
				if ( len( m_receiverState.m_mapReliableStreamGaps ) >= k_nMaxReliableStreamGaps_Extend )
				{
					// Stop processing the packet, and don't ack it
					// This indicates the connection is in pretty bad shape,
					// so spew about it.  But rate limit in case of malicious sender
					SpewWarningRateLimited( usecNow, "[%s] decode pkt %lld abort. Reliable stream already has %d fragments, first is [%lld,%lld), last is [%lld,%lld), new segment is [%lld,%lld)\n",
						GetDescription(),
						(long long)nPktNum,
						len( m_receiverState.m_mapReliableStreamGaps ),
						(long long)m_receiverState.m_mapReliableStreamGaps.begin()->first, (long long)m_receiverState.m_mapReliableStreamGaps.begin()->second,
						(long long)m_receiverState.m_mapReliableStreamGaps.rbegin()->first, (long long)m_receiverState.m_mapReliableStreamGaps.rbegin()->second,
						(long long)nSegBegin, (long long)nSegEnd
					);
					return false; // DO NOT ACK THIS PACKET
				}
			}

			// Add a gap covering [old buffer end, segment begin)
			m_receiverState.m_mapReliableStreamGaps[ nExpectNextStreamPos ] = nSegBegin;
		}
		m_receiverState.m_bufReliableStream.resize( size_t( cbNewSize ) );
	}

	// If segment overlapped the existing buffer, we might need to discard the front
	// bit or discard a gap that was filled
	if ( nSegBegin < nExpectNextStreamPos )
	{

		// Check if the front bit has already been processed, then skip it
		if ( nSegBegin < m_receiverState.m_nReliableStreamPos )
		{
			int nSkip = m_receiverState.m_nReliableStreamPos - nSegBegin;
			cbSegmentSize -= nSkip;
			pSegmentData += nSkip;
			nSegBegin += nSkip;
		}
		Assert( nSegBegin < nSegEnd );

		// Check if this filled in one or more gaps (or made a hole in the middle!)
		if ( !m_receiverState.m_mapReliableStreamGaps.empty() )
		{
			// Locate the last gap that begins at or before nSegBegin
			// (upper_bound finds the first gap strictly after, then step back).
			auto gapFilled = m_receiverState.m_mapReliableStreamGaps.upper_bound( nSegBegin );
			if ( gapFilled != m_receiverState.m_mapReliableStreamGaps.begin() )
			{
				--gapFilled;
				Assert( gapFilled->first < gapFilled->second ); // Make sure we don't have degenerate/invalid gaps in our table
				Assert( gapFilled->first <= nSegBegin ); // Make sure we located the gap we think we located
				if ( gapFilled->second > nSegBegin ) // gap is not entirely before this segment
				{
					do {

						// Common case where we fill exactly at the start
						if ( nSegBegin == gapFilled->first )
						{
							if ( nSegEnd < gapFilled->second )
							{
								// We filled the first bit of the gap.  Chop off the front bit that we filled.
								// We cast away const here because we know that we aren't violating the ordering constraints
								const_cast<int64&>( gapFilled->first ) = nSegEnd;
								break;
							}

							// Filled the whole gap.
							// Erase, and move forward in case this also fills more gaps
							// !SPEED! Since exactly filing the gap should be common, we might
							// check specifically for that case and early out here.
							gapFilled = m_receiverState.m_mapReliableStreamGaps.erase( gapFilled );
						}
						else if ( nSegEnd >= gapFilled->second )
						{
							// Chop off the end of the gap
							Assert( nSegBegin < gapFilled->second );
							gapFilled->second = nSegBegin;

							// And maybe subsequent gaps!
							++gapFilled;
						}
						else
						{
							// We are fragmenting: the segment lands strictly inside the gap.
							Assert( nSegBegin > gapFilled->first );
							Assert( nSegEnd < gapFilled->second );

							// Protect against malicious sender.  A good sender will
							// fill the gaps in stream position order and not fragment
							// like this
							if ( len( m_receiverState.m_mapReliableStreamGaps ) >= k_nMaxReliableStreamGaps_Fragment )
							{
								// Stop processing the packet, and don't ack it
								SpewWarningRateLimited( usecNow, "[%s] decode pkt %lld abort. Reliable stream already has %d fragments, first is [%lld,%lld), last is [%lld,%lld). We don't want to fragment [%lld,%lld) with new segment [%lld,%lld)\n",
									GetDescription(),
									(long long)nPktNum,
									len( m_receiverState.m_mapReliableStreamGaps ),
									(long long)m_receiverState.m_mapReliableStreamGaps.begin()->first, (long long)m_receiverState.m_mapReliableStreamGaps.begin()->second,
									(long long)m_receiverState.m_mapReliableStreamGaps.rbegin()->first, (long long)m_receiverState.m_mapReliableStreamGaps.rbegin()->second,
									(long long)gapFilled->first, (long long)gapFilled->second,
									(long long)nSegBegin, (long long)nSegEnd
								);
								return false; // DO NOT ACK THIS PACKET
							}

							// Save bounds of the right side
							int64 nRightHandBegin = nSegEnd;
							int64 nRightHandEnd = gapFilled->second;

							// Truncate the left side
							gapFilled->second = nSegBegin;

							// Add the right hand gap
							m_receiverState.m_mapReliableStreamGaps[ nRightHandBegin ] = nRightHandEnd;

							// And we know that we cannot possibly have covered any more gaps
							break;
						}

						// In some rare cases we might fill more than one gap with a single segment.
						// So keep searching forward.
					} while ( gapFilled != m_receiverState.m_mapReliableStreamGaps.end() && gapFilled->first < nSegEnd );
				}
			}
		}
	}

	// Copy the data into the buffer.
	// It might be redundant, but if so, we aren't going to take the
	// time to figure that out.
	int nBufOffset = nSegBegin - m_receiverState.m_nReliableStreamPos;
	Assert( nBufOffset >= 0 );
	Assert( nBufOffset+cbSegmentSize <= len( m_receiverState.m_bufReliableStream ) );
	memcpy( &m_receiverState.m_bufReliableStream[nBufOffset], pSegmentData, cbSegmentSize );

	// Figure out how many valid (contiguous, gap-free) bytes are at the
	// head of the buffer
	int nNumReliableBytes;
	if ( m_receiverState.m_mapReliableStreamGaps.empty() )
	{
		nNumReliableBytes = len( m_receiverState.m_bufReliableStream );
	}
	else
	{
		auto firstGap = m_receiverState.m_mapReliableStreamGaps.begin();
		Assert( firstGap->first >= m_receiverState.m_nReliableStreamPos );
		if ( firstGap->first < nSegBegin )
		{
			// There's a gap in front of us, and therefore if we didn't have
			// a complete reliable message before, we don't have one now.
			Assert( firstGap->second <= nSegBegin );
			return true;
		}

		// We do have a gap, but it's somewhere after this segment.
		Assert( firstGap->first >= nSegEnd );
		nNumReliableBytes = firstGap->first - m_receiverState.m_nReliableStreamPos;
		Assert( nNumReliableBytes > 0 );
		Assert( nNumReliableBytes < len( m_receiverState.m_bufReliableStream ) ); // The last byte in the buffer should always be valid!
	}
	Assert( nNumReliableBytes > 0 );

	// OK, now dispatch as many reliable messages as are now available
	do
	{

		// OK, if we get here, we have some data.  Attempt to decode a reliable message.
		// NOTE: If the message is really big, we will end up doing this parsing work
		// each time we get a new packet.  We could cache off the result if we find out
		// that it's worth while.  It should be pretty fast, though, so let's keep the
		// code simple until we know that it's worthwhile.
		uint8 *pReliableStart = &m_receiverState.m_bufReliableStream[0];
		uint8 *pReliableDecode = pReliableStart;
		uint8 *pReliableEnd = pReliableDecode + nNumReliableBytes;

		// Spew
		SpewDebugGroup( nLogLevelPacketDecode, "[%s] decode pkt %lld valid reliable bytes = %d [%lld,%lld)\n",
			GetDescription(),
			(long long)nPktNum, nNumReliableBytes,
			(long long)m_receiverState.m_nReliableStreamPos,
			(long long)( m_receiverState.m_nReliableStreamPos + nNumReliableBytes ) );

		// Sanity check that we have a valid header byte.
		// (The top bit must be clear in this framing.)
		uint8 nHeaderByte = *(pReliableDecode++);
		if ( nHeaderByte & 0x80 )
		{
			ConnectionState_ProblemDetectedLocally( k_ESteamNetConnectionEnd_Misc_InternalError, "Invalid reliable message header byte 0x%02x", nHeaderByte );
			return false;
		}

		// Parse the message number.  Bit 0x40 means an explicit varint offset
		// from the last reliable message number follows; otherwise it is
		// implicitly the next message number.
		int64 nMsgNum = m_receiverState.m_nLastRecvReliableMsgNum;
		if ( nHeaderByte & 0x40 )
		{
			uint64 nOffset;
			pReliableDecode = DeserializeVarInt( pReliableDecode, pReliableEnd, nOffset );
			if ( pReliableDecode == nullptr )
			{
				// We haven't received all of the message
				return true; // Packet OK and can be acked.
			}

			nMsgNum += nOffset;

			// Sanity check against a HUGE jump in the message number.
			// This is almost certainly bogus.  (OKOK, yes it is theoretically
			// possible.  But for now while this thing is still under development,
			// most likely it's a bug.  Eventually we can lessen these to handle
			// the case where the app decides to send literally a million unreliable
			// messages in between reliable messages.  The second condition is probably
			// legit, though.)
			if ( nOffset > 1000000 || nMsgNum > m_receiverState.m_nHighestSeenMsgNum+10000 )
			{
				ConnectionState_ProblemDetectedLocally( k_ESteamNetConnectionEnd_Misc_InternalError,
					"Reliable message number lurch. Last reliable %lld, offset %llu, highest seen %lld",
					(long long)m_receiverState.m_nLastRecvReliableMsgNum, (unsigned long long)nOffset,
					(long long)m_receiverState.m_nHighestSeenMsgNum );
				return false;
			}
		}
		else
		{
			++nMsgNum;
		}

		// Check for updating highest message number seen, so we know how to interpret
		// message numbers from the sender with only the lowest N bits present.
		// And yes, we want to do this even if we end up not processing the entire message
		if ( nMsgNum > m_receiverState.m_nHighestSeenMsgNum )
			m_receiverState.m_nHighestSeenMsgNum = nMsgNum;

		// Parse message size.  The low 5 bits of the header byte are the low
		// bits of the size; bit 0x20 means upper size bits follow as a varint.
		int cbMsgSize = nHeaderByte&0x1f;
		if ( nHeaderByte & 0x20 )
		{
			uint64 nMsgSizeUpperBits;
			pReliableDecode = DeserializeVarInt( pReliableDecode, pReliableEnd, nMsgSizeUpperBits );
			if ( pReliableDecode == nullptr )
			{
				// We haven't received all of the message
				return true; // Packet OK and can be acked.
			}

			// Sanity check size.  Note that we do this check before we shift,
			// to protect against overflow.
			// (Although DeserializeVarInt doesn't detect overflow...)
			if ( nMsgSizeUpperBits > (uint64)k_cbMaxMessageSizeRecv<<5 )
			{
				ConnectionState_ProblemDetectedLocally( k_ESteamNetConnectionEnd_Misc_InternalError,
					"Reliable message size too large. (%llu<<5 + %d)",
					(unsigned long long)nMsgSizeUpperBits, cbMsgSize );
				return false;
			}

			// Compute total size, and check it again
			cbMsgSize += int( nMsgSizeUpperBits<<5 );
			if ( cbMsgSize > k_cbMaxMessageSizeRecv )
			{
				ConnectionState_ProblemDetectedLocally( k_ESteamNetConnectionEnd_Misc_InternalError,
					"Reliable message size %d too large.", cbMsgSize );
				return false;
			}
		}

		// Do we have the full thing?
		if ( pReliableDecode+cbMsgSize > pReliableEnd )
		{
			// Ouch, we did all that work and still don't have the whole message.
			return true; // packet is OK, can be acked, and continue processing it
		}

		// We have a full message!  Queue it
		if ( !ReceivedMessage( pReliableDecode, cbMsgSize, nMsgNum, k_nSteamNetworkingSend_Reliable, usecNow ) )
			return false; // Weird failure.  Most graceful response is to not ack this packet, and maybe we will work next on retry.
		pReliableDecode += cbMsgSize;
		int cbStreamConsumed = pReliableDecode-pReliableStart;

		// Advance bookkeeping
		m_receiverState.m_nLastRecvReliableMsgNum = nMsgNum;
		m_receiverState.m_nReliableStreamPos += cbStreamConsumed;

		// Remove the consumed data from the front of the buffer
		pop_from_front( m_receiverState.m_bufReliableStream, cbStreamConsumed );

		// We might have more in the stream that is ready to dispatch right now.
		nNumReliableBytes -= cbStreamConsumed;
	} while ( nNumReliableBytes > 0 );

	return true; // packet is OK, can be acked, and continue processing it
}
2898
// Record that packet nPktNum was received, maintaining the packet-gap map
// (m_receiverState.m_mapPacketGaps) that drives ack/nack generation.
//
// The map tracks ranges of packet numbers we have NOT received; filling,
// shrinking, or splitting a gap here must preserve the invariants checked by
// DebugCheckPackGapMap() (including the sentinel entry keyed INT64_MAX and
// the m_itPendingAck/m_itPendingNack iterators).  If bScheduleAck is set, an
// ack covering this packet is scheduled to flush within k_usecMaxDataAckDelay.
void CSteamNetworkConnectionBase::SNP_RecordReceivedPktNum( int64 nPktNum, SteamNetworkingMicroseconds usecNow, bool bScheduleAck )
{

	// Check if sender has already told us they don't need us to
	// account for packets this old anymore
	if ( unlikely( nPktNum < m_receiverState.m_nMinPktNumToSendAcks ) )
		return;

	// Fast path for the (hopefully) most common case of packets arriving in order
	if ( likely( nPktNum == m_statsEndToEnd.m_nMaxRecvPktNum+1 ) )
	{
		if ( bScheduleAck ) // fast path for all unreliable data (common when we are just being used for transport)
		{
			// Schedule ack of this packet (since we are the highest numbered
			// packet, that means reporting on everything)
			QueueFlushAllAcks( usecNow + k_usecMaxDataAckDelay );
		}
		return;
	}

	// At this point, ack invariants should be met
	m_receiverState.DebugCheckPackGapMap();

	// Latest time that this packet should be acked.
	// (We might already be scheduled to send an ack that would include this packet.)
	SteamNetworkingMicroseconds usecScheduleAck = bScheduleAck ? usecNow + k_usecMaxDataAckDelay : INT64_MAX;

	// Check if this introduced a gap since the last sequenced packet we have received
	if ( nPktNum > m_statsEndToEnd.m_nMaxRecvPktNum )
	{

		// Protect against malicious sender!
		if ( len( m_receiverState.m_mapPacketGaps ) >= k_nMaxPacketGaps )
			return; // Nope, we will *not* actually mark the packet as received

		// Add a gap for the skipped packet(s).
		int64 nBegin = m_statsEndToEnd.m_nMaxRecvPktNum+1;
		std::pair<int64,SSNPPacketGap> x;
		x.first = nBegin;
		x.second.m_nEnd = nPktNum;
		x.second.m_usecWhenReceivedPktBefore = m_statsEndToEnd.m_usecTimeLastRecvSeq;
		// Inherit the prior-ack schedule from the sentinel (last map entry)
		x.second.m_usecWhenAckPrior = m_receiverState.m_mapPacketGaps.rbegin()->second.m_usecWhenAckPrior;

		// When should we nack this?  Give a short grace period for simple
		// reordering before declaring nearby packets lost.
		x.second.m_usecWhenOKToNack = usecNow;
		if ( nPktNum < m_statsEndToEnd.m_nMaxRecvPktNum + 3 )
			x.second.m_usecWhenOKToNack += k_usecNackFlush;

		auto iter = m_receiverState.m_mapPacketGaps.insert( x ).first;

		SpewMsgGroup( m_connectionConfig.m_LogLevel_PacketGaps.Get(), "[%s] drop %d pkts [%lld-%lld)",
			GetDescription(),
			(int)( nPktNum - nBegin ),
			(long long)nBegin, (long long)nPktNum );

		// Remember that we need to send a NACK
		if ( m_receiverState.m_itPendingNack->first == INT64_MAX )
		{
			// Pending-nack iterator was parked on the sentinel; point it at the new gap
			m_receiverState.m_itPendingNack = iter;
		}
		else
		{
			// Pending nacks should be for older packets, not newer
			Assert( m_receiverState.m_itPendingNack->first < nBegin );
		}

		// Back up if we had a flush of everything scheduled
		if ( m_receiverState.m_itPendingAck->first == INT64_MAX && m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior < INT64_MAX )
		{
			Assert( iter->second.m_usecWhenAckPrior == m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior );
			m_receiverState.m_itPendingAck = iter;
		}

		// At this point, ack invariants should be met
		m_receiverState.DebugCheckPackGapMap();

		// Schedule ack of this packet (since we are the highest numbered
		// packet, that means reporting on everything) by the requested
		// time
		QueueFlushAllAcks( usecScheduleAck );
	}
	else
	{

		// Check if this filled a gap
		auto itGap = m_receiverState.m_mapPacketGaps.upper_bound( nPktNum );
		if ( itGap == m_receiverState.m_mapPacketGaps.end() )
		{
			AssertMsg( false, "[%s] Cannot locate gap, or processing packet %lld multiple times. %s",
				GetDescription(), (long long)nPktNum,
				m_statsEndToEnd.RecvPktNumStateDebugString().c_str() );
			return;
		}
		if ( itGap == m_receiverState.m_mapPacketGaps.begin() )
		{
			AssertMsg( false, "[%s] Cannot locate gap, or processing packet %lld multiple times. [%lld,%lld) %s",
				GetDescription(), (long long)nPktNum, (long long)itGap->first, (long long)itGap->second.m_nEnd,
				m_statsEndToEnd.RecvPktNumStateDebugString().c_str() );
			return;
		}
		--itGap;
		if ( itGap->first > nPktNum || itGap->second.m_nEnd <= nPktNum )
		{
			// We already received this packet.  But this should be impossible now,
			// we should be rejecting duplicate packet numbers earlier
			AssertMsg( false, "[%s] Packet gap bug. %lld [%lld,%lld) %s",
				GetDescription(), (long long)nPktNum, (long long)itGap->first, (long long)itGap->second.m_nEnd,
				m_statsEndToEnd.RecvPktNumStateDebugString().c_str() );
			return;
		}

		// Packet is in a gap where we previously thought packets were lost.
		// (Packets arriving out of order.)

		// Last packet in gap?
		if ( itGap->second.m_nEnd-1 == nPktNum )
		{
			// Single-packet gap?
			if ( itGap->first == nPktNum )
			{
				// Were we waiting to ack/nack this?  Then move forward to the next gap, if any
				usecScheduleAck = std::min( usecScheduleAck, itGap->second.m_usecWhenAckPrior );
				if ( m_receiverState.m_itPendingAck == itGap )
					++m_receiverState.m_itPendingAck;
				if ( m_receiverState.m_itPendingNack == itGap )
					++m_receiverState.m_itPendingNack;

				// Save time when we needed to ack the packets before this gap
				SteamNetworkingMicroseconds usecWhenAckPrior = itGap->second.m_usecWhenAckPrior;

				// Gap is totally filled.  Erase, and move to the next one,
				// if any, so we can schedule ack below
				itGap = m_receiverState.m_mapPacketGaps.erase( itGap );

				// Were we scheduled to ack the packets before this?  If so, then
				// we still need to do that, only now when we send that ack, we will
				// ack the packets after this gap as well, since they will be included
				// in the same ack block.
				//
				// NOTE: This is based on what was scheduled to be acked before we got
				// this packet.  If we need to update the schedule to ack the current
				// packet, we will do that below.  However, usually if previous
				// packets were already scheduled to be acked, then that deadline time
				// will be sooner than usecScheduleAck, so the code below will not actually
				// do anything.
				if ( usecWhenAckPrior < itGap->second.m_usecWhenAckPrior )
				{
					itGap->second.m_usecWhenAckPrior = usecWhenAckPrior;
				}
				else
				{
					// Otherwise, we might not have any acks scheduled.  In that
					// case, the invariant is that m_itPendingAck should point at the sentinel
					if ( m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior == INT64_MAX )
					{
						m_receiverState.m_itPendingAck = m_receiverState.m_mapPacketGaps.end();
						--m_receiverState.m_itPendingAck;
						Assert( m_receiverState.m_itPendingAck->first == INT64_MAX );
					}
				}

				SpewVerboseGroup( m_connectionConfig.m_LogLevel_PacketGaps.Get(), "[%s] decode pkt %lld, single pkt gap filled", GetDescription(), (long long)nPktNum );

				// At this point, ack invariants should be met
				m_receiverState.DebugCheckPackGapMap();
			}
			else
			{
				// Shrink gap by one from the end
				--itGap->second.m_nEnd;
				Assert( itGap->first < itGap->second.m_nEnd );

				SpewVerboseGroup( m_connectionConfig.m_LogLevel_PacketGaps.Get(), "[%s] decode pkt %lld, last packet in gap, reduced to [%lld,%lld)", GetDescription(),
					(long long)nPktNum, (long long)itGap->first, (long long)itGap->second.m_nEnd );

				// Move to the next gap so we can schedule ack below
				++itGap;

				// At this point, ack invariants should be met
				m_receiverState.DebugCheckPackGapMap();
			}
		}
		else if ( itGap->first == nPktNum )
		{
			// First packet in multi-packet gap.
			// Shrink packet from the front
			// Cast away const to allow us to modify the key.
			// We know this won't break the map ordering
			++const_cast<int64&>( itGap->first );
			Assert( itGap->first < itGap->second.m_nEnd );
			itGap->second.m_usecWhenReceivedPktBefore = usecNow;

			SpewVerboseGroup( m_connectionConfig.m_LogLevel_PacketGaps.Get(), "[%s] decode pkt %lld, first packet in gap, reduced to [%lld,%lld)", GetDescription(),
				(long long)nPktNum, (long long)itGap->first, (long long)itGap->second.m_nEnd );

			// At this point, ack invariants should be met
			m_receiverState.DebugCheckPackGapMap();
		}
		else
		{
			// Packet is in the middle of the gap.  We'll need to fragment this gap
			// Protect against malicious sender!
			if ( len( m_receiverState.m_mapPacketGaps ) >= k_nMaxPacketGaps )
				return; // Nope, we will *not* actually mark the packet as received

			// Locate the next block so we can set the schedule time
			auto itNext = itGap;
			++itNext;

			// Start making a new gap to account for the upper end
			std::pair<int64,SSNPPacketGap> upper;
			upper.first = nPktNum+1;
			upper.second.m_nEnd = itGap->second.m_nEnd;
			upper.second.m_usecWhenReceivedPktBefore = usecNow;
			if ( itNext == m_receiverState.m_itPendingAck )
				upper.second.m_usecWhenAckPrior = INT64_MAX;
			else
				upper.second.m_usecWhenAckPrior = itNext->second.m_usecWhenAckPrior;
			upper.second.m_usecWhenOKToNack = itGap->second.m_usecWhenOKToNack;

			// Truncate the current gap
			itGap->second.m_nEnd = nPktNum;
			Assert( itGap->first < itGap->second.m_nEnd );

			SpewVerboseGroup( m_connectionConfig.m_LogLevel_PacketGaps.Get(), "[%s] decode pkt %lld, gap split [%lld,%lld) and [%lld,%lld)", GetDescription(),
				(long long)nPktNum, (long long)itGap->first, (long long)itGap->second.m_nEnd, upper.first, upper.second.m_nEnd );

			// Insert a new gap to account for the upper end, and
			// advance iterator to it, so that we can schedule ack below
			itGap = m_receiverState.m_mapPacketGaps.insert( upper ).first;

			// At this point, ack invariants should be met
			m_receiverState.DebugCheckPackGapMap();
		}

		Assert( itGap != m_receiverState.m_mapPacketGaps.end() );

		// Need to schedule ack (earlier than it is already scheduled)?
		if ( usecScheduleAck < itGap->second.m_usecWhenAckPrior )
		{

			// Earlier than the current thing being scheduled?
			if ( usecScheduleAck <= m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior )
			{

				// We're next, set the time
				itGap->second.m_usecWhenAckPrior = usecScheduleAck;

				// Any schedules for lower-numbered packets are superseded
				// by this one.
				if ( m_receiverState.m_itPendingAck->first <= itGap->first )
				{
					while ( m_receiverState.m_itPendingAck != itGap )
					{
						m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior = INT64_MAX;
						++m_receiverState.m_itPendingAck;
					}
				}
				else
				{
					// If our number is lower than the thing that was scheduled next,
					// then back up and re-schedule any blocks in between to be effectively
					// the same time as they would have been flushed before.
					SteamNetworkingMicroseconds usecOldSched = m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior;
					while ( --m_receiverState.m_itPendingAck != itGap )
					{
						m_receiverState.m_itPendingAck->second.m_usecWhenAckPrior = usecOldSched;
					}
				}
			}
			else
			{
				// We're not the next thing that needs to be acked.

				if ( itGap->first < m_receiverState.m_itPendingAck->first )
				{
					// We're a lower numbered packet, so this request is subsumed by the
					// request to flush more packets at an earlier time,
					// and we don't need to do anything.

				}
				else
				{

					// We need to ack a bit earlier
					itGap->second.m_usecWhenAckPrior = usecScheduleAck;

					// Now the only way for our invariants to be violated is for lower
					// numbered blocks to have later scheduled times.
					Assert( itGap != m_receiverState.m_mapPacketGaps.begin() );
					while ( (--itGap)->second.m_usecWhenAckPrior > usecScheduleAck )
					{
						Assert( itGap != m_receiverState.m_mapPacketGaps.begin() );
						itGap->second.m_usecWhenAckPrior = usecScheduleAck;
					}
				}
			}

			// Make sure we didn't screw things up
			m_receiverState.DebugCheckPackGapMap();
		}

		// Make sure we are scheduled to wake up
		if ( bScheduleAck )
			EnsureMinThinkTime( m_receiverState.TimeWhenFlushAcks() );
	}
}
3206
SNP_ClampSendRate()3207 int CSteamNetworkConnectionBase::SNP_ClampSendRate()
3208 {
3209 // Get effective clamp limits. We clamp the limits themselves to be safe
3210 // and make sure they are sane
3211 int nMin = Clamp( m_connectionConfig.m_SendRateMin.Get(), 1024, 100*1024*1024 );
3212 int nMax = Clamp( m_connectionConfig.m_SendRateMax.Get(), nMin, 100*1024*1024 );
3213
3214 // Check if application has disabled bandwidth estimation
3215 if ( nMin == nMax )
3216 {
3217 m_sendRateData.m_nCurrentSendRateEstimate = nMin;
3218 m_sendRateData.m_flCurrentSendRateUsed = m_sendRateData.m_nCurrentSendRateEstimate;
3219 // FIXME - Note that in this case we are effectively application limited. We'll want to note this in the future
3220 }
3221 else
3222 {
3223
3224 // Clamp it, adjusting the value if it's out of range
3225 if ( m_sendRateData.m_nCurrentSendRateEstimate >= nMax )
3226 {
3227 m_sendRateData.m_nCurrentSendRateEstimate = nMax;
3228 // FIXME - Note that in this case we are effectively application limited. We'll want to note this in the future
3229 }
3230 else if ( m_sendRateData.m_nCurrentSendRateEstimate < nMin )
3231 {
3232 m_sendRateData.m_nCurrentSendRateEstimate = nMin;
3233 }
3234
3235 // FIXME - In the future we might implement BBR probe cycle
3236 m_sendRateData.m_flCurrentSendRateUsed = m_sendRateData.m_nCurrentSendRateEstimate;
3237 }
3238
3239 // Return value
3240 return m_sendRateData.m_nCurrentSendRateEstimate;
3241 }
3242
3243 // Returns next think time
SteamNetworkingMicroseconds CSteamNetworkConnectionBase::SNP_ThinkSendState( SteamNetworkingMicroseconds usecNow )
{
	// Refresh the clamped send rate, then accumulate tokens based on how
	// long it's been since last time
	SNP_ClampSendRate();
	SNP_TokenBucket_Accumulate( usecNow );

	// Calculate next time we want to take action.  If it isn't right now, then we're either idle or throttled.
	// Importantly, this will also check for retry timeout
	SteamNetworkingMicroseconds usecNextThink = SNP_GetNextThinkTime( usecNow );
	if ( usecNextThink > usecNow )
		return usecNextThink;

	// Keep sending packets until we run out of tokens
	int nPacketsSent = 0;
	while ( m_pTransport )
	{

		// NOTE(review): this cap is checked before the send below and uses '>',
		// so up to k_nMaxPacketsPerThink+1 packets can go out in one pass --
		// presumably fine as a soft limit; confirm if an exact cap matters.
		if ( nPacketsSent > k_nMaxPacketsPerThink )
		{
			// We're sending too much at one time.  Nuke token bucket so that
			// we'll be ready to send again very soon, but not immediately.
			// We don't want the outer code to complain that we are requesting
			// a wakeup call in the past
			m_sendRateData.m_flTokenBucket = m_sendRateData.m_flCurrentSendRateUsed * -0.0005f;
			return usecNow + 1000;
		}

		// Check if we have anything to send: neither acks nor data are due yet.
		if ( usecNow < m_receiverState.TimeWhenFlushAcks() && usecNow < SNP_TimeWhenWantToSendNextPacket() )
		{

			// We've sent everything we want to send.  Limit our reserve to a
			// small burst overage, in case we had built up an excess reserve
			// before due to the scheduler waking us up late.
			if ( m_sendRateData.m_flTokenBucket > k_flSendRateBurstOverageAllowance )
				m_sendRateData.m_flTokenBucket = k_flSendRateBurstOverageAllowance;
			break;
		}

		// Send the next data packet.
		if ( !m_pTransport->SendDataPacket( usecNow ) )
		{
			// Problem sending packet.  Nuke token bucket (negative => throttled),
			// but request a wakeup relatively quick to check on our state again
			m_sendRateData.m_flTokenBucket = m_sendRateData.m_flCurrentSendRateUsed * -0.001f;
			return usecNow + 2000;
		}

		// We spent some tokens, do we have any left?  A negative bucket means
		// we are over budget and must wait for more tokens to accumulate.
		if ( m_sendRateData.m_flTokenBucket < 0.0f )
			break;

		// Limit number of packets sent at a time, even if the scheduler is really bad
		// or somebody holds the lock for along time, or we wake up late for whatever reason
		++nPacketsSent;
	}

	// Return time when we need to check in again.
	SteamNetworkingMicroseconds usecNextAction = SNP_GetNextThinkTime( usecNow );
	Assert( usecNextAction > usecNow );
	return usecNextAction;
}
3306
SNP_TokenBucket_Accumulate(SteamNetworkingMicroseconds usecNow)3307 void CSteamNetworkConnectionBase::SNP_TokenBucket_Accumulate( SteamNetworkingMicroseconds usecNow )
3308 {
3309 // If we're not connected, just keep our bucket full
3310 if ( !BStateIsConnectedForWirePurposes() )
3311 {
3312 m_sendRateData.m_flTokenBucket = k_flSendRateBurstOverageAllowance;
3313 m_sendRateData.m_usecTokenBucketTime = usecNow;
3314 return;
3315 }
3316
3317 float flElapsed = ( usecNow - m_sendRateData.m_usecTokenBucketTime ) * 1e-6;
3318 m_sendRateData.m_flTokenBucket += m_sendRateData.m_flCurrentSendRateUsed * flElapsed;
3319 m_sendRateData.m_usecTokenBucketTime = usecNow;
3320 // If we don't currently have any packets ready to send right now,
3321 // then go ahead and limit the tokens. If we do have packets ready
3322 // to send right now, then we must assume that we would be trying to
3323 // wakeup as soon as we are ready to send the next packet, and thus
3324 // any excess tokens we accumulate are because the scheduler woke
3325 // us up late, and we are not actually bursting
3326 if ( m_sendRateData.m_flTokenBucket > k_flSendRateBurstOverageAllowance && SNP_TimeWhenWantToSendNextPacket() > usecNow )
3327 {
3328 m_sendRateData.m_flTokenBucket = k_flSendRateBurstOverageAllowance;
3329 }
3330 }
3331
void SSNPReceiverState::QueueFlushAllAcks( SteamNetworkingMicroseconds usecWhen )
{
	DebugCheckPackGapMap();

	Assert( usecWhen > 0 ); // zero is reserved and should never be used as a requested wake time

	// The last (sentinel) entry in the gap map carries the schedule for
	// flushing *all* acks.  If we're already scheduled for earlier, then
	// there cannot be any work to do
	auto it = m_mapPacketGaps.end();
	--it;
	if ( it->second.m_usecWhenAckPrior <= usecWhen )
		return;
	it->second.m_usecWhenAckPrior = usecWhen;

	// Nothing partial scheduled?  Then the pending-ack cursor already sits
	// on the sentinel we just updated.
	if ( m_itPendingAck == it )
		return;

	if ( m_itPendingAck->second.m_usecWhenAckPrior >= usecWhen )
	{
		// Our full flush is at least as early as the pending partial ack,
		// so it supersedes it: clear the schedule on every block up to the
		// sentinel and advance the pending-ack cursor there.
		do
		{
			m_itPendingAck->second.m_usecWhenAckPrior = INT64_MAX;
			++m_itPendingAck;
		} while ( m_itPendingAck != it );
		DebugCheckPackGapMap();
	}
	else
	{
		// Maintain invariant: scheduled ack times must be nondecreasing by
		// packet number.  Walk backwards from the sentinel, pulling any
		// later-scheduled blocks down to our new flush time.
		while ( (--it)->second.m_usecWhenAckPrior >= usecWhen )
			it->second.m_usecWhenAckPrior = usecWhen;
		DebugCheckPackGapMap();
	}
}
3366
#if STEAMNETWORKINGSOCKETS_SNP_PARANOIA > 1
// Paranoid validation of the packet-gap map invariants:
// - Gaps are disjoint and strictly ordered (each gap starts after the
//   previous gap's end).
// - m_itPendingAck appears exactly once; every entry before it has no
//   scheduled ack (INT64_MAX), and from it onward the scheduled ack times
//   are nondecreasing.
// - The map is terminated by a sentinel keyed at INT64_MAX whose end is
//   also INT64_MAX.
void SSNPReceiverState::DebugCheckPackGapMap() const
{
	int64 nPrevEnd = 0;
	SteamNetworkingMicroseconds usecPrevAck = 0;
	bool bFoundPendingAck = false;
	// Iterate by const reference -- the original iterated by value, copying
	// each pair<const int64, Gap> on every step of this debug walk.
	for ( const auto &it: m_mapPacketGaps )
	{
		Assert( it.first > nPrevEnd );
		if ( it.first == m_itPendingAck->first )
		{
			Assert( !bFoundPendingAck );
			bFoundPendingAck = true;
			if ( it.first < INT64_MAX )
				Assert( it.second.m_usecWhenAckPrior < INT64_MAX );
		}
		else if ( !bFoundPendingAck )
		{
			// Blocks below the pending-ack cursor must not be scheduled
			Assert( it.second.m_usecWhenAckPrior == INT64_MAX );
		}
		else
		{
			// Blocks at/after the cursor must have nondecreasing schedule times
			Assert( it.second.m_usecWhenAckPrior >= usecPrevAck );
		}
		usecPrevAck = it.second.m_usecWhenAckPrior;
		if ( it.first == INT64_MAX )
		{
			// Sentinel entry
			Assert( it.second.m_nEnd == INT64_MAX );
		}
		else
		{
			// Every real gap must be non-empty
			Assert( it.first < it.second.m_nEnd );
		}
		nPrevEnd = it.second.m_nEnd;
	}
	Assert( nPrevEnd == INT64_MAX ); // sentinel must be present
}
#endif
3405
SNP_TimeWhenWantToSendNextPacket() const3406 SteamNetworkingMicroseconds CSteamNetworkConnectionBase::SNP_TimeWhenWantToSendNextPacket() const
3407 {
3408 // Connection must be locked, but we don't require the global lock here!
3409 m_pLock->AssertHeldByCurrentThread();
3410
3411 // We really shouldn't be trying to do this when not connected
3412 if ( !BStateIsConnectedForWirePurposes() )
3413 {
3414 AssertMsg( false, "We shouldn't be asking about sending packets when not fully connected" );
3415 return k_nThinkTime_Never;
3416 }
3417
3418 // Reliable triggered? Then send it right now
3419 if ( !m_senderState.m_listReadyRetryReliableRange.empty() )
3420 return 0;
3421
3422 // Anything queued?
3423 SteamNetworkingMicroseconds usecNextSend;
3424 if ( m_senderState.m_messagesQueued.empty() )
3425 {
3426
3427 // Queue is empty, nothing to send except perhaps nacks (below)
3428 Assert( m_senderState.PendingBytesTotal() == 0 );
3429 usecNextSend = k_nThinkTime_Never;
3430 }
3431 else
3432 {
3433
3434 // FIXME acks, stop_waiting?
3435
3436 // Have we got at least a full packet ready to go?
3437 if ( m_senderState.PendingBytesTotal() >= m_cbMaxPlaintextPayloadSend )
3438 // Send it ASAP
3439 return 0;
3440
3441 // We have less than a full packet's worth of data. Wait until
3442 // the Nagle time, if we have one
3443 usecNextSend = m_senderState.m_messagesQueued.m_pFirst->SNPSend_UsecNagle();
3444 }
3445
3446 // Check if the receiver wants to send a NACK.
3447 usecNextSend = std::min( usecNextSend, m_receiverState.m_itPendingNack->second.m_usecWhenOKToNack );
3448
3449 // Return the earlier of the two
3450 return usecNextSend;
3451 }
3452
SteamNetworkingMicroseconds CSteamNetworkConnectionBase::SNP_GetNextThinkTime( SteamNetworkingMicroseconds usecNow )
{
	// Connection must be locked, but we don't require the global lock here!
	m_pLock->AssertHeldByCurrentThread();

	// We really shouldn't be trying to do this when not connected
	if ( !BStateIsConnectedForWirePurposes() )
	{
		AssertMsg( false, "We shouldn't be trying to think SNP when not fully connected" );
		return k_nThinkTime_Never;
	}

	// We cannot send any packets if we don't have transport
	if ( !m_pTransport )
		return k_nThinkTime_Never;

	// Start with the time when the receiver needs to flush out ack.
	SteamNetworkingMicroseconds usecNextThink = m_receiverState.TimeWhenFlushAcks();

	// Check retransmit timers.  If they have expired, this will move reliable
	// segments into the "ready to retry" list, which will cause
	// TimeWhenWantToSendNextPacket to think we want to send data.  If nothing has timed out,
	// it will return the time when we need to check back in.  Or, if everything is idle it will
	// return "never" (very large number).
	// NOTE: this call has side effects, so it must run before the call to
	// SNP_TimeWhenWantToSendNextPacket() below -- do not reorder.
	SteamNetworkingMicroseconds usecNextRetry = SNP_SenderCheckInFlightPackets( usecNow );

	// If we want to send packets, then we might need to wake up and take action
	SteamNetworkingMicroseconds usecTimeWantToSend = SNP_TimeWhenWantToSendNextPacket();
	usecTimeWantToSend = std::min( usecNextRetry, usecTimeWantToSend );
	if ( usecTimeWantToSend < usecNextThink )
	{

		// Time when we *could* send the next packet, ignoring Nagle
		SteamNetworkingMicroseconds usecNextSend = usecNow;
		SteamNetworkingMicroseconds usecQueueTime = m_sendRateData.CalcTimeUntilNextSend();
		if ( usecQueueTime > 0 )
		{
			usecNextSend += usecQueueTime;

			// Add a small amount of fudge here, so that we don't wake up too early and think
			// we're not ready yet, causing us to spin our wheels.  Our token bucket system
			// should keep us sending at the correct overall rate.  Remember that the
			// underlying kernel timer/wake resolution might be 1 or 2ms, (E.g. Windows.)
			usecNextSend += 25;
		}

		// Time when we will next send is the greater of when we want to and when we can
		usecNextSend = std::max( usecNextSend, usecTimeWantToSend );

		// Earlier than any other reason to wake up?
		usecNextThink = std::min( usecNextThink, usecNextSend );
	}

	return usecNextThink;
}
3508
SNP_PopulateDetailedStats(SteamDatagramLinkStats & info)3509 void CSteamNetworkConnectionBase::SNP_PopulateDetailedStats( SteamDatagramLinkStats &info )
3510 {
3511 info.m_latest.m_nSendRate = SNP_ClampSendRate();
3512 info.m_latest.m_nPendingBytes = m_senderState.m_cbPendingUnreliable + m_senderState.m_cbPendingReliable;
3513 info.m_lifetime.m_nMessagesSentReliable = m_senderState.m_nMessagesSentReliable;
3514 info.m_lifetime.m_nMessagesSentUnreliable = m_senderState.m_nMessagesSentUnreliable;
3515 info.m_lifetime.m_nMessagesRecvReliable = m_receiverState.m_nMessagesRecvReliable;
3516 info.m_lifetime.m_nMessagesRecvUnreliable = m_receiverState.m_nMessagesRecvUnreliable;
3517 }
3518
void CSteamNetworkConnectionBase::SNP_PopulateQuickStats( SteamNetworkingQuickConnectionStatus &info, SteamNetworkingMicroseconds usecNow )
{
	info.m_nSendRateBytesPerSecond = SNP_ClampSendRate();
	info.m_cbPendingUnreliable = m_senderState.m_cbPendingUnreliable;
	info.m_cbPendingReliable = m_senderState.m_cbPendingReliable;
	info.m_cbSentUnackedReliable = m_senderState.m_cbSentUnackedReliable;
	if ( GetState() == k_ESteamNetworkingConnectionState_Connected )
	{

		// Accumulate tokens so that we can properly predict when the next time we'll be able to send something is
		SNP_TokenBucket_Accumulate( usecNow );

		//
		// Time until we can send the next packet
		// If anything is already queued, then that will have to go out first.  The
		// integer division below deliberately rounds the backlog down to a whole
		// multiple of m_cbMaxMessageNoFragment.
		//
		// NOTE: This ignores the precise details of SNP framing.  If there are tons of
		// small packets, it'll actually be worse.  We might be able to approximate that
		// the framing overhead better by also counting up the number of *messages* pending.
		// Probably not worth it here, but if we had that number available, we'd use it.
		int cbPendingTotal = m_senderState.PendingBytesTotal() / m_cbMaxMessageNoFragment * m_cbMaxMessageNoFragment;

		// Adjust based on how many tokens we have to spend now (or if we are already
		// over-budget and have to wait until we could spend another)
		cbPendingTotal -= (int)m_sendRateData.m_flTokenBucket;
		if ( cbPendingTotal <= 0 )
		{
			// We could send it right now.
			info.m_usecQueueTime = 0;
		}
		else
		{
			// Time to drain the backlog at the current clamped rate estimate.
			// NOTE: always uses the current estimated bandwidth, NOT any eventual
			// maximum.  (SNP_ClampSendRate never returns less than 1024, so the
			// division is safe.)
			info.m_usecQueueTime = (int64)cbPendingTotal * k_nMillion / SNP_ClampSendRate();
		}
	}
	else
	{
		// We'll never be able to send it.  (Or, we don't know when that will be.)
		info.m_usecQueueTime = INT64_MAX;
	}
}
3561
3562 } // namespace SteamNetworkingSocketsLib
3563