1 //====== Copyright Valve Corporation, All rights reserved. ====================
2
3 #ifdef __GNUC__
4 // src/public/tier0/basetypes.h:104:30: error: assuming signed overflow does not occur when assuming that (X + c) < X is always false [-Werror=strict-overflow]
5 // current steamrt:scout gcc "g++ (SteamRT 4.8.4-1ubuntu15~12.04+steamrt1.2+srt1) 4.8.4" requires this at the top due to optimizations
6 #pragma GCC diagnostic ignored "-Wstrict-overflow"
7 #endif
8
9 #include <tier1/utlpriorityqueue.h>
10
11 #include "steamnetworkingsockets_thinker.h"
12
13 #ifdef IS_STEAMDATAGRAMROUTER
14 #include "router/sdr.h"
15 #else
16 #include "clientlib/steamnetworkingsockets_lowlevel.h"
17 #endif
18
19 // memdbgon must be the last include file in a .cpp file!!!
20 #include "tier0/memdbgon.h"
21
22 namespace SteamNetworkingSocketsLib {
23
24 /////////////////////////////////////////////////////////////////////////////
25 //
26 // Periodic processing
27 //
28 /////////////////////////////////////////////////////////////////////////////
29
30 struct ThinkerLess
31 {
operator ()SteamNetworkingSocketsLib::ThinkerLess32 bool operator()( const IThinker *a, const IThinker *b ) const
33 {
34 return a->GetNextThinkTime() > b->GetNextThinkTime();
35 }
36 };
37 class ThinkerSetIndex
38 {
39 public:
SetIndex(IThinker * p,int idx)40 static void SetIndex( IThinker *p, int idx ) { p->m_queueIndex = idx; }
41 };
42
43 static CUtlPriorityQueue<IThinker*,ThinkerLess,ThinkerSetIndex> s_queueThinkers;
44
IThinker()45 IThinker::IThinker()
46 : m_usecNextThinkTime( k_nThinkTime_Never )
47 , m_queueIndex( -1 )
48 {
49 }
50
~IThinker()51 IThinker::~IThinker()
52 {
53 ClearNextThinkTime();
54 }
55
56 #ifdef __GNUC__
57 // older steamrt:scout gcc requires this also, probably getting confused by unbalanced push/pop
58 #pragma GCC diagnostic ignored "-Wstrict-overflow"
59 #endif
60
61 #ifdef IS_STEAMDATAGRAMROUTER
unlockSteamNetworkingSocketsLib::ShortDurationLock62 struct ShortDurationLock { inline void lock() {}; inline void unlock() {}; };
63 static ShortDurationLock s_mutexThinkerTable;
64 #else
65 static ShortDurationLock s_mutexThinkerTable( "thinker" );
66 #endif
67
68 // Base class isn't lockable
TryLock() const69 bool IThinker::TryLock() const { return true; }
70
InternalEnsureMinThinkTime(SteamNetworkingMicroseconds usecTargetThinkTime)71 void IThinker::InternalEnsureMinThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
72 {
73 s_mutexThinkerTable.lock();
74 if ( usecTargetThinkTime < m_usecNextThinkTime )
75 InternalSetNextThinkTime( usecTargetThinkTime );
76 s_mutexThinkerTable.unlock();
77 }
78
SetNextThinkTime(SteamNetworkingMicroseconds usecTargetThinkTime)79 void IThinker::SetNextThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
80 {
81 if ( usecTargetThinkTime == m_usecNextThinkTime )
82 return;
83 s_mutexThinkerTable.lock();
84 InternalSetNextThinkTime( usecTargetThinkTime );
85 s_mutexThinkerTable.unlock();
86 }
87
InternalSetNextThinkTime(SteamNetworkingMicroseconds usecTargetThinkTime)88 void IThinker::InternalSetNextThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
89 {
90
91
92 // Protect against us blowing up because of an invalid think time.
93 // Zero is reserved (since it often means there is an uninitialized value),
94 // and our initial time value is effectively infinite compared to the
95 // intervals we deal with in this code, so we should never need to deal
96 // with a timestamp that far in the past. See k_nThinkTime_ASAP
97 if ( unlikely( usecTargetThinkTime <= 0 ) )
98 {
99 AssertMsg1( false, "Attempt to set target think time to %lld", (long long)usecTargetThinkTime );
100 usecTargetThinkTime = SteamNetworkingSockets_GetLocalTimestamp() + 2000;
101 }
102
103 // Clearing it?
104 if ( usecTargetThinkTime == k_nThinkTime_Never )
105 {
106 if ( m_queueIndex >= 0 )
107 {
108 Assert( s_queueThinkers.Element( m_queueIndex ) == this );
109 s_queueThinkers.RemoveAt( m_queueIndex );
110 Assert( m_queueIndex == -1 );
111 }
112
113 m_usecNextThinkTime = k_nThinkTime_Never;
114 return;
115 }
116
117 // Save current time when the next thinker wants service
118 #ifndef IS_STEAMDATAGRAMROUTER
119 SteamNetworkingMicroseconds usecNextWake = ( s_queueThinkers.Count() > 0 ) ? s_queueThinkers.ElementAtHead()->GetNextThinkTime() : k_nThinkTime_Never;
120 #endif
121
122 // Not currently scheduled?
123 if ( m_queueIndex < 0 )
124 {
125 Assert( m_usecNextThinkTime == k_nThinkTime_Never );
126 m_usecNextThinkTime = usecTargetThinkTime;
127 s_queueThinkers.Insert( this );
128 }
129 else
130 {
131
132 // We're already scheduled.
133 Assert( s_queueThinkers.Element( m_queueIndex ) == this );
134 Assert( m_usecNextThinkTime != k_nThinkTime_Never );
135
136 // Set the new schedule time
137 m_usecNextThinkTime = usecTargetThinkTime;
138
139 // And update our position in the queue
140 s_queueThinkers.RevaluateElement( m_queueIndex );
141 }
142
143 // Check that we know our place
144 Assert( m_queueIndex >= 0 );
145 Assert( s_queueThinkers.Element( m_queueIndex ) == this );
146
147 #ifndef IS_STEAMDATAGRAMROUTER
148 // Do we need service before we were previously schedule to wake up?
149 // If so, wake the thread now so that it can redo its schedule work
150 // NOTE: On Windows we could use a waitable timer. This would avoid
151 // waking up the service thread just to re-schedule when it should
152 // wake up for real.
153 if ( m_usecNextThinkTime < usecNextWake )
154 WakeSteamDatagramThread();
155 #endif
156 }
157
Thinker_GetNextScheduledThinkTime()158 SteamNetworkingMicroseconds IThinker::Thinker_GetNextScheduledThinkTime()
159 {
160 SteamNetworkingMicroseconds usecResult = k_nThinkTime_Never;
161 s_mutexThinkerTable.lock();
162 if ( s_queueThinkers.Count() )
163 usecResult = s_queueThinkers.ElementAtHead()->GetNextThinkTime();
164 s_mutexThinkerTable.unlock();
165 return usecResult;
166 }
167
Thinker_ProcessThinkers()168 void IThinker::Thinker_ProcessThinkers()
169 {
170 // We need the lock to access the thinker queue
171 s_mutexThinkerTable.lock();
172
173 // Until the queue is empty
174 int nIterations = 0;
175 while ( s_queueThinkers.Count() > 0 )
176 {
177
178 // Grab the head element
179 IThinker *pNextThinker = s_queueThinkers.ElementAtHead();
180
181 // Refetch timestamp each time. The reason is that certain thinkers
182 // may pass through to other systems (e.g. fake lag) that fetch the time.
183 // If we don't update the time here, that code may have used the newer
184 // timestamp (e.g. to mark when a packet was received) and then
185 // in our next iteration, we will use an older timestamp to process
186 // a thinker.
187 SteamNetworkingMicroseconds usecNow = SteamNetworkingSockets_GetLocalTimestamp();
188
189 // Scheduled too far in the future?
190 if ( pNextThinker->GetNextThinkTime() >= usecNow )
191 {
192 // Keep waiting
193 break;
194 }
195
196 ++nIterations;
197 if ( nIterations > 10000 )
198 {
199 AssertMsg1( false, "Processed thinkers %d times -- probably one thinker keeps requesting an immediate wakeup call.", nIterations );
200 break;
201 }
202
203 // Try to acquire the thinker's lock, if any
204 if ( pNextThinker->TryLock() )
205 {
206
207 // Go ahead and clear his think time now and remove him
208 // from the heap. He needs to schedule a new think time
209 // if heeds service again. For thinkers that need frequent
210 // service, removing them and then re-inserting them when
211 // they reschedule is a bit of extra work that could be
212 // optimized by trying to not remove them now, but adjusting
213 // them once we know when they want to think. But this
214 // is probably just a bit too complicated for the expected
215 // benefit. If the number of total Thinkers is relatively
216 // small (which it probably will be), the heap operations
217 // are probably negligible.
218 pNextThinker->InternalSetNextThinkTime( k_nThinkTime_Never );
219
220 // Release the global thinker table lock, so that other threads
221 // can schedule work while we are doing work here.
222 // (E.g. other connections can be accessed in the main thread,
223 // and can mark the connection to wake up.)
224 s_mutexThinkerTable.unlock();
225
226 // Execute callback. (Note: this could result
227 // in self-destruction or essentially any change
228 // to the rest of the queue.)
229 pNextThinker->Think( usecNow );
230
231 // Re-acquire table lock for the next check
232 s_mutexThinkerTable.lock();
233 }
234 else
235 {
236 // Deadlock! Should be extremely rare. Reschedule him for 1ms in the
237 // future, and we'll try again.
238 pNextThinker->InternalSetNextThinkTime( usecNow + 1000 );
239 }
240 }
241
242 // Release table lock
243 s_mutexThinkerTable.unlock();
244 }
245
246 #ifdef DBGFLAG_VALIDATE
Thinker_ValidateStatics(CValidator & validator)247 void Thinker_ValidateStatics( CValidator &validator )
248 {
249 ValidateRecursive( s_queueThinkers );
250 }
251
Validate(CValidator & validator,const char * pchName)252 void IThinker::Validate( CValidator &validator, const char *pchName )
253 {
254 }
255
256 #endif
257
258 } // namespace SteamNetworkingSocketsLib
259
260