1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 * @ingroup Database
20 */
21
22namespace Wikimedia\Rdbms;
23
24use BagOStuff;
25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
27use RuntimeException;
28use WANObjectCache;
29use Wikimedia\ScopedCallback;
30
31/**
32 * Basic DB load monitor with no external dependencies
33 *
34 * Uses both server-local and shared caches for server state information.
35 *
36 * The "domain" parameters are unused, though they might be used in the future.
37 * Therefore, at present, this assumes one channel of replication per server.
38 *
39 * @ingroup Database
40 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer */
	protected $lb;
	/** @var BagOStuff Server-local cache for server state information */
	protected $srvCache;
	/** @var WANObjectCache Shared cache for server state information */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $replLogger;

	/** @var float Moving average ratio (e.g. 0.1 for 10% weight to the newest value) */
	private $movingAveRatio;
	/** @var int Amount of replication lag in seconds above which warnings are logged */
	private $lagWarnThreshold;

	/** @var float|null Mock UNIX timestamp (bound by reference) used instead of the wall clock */
	private $wallClockOverride;

	/** @var bool Whether the "server states" cache key is in the process of being updated */
	private $serverStatesKeyLocked = false;

	/** @var int Default lag warning threshold in seconds when "lagWarnThreshold" is unspecified */
	private const LAG_WARN_THRESHOLD = 10;

	/** @var int Cache key version */
	private const VERSION = 1;
	/** @var int Maximum effective logical TTL (milliseconds) for the local server state cache */
	private const POLL_PERIOD_MS = 500;
	/** @var int How long to cache server states including time past logical expiration */
	private const STATE_PRESERVE_TTL = 60;
	/** @var int Max interval within which a server state refresh should happen */
	private const TIME_TILL_REFRESH = 1;

	/**
	 * @param ILoadBalancer $lb
	 * @param BagOStuff $srvCache
	 * @param WANObjectCache $wCache
	 * @param array $options
	 *   - movingAveRatio: moving average constant for server weight updates based on lag
	 *   - lagWarnThreshold: how many seconds of lag trigger warnings
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		$this->replLogger = new NullLogger();

		$this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
		$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
	}

	/**
	 * @param LoggerInterface $logger Logger for replication/server state messages
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * Scale each server's weight in place by its weight scale factor from the server states
	 *
	 * Servers with no entry in the server states keep their weight unchanged and an
	 * error is logged for them.
	 *
	 * @param array &$weightByServer Map of (server index => weight); modified in place
	 * @param string|bool $domain Unused at present
	 */
	final public function scaleLoads( array &$weightByServer, $domain ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes, $domain );
		$newScalesByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $newScalesByServer[$i] ) ) {
				$weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
			} else { // server recently added to config?
				$host = $this->lb->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			}
		}
	}

	/**
	 * @param array $serverIndexes
	 * @param string|bool $domain Unused at present
	 * @return array Map of (server index => lag seconds, or false if unknown)
	 */
	final public function getLagTimes( array $serverIndexes, $domain ) {
		return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
	}

	/**
	 * Get the server states, using the local cache, then the shared cache, then regeneration
	 *
	 * @param array $serverIndexes
	 * @param string|bool $domain
	 * @return array Map with keys "lagTimes", "weightScales" and "timestamp"
	 * @throws DBAccessError
	 */
	protected function getServerStates( array $serverIndexes, $domain ) {
		// Represent the cluster by the name of the master DB
		$cluster = $this->getClusterName();

		// Randomize logical TTLs to reduce stampedes
		$ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
		$minAsOfTime = $this->getCurrentTime() - $ageStaleSec;

		// (a) Check the local server cache
		$srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
		$value = $this->srvCache->get( $srvCacheKey );
		if ( $value && $value['timestamp'] > $minAsOfTime ) {
			$this->replLogger->debug( __METHOD__ . ": used fresh '$cluster' cluster status" );

			return $value; // cache hit
		}

		// (b) Value is stale/missing; try to use/refresh the shared cache
		$scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
		if ( !$scopedLock && $value ) {
			$this->replLogger->debug( __METHOD__ . ": used stale '$cluster' cluster status" );
			// (b1) Another thread on this server is already checking the shared cache
			return $value;
		}

		// (b2) This thread gets to check the shared cache or (b3) value is missing
		$staleValue = $value;
		$updated = false; // whether the regeneration callback ran
		$value = $this->wanCache->getWithSetCallback(
			$this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
			self::TIME_TILL_REFRESH, // 1 second logical expiry
			function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
				// Sanity check for circular recursion in computeServerStates()/getWeightScale().
				// Mainly, connection attempts should use LoadBalancer::getServerConnection()
				// rather than something that will pick a server based on the server states.
				// The guard is released when $loopGuard goes out of scope (callback return).
				$loopGuard = $this->acquireServerStatesLoopGuard();
				if ( !$loopGuard ) {
					throw new RuntimeException(
						"Circular recursion detected while regenerating server states cache. " .
						"This may indicate improper connection handling in " . get_class( $this )
					);
				}

				$updated = true;

				return $this->computeServerStates(
					$serverIndexes,
					$domain,
					$oldValue ?: $staleValue // fallback to local cache stale value
				);
			},
			[
				// One thread can update at a time; others use the old value
				'lockTSE' => self::STATE_PRESERVE_TTL,
				'staleTTL' => self::STATE_PRESERVE_TTL,
				// If there is no shared stale value then use the local cache stale value;
				// When even that is not possible, then use the trivial value below.
				'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
			]
		);

		if ( $updated ) {
			$this->replLogger->info( __METHOD__ . ": regenerated '$cluster' cluster status" );
		} else {
			$this->replLogger->debug( __METHOD__ . ": used cached '$cluster' cluster status" );
		}

		// Backfill the local server cache (only the thread holding the local lock does this)
		if ( $scopedLock ) {
			$this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
		}

		return $value;
	}

	/**
	 * Query each server for its replication lag and compute new weight scale factors
	 *
	 * @param array $serverIndexes
	 * @param string|bool $domain Unused at present
	 * @param array|false $priorStates Previous server states used as the moving average base
	 * @return array Map with keys "lagTimes", "weightScales" and "timestamp"
	 * @throws DBAccessError
	 */
	protected function computeServerStates( array $serverIndexes, $domain, $priorStates ) {
		// Check if there is just a master DB (no replication involved)
		if ( $this->lb->getServerCount() <= 1 ) {
			return $this->getPlaceholderServerStates( $serverIndexes );
		}

		$priorScales = $priorStates ? $priorStates['weightScales'] : [];
		$writerIndex = $this->lb->getWriterIndex();

		$lagTimes = [];
		$weightScales = [];
		foreach ( $serverIndexes as $i ) {
			$isMaster = ( $i == $writerIndex );
			// If the master DB has zero load, then typical read queries do not use it.
			// In that case, avoid connecting to it since this method might run in any
			// datacenter, and the master DB might be geographically remote.
			if ( $isMaster && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
				$lagTimes[$i] = 0;
				// Callers only use this DB if they have *no choice* anyway (e.g. writes)
				$weightScales[$i] = 1.0;
				continue;
			}

			$host = $this->lb->getServerName( $i );
			# Handles with open transactions are avoided since they might be subject
			# to REPEATABLE-READ snapshots, which could affect the lag estimate query.
			$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
			$conn = $this->lb->getAnyOpenConnection( $i, $flags );
			if ( $conn ) {
				$close = false; // already open
			} else {
				// Get a connection to this server without triggering other server connections
				$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
				$close = true; // new connection
			}

			// Get new weight scale using a moving average of the naïve and prior values
			$lastScale = $priorScales[$i] ?? 1.0;
			$naiveScale = $this->getWeightScale( $i, $conn ?: null );
			$newScale = $this->getNewScaleViaMovingAve(
				$lastScale,
				$naiveScale,
				$this->movingAveRatio
			);

			// Keep the scale non-negative (sanity); no upper bound is applied here
			$weightScales[$i] = max( $newScale, 0.0 );

			// Mark replication lag on this server as "false" if it is unreachable
			if ( !$conn ) {
				$lagTimes[$i] = $isMaster ? 0 : false;
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is unreachable",
					[ 'db_server' => $host ]
				);
				continue;
			}

			// Determine the amount of replication lag on this server
			try {
				$lagTimes[$i] = $conn->getLag();
			} catch ( DBError $e ) {
				// Mark the lag time as "false" if it cannot be queried
				$lagTimes[$i] = false;
			}

			if ( $lagTimes[$i] === false ) {
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is not replicating?",
					[ 'db_server' => $host ]
				);
			} elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
				$this->replLogger->warning(
					"Server {dbserver} has {lag} seconds of lag (>= {maxlag})",
					[
						'dbserver' => $host,
						'lag' => $lagTimes[$i],
						'maxlag' => $this->lagWarnThreshold
					]
				);
			}

			if ( $close ) {
				# Close the connection to avoid sleeper connections piling up.
				# Note that the caller will pick one of these DBs and reconnect,
				# which is slightly inefficient, but this only matters for the lag
				# time cache miss case, which is far less common than cache hits.
				$this->lb->closeConnection( $conn );
			}
		}

		return [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Trivial server states: zero lag and full weight for every server
	 *
	 * @param int[] $serverIndexes
	 * @return array Map with keys "lagTimes", "weightScales" and "timestamp"
	 */
	private function getPlaceholderServerStates( array $serverIndexes ) {
		return [
			'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
			'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get the naive (un-averaged) weight scale factor for a server
	 *
	 * This base implementation returns 1.0 if a connection handle was obtained and
	 * 0.0 otherwise; subclasses may override it.
	 *
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle or null on connection failure
	 * @return float
	 * @since 1.28
	 */
	protected function getWeightScale( $index, ?IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Get the moving average weight scale given a naive and the last iteration value
	 *
	 * One case of particular note is if a server totally cannot have its state queried.
	 * Ideally, the scale should be able to drop from 1.0 to a minuscule amount (say 0.001)
	 * fairly quickly. To get the time to reach 0.001, some calculations can be done:
	 *
	 * SCALE = $naiveScale * $movAveRatio + $lastScale * (1 - $movAveRatio)
	 * SCALE = 0 * $movAveRatio + $lastScale * (1 - $movAveRatio)
	 * SCALE = $lastScale * (1 - $movAveRatio)
	 *
	 * Given a starting weight scale of 1.0:
	 * 1.0 * (1 - $movAveRatio)^(# iterations) = 0.001
	 * ceil( log<1 - $movAveRatio>(0.001) ) = (# iterations)
	 * t = (# iterations) * (POLL_PERIOD + TIME_TILL_REFRESH)
	 * t = (# iterations) * (1e-3 * POLL_PERIOD_MS + TIME_TILL_REFRESH)
	 *
	 * With POLL_PERIOD_MS = 500 and TIME_TILL_REFRESH = 1, each iteration takes
	 * at most about 1.5 seconds.
	 *
	 * If $movAveRatio is 0.5, then:
	 * t = ceil( log<0.5>(0.01) ) * 1.5 = 7 * 1.5 = 10.5 seconds [for 1% scale]
	 * t = ceil( log<0.5>(0.001) ) * 1.5 = 10 * 1.5 = 15 seconds [for 0.1% scale]
	 *
	 * If $movAveRatio is 0.8, then:
	 * t = ceil( log<0.2>(0.01) ) * 1.5 = 3 * 1.5 = 4.5 seconds [for 1% scale]
	 * t = ceil( log<0.2>(0.001) ) * 1.5 = 5 * 1.5 = 7.5 seconds [for 0.1% scale]
	 *
	 * Use of connection failure rate can greatly speed this process up
	 *
	 * @param float $lastScale Current moving average of scaling factors
	 * @param float $naiveScale New scaling factor
	 * @param float $movAveRatio Weight given to the new value
	 * @return float
	 * @since 1.35
	 */
	protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio ) {
		return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
	}

	/**
	 * @param WANObjectCache|BagOStuff $cache
	 * @param array $serverIndexes
	 * @return string
	 */
	private function getStatesCacheKey( $cache, array $serverIndexes ) {
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the master DB name
		return $cache->makeGlobalKey(
			'rdbms-server-states',
			self::VERSION,
			$this->getClusterName(),
			implode( '-', $serverIndexes )
		);
	}

	/**
	 * Name of the master DB server, used to represent the cluster in keys and log messages
	 *
	 * @return string
	 */
	private function getClusterName() {
		return $this->lb->getServerName( $this->lb->getWriterIndex() );
	}

	/**
	 * Acquire the in-process guard against recursive server state regeneration
	 *
	 * @return ScopedCallback|null Guard that releases on destruction; null if already held
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			return null; // locked
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp (the mock time, if one was set and is not null)
	 * @codeCoverageIgnore
	 */
	protected function getCurrentTime() {
		// Use "??" rather than "?:" so that a mock time of 0.0 is still honored
		return $this->wallClockOverride ?? microtime( true );
	}

	/**
	 * @param float|null &$time Mock UNIX timestamp for testing
	 * @codeCoverageIgnore
	 */
	public function setMockTime( &$time ) {
		$this->wallClockOverride =& $time;
	}
}
407