1 //====== Copyright Valve Corporation, All rights reserved. ====================
2 
3 #ifdef __GNUC__
4 	// src/public/tier0/basetypes.h:104:30: error: assuming signed overflow does not occur when assuming that (X + c) < X is always false [-Werror=strict-overflow]
5 	// current steamrt:scout gcc "g++ (SteamRT 4.8.4-1ubuntu15~12.04+steamrt1.2+srt1) 4.8.4" requires this at the top due to optimizations
6 	#pragma GCC diagnostic ignored "-Wstrict-overflow"
7 #endif
8 
9 #include <tier1/utlpriorityqueue.h>
10 
11 #include "steamnetworkingsockets_thinker.h"
12 
13 #ifdef IS_STEAMDATAGRAMROUTER
14 	#include "router/sdr.h"
15 #else
16 	#include "clientlib/steamnetworkingsockets_lowlevel.h"
17 #endif
18 
19 // memdbgon must be the last include file in a .cpp file!!!
20 #include "tier0/memdbgon.h"
21 
22 namespace SteamNetworkingSocketsLib {
23 
24 /////////////////////////////////////////////////////////////////////////////
25 //
26 // Periodic processing
27 //
28 /////////////////////////////////////////////////////////////////////////////
29 
30 struct ThinkerLess
31 {
operator ()SteamNetworkingSocketsLib::ThinkerLess32 	bool operator()( const IThinker *a, const IThinker *b ) const
33 	{
34 		return a->GetNextThinkTime() > b->GetNextThinkTime();
35 	}
36 };
37 class ThinkerSetIndex
38 {
39 public:
SetIndex(IThinker * p,int idx)40 	static void SetIndex( IThinker *p, int idx ) { p->m_queueIndex = idx; }
41 };
42 
43 static CUtlPriorityQueue<IThinker*,ThinkerLess,ThinkerSetIndex> s_queueThinkers;
44 
IThinker()45 IThinker::IThinker()
46 : m_usecNextThinkTime( k_nThinkTime_Never )
47 , m_queueIndex( -1 )
48 {
49 }
50 
~IThinker()51 IThinker::~IThinker()
52 {
53 	ClearNextThinkTime();
54 }
55 
56 #ifdef __GNUC__
57 	// older steamrt:scout gcc requires this also, probably getting confused by unbalanced push/pop
58 	#pragma GCC diagnostic ignored "-Wstrict-overflow"
59 #endif
60 
61 #ifdef IS_STEAMDATAGRAMROUTER
unlockSteamNetworkingSocketsLib::ShortDurationLock62 struct ShortDurationLock { inline void lock() {}; inline void unlock() {}; };
63 static ShortDurationLock s_mutexThinkerTable;
64 #else
65 static ShortDurationLock s_mutexThinkerTable( "thinker" );
66 #endif
67 
68 // Base class isn't lockable
TryLock() const69 bool IThinker::TryLock() const { return true; }
70 
InternalEnsureMinThinkTime(SteamNetworkingMicroseconds usecTargetThinkTime)71 void IThinker::InternalEnsureMinThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
72 {
73 	s_mutexThinkerTable.lock();
74 	if ( usecTargetThinkTime < m_usecNextThinkTime )
75 		InternalSetNextThinkTime( usecTargetThinkTime );
76 	s_mutexThinkerTable.unlock();
77 }
78 
SetNextThinkTime(SteamNetworkingMicroseconds usecTargetThinkTime)79 void IThinker::SetNextThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
80 {
81 	if ( usecTargetThinkTime == m_usecNextThinkTime )
82 		return;
83 	s_mutexThinkerTable.lock();
84 	InternalSetNextThinkTime( usecTargetThinkTime );
85 	s_mutexThinkerTable.unlock();
86 }
87 
InternalSetNextThinkTime(SteamNetworkingMicroseconds usecTargetThinkTime)88 void IThinker::InternalSetNextThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
89 {
90 
91 
92 	// Protect against us blowing up because of an invalid think time.
93 	// Zero is reserved (since it often means there is an uninitialized value),
94 	// and our initial time value is effectively infinite compared to the
95 	// intervals we deal with in this code, so we should never need to deal
96 	// with a timestamp that far in the past.  See k_nThinkTime_ASAP
97 	if ( unlikely( usecTargetThinkTime <= 0 ) )
98 	{
99 		AssertMsg1( false, "Attempt to set target think time to %lld", (long long)usecTargetThinkTime );
100 		usecTargetThinkTime = SteamNetworkingSockets_GetLocalTimestamp() + 2000;
101 	}
102 
103 	// Clearing it?
104 	if ( usecTargetThinkTime == k_nThinkTime_Never )
105 	{
106 		if ( m_queueIndex >= 0 )
107 		{
108 			Assert( s_queueThinkers.Element( m_queueIndex ) == this );
109 			s_queueThinkers.RemoveAt( m_queueIndex );
110 			Assert( m_queueIndex == -1 );
111 		}
112 
113 		m_usecNextThinkTime = k_nThinkTime_Never;
114 		return;
115 	}
116 
117 	// Save current time when the next thinker wants service
118 	#ifndef IS_STEAMDATAGRAMROUTER
119 		SteamNetworkingMicroseconds usecNextWake = ( s_queueThinkers.Count() > 0 ) ? s_queueThinkers.ElementAtHead()->GetNextThinkTime() : k_nThinkTime_Never;
120 	#endif
121 
122 	// Not currently scheduled?
123 	if ( m_queueIndex < 0 )
124 	{
125 		Assert( m_usecNextThinkTime == k_nThinkTime_Never );
126 		m_usecNextThinkTime = usecTargetThinkTime;
127 		s_queueThinkers.Insert( this );
128 	}
129 	else
130 	{
131 
132 		// We're already scheduled.
133 		Assert( s_queueThinkers.Element( m_queueIndex ) == this );
134 		Assert( m_usecNextThinkTime != k_nThinkTime_Never );
135 
136 		// Set the new schedule time
137 		m_usecNextThinkTime = usecTargetThinkTime;
138 
139 		// And update our position in the queue
140 		s_queueThinkers.RevaluateElement( m_queueIndex );
141 	}
142 
143 	// Check that we know our place
144 	Assert( m_queueIndex >= 0 );
145 	Assert( s_queueThinkers.Element( m_queueIndex ) == this );
146 
147 	#ifndef IS_STEAMDATAGRAMROUTER
148 		// Do we need service before we were previously schedule to wake up?
149 		// If so, wake the thread now so that it can redo its schedule work
150 		// NOTE: On Windows we could use a waitable timer.  This would avoid
151 		// waking up the service thread just to re-schedule when it should
152 		// wake up for real.
153 		if ( m_usecNextThinkTime < usecNextWake )
154 			WakeSteamDatagramThread();
155 	#endif
156 }
157 
Thinker_GetNextScheduledThinkTime()158 SteamNetworkingMicroseconds IThinker::Thinker_GetNextScheduledThinkTime()
159 {
160 	SteamNetworkingMicroseconds usecResult = k_nThinkTime_Never;
161 	s_mutexThinkerTable.lock();
162 	if ( s_queueThinkers.Count() )
163 		usecResult = s_queueThinkers.ElementAtHead()->GetNextThinkTime();
164 	s_mutexThinkerTable.unlock();
165 	return usecResult;
166 }
167 
Thinker_ProcessThinkers()168 void IThinker::Thinker_ProcessThinkers()
169 {
170 	// We need the lock to access the thinker queue
171 	s_mutexThinkerTable.lock();
172 
173 	// Until the queue is empty
174 	int nIterations = 0;
175 	while ( s_queueThinkers.Count() > 0 )
176 	{
177 
178 		// Grab the head element
179 		IThinker *pNextThinker = s_queueThinkers.ElementAtHead();
180 
181 		// Refetch timestamp each time.  The reason is that certain thinkers
182 		// may pass through to other systems (e.g. fake lag) that fetch the time.
183 		// If we don't update the time here, that code may have used the newer
184 		// timestamp (e.g. to mark when a packet was received) and then
185 		// in our next iteration, we will use an older timestamp to process
186 		// a thinker.
187 		SteamNetworkingMicroseconds usecNow = SteamNetworkingSockets_GetLocalTimestamp();
188 
189 		// Scheduled too far in the future?
190 		if ( pNextThinker->GetNextThinkTime() >= usecNow )
191 		{
192 			// Keep waiting
193 			break;
194 		}
195 
196 		++nIterations;
197 		if ( nIterations > 10000 )
198 		{
199 			AssertMsg1( false, "Processed thinkers %d times -- probably one thinker keeps requesting an immediate wakeup call.", nIterations );
200 			break;
201 		}
202 
203 		// Try to acquire the thinker's lock, if any
204 		if ( pNextThinker->TryLock() )
205 		{
206 
207 			// Go ahead and clear his think time now and remove him
208 			// from the heap.  He needs to schedule a new think time
209 			// if heeds service again.  For thinkers that need frequent
210 			// service, removing them and then re-inserting them when
211 			// they reschedule is a bit of extra work that could be
212 			// optimized by trying to not remove them now, but adjusting
213 			// them once we know when they want to think.  But this
214 			// is probably just a bit too complicated for the expected
215 			// benefit.  If the number of total Thinkers is relatively
216 			// small (which it probably will be), the heap operations
217 			// are probably negligible.
218 			pNextThinker->InternalSetNextThinkTime( k_nThinkTime_Never );
219 
220 			// Release the global thinker table lock, so that other threads
221 			// can schedule work while we are doing work here.
222 			// (E.g. other connections can be accessed in the main thread,
223 			// and can mark the connection to wake up.)
224 			s_mutexThinkerTable.unlock();
225 
226 			// Execute callback.  (Note: this could result
227 			// in self-destruction or essentially any change
228 			// to the rest of the queue.)
229 			pNextThinker->Think( usecNow );
230 
231 			// Re-acquire table lock for the next check
232 			s_mutexThinkerTable.lock();
233 		}
234 		else
235 		{
236 			// Deadlock!  Should be extremely rare.  Reschedule him for 1ms in the
237 			// future, and we'll try again.
238 			pNextThinker->InternalSetNextThinkTime( usecNow + 1000 );
239 		}
240 	}
241 
242 	// Release table lock
243 	s_mutexThinkerTable.unlock();
244 }
245 
246 #ifdef DBGFLAG_VALIDATE
Thinker_ValidateStatics(CValidator & validator)247 void Thinker_ValidateStatics( CValidator &validator )
248 {
249 	ValidateRecursive( s_queueThinkers );
250 }
251 
Validate(CValidator & validator,const char * pchName)252 void IThinker::Validate( CValidator &validator, const char *pchName )
253 {
254 }
255 
256 #endif
257 
258 } // namespace SteamNetworkingSocketsLib
259 
260