<?php
/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup Database
 */

namespace Wikimedia\Rdbms;

use BagOStuff;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RuntimeException;
use WANObjectCache;
use Wikimedia\ScopedCallback;

/**
 * Basic DB load monitor with no external dependencies
 *
 * Uses both server-local and shared caches for server state information.
 *
 * The "domain" parameters are unused, though they might be used in the future.
 * Therefore, at present, this assumes one channel of replication per server.
 *
 * @ingroup Database
 */
class LoadMonitor implements ILoadMonitor {
	/** @var ILoadBalancer */
	protected $lb;
	/** @var BagOStuff */
	protected $srvCache;
	/** @var WANObjectCache */
	protected $wanCache;
	/** @var LoggerInterface */
	protected $replLogger;

	/** @var float Moving average ratio (e.g. 0.1 for 10% weight to new weight) */
	private $movingAveRatio;
	/** @var int Amount of replication lag in seconds before warnings are logged */
	private $lagWarnThreshold;

	/** @var float|null */
	private $wallClockOverride;

	/** @var bool Whether the "server states" cache key is in the process of being updated */
	private $serverStatesKeyLocked = false;

	/** @var int Default 'max lag' in seconds when unspecified */
	private const LAG_WARN_THRESHOLD = 10;

	/** @var int cache key version */
	private const VERSION = 1;
	/** @var int Maximum effective logical TTL for server state cache */
	private const POLL_PERIOD_MS = 500;
	/** @var int How long to cache server states including time past logical expiration */
	private const STATE_PRESERVE_TTL = 60;
	/** @var int Max interval within which a server state refresh should happen */
	private const TIME_TILL_REFRESH = 1;

	/**
	 * @param ILoadBalancer $lb
	 * @param BagOStuff $srvCache
	 * @param WANObjectCache $wCache
	 * @param array $options
	 *   - movingAveRatio: moving average constant for server weight updates based on lag
	 *   - lagWarnThreshold: how many seconds of lag trigger warnings
	 */
	public function __construct(
		ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
	) {
		$this->lb = $lb;
		$this->srvCache = $srvCache;
		$this->wanCache = $wCache;
		// Logging is a no-op until setLogger() is called
		$this->replLogger = new NullLogger();

		$this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
		$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
	}

	/**
	 * Set the logger that receives the server state messages emitted by this class
	 *
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->replLogger = $logger;
	}

	/**
	 * Scale the given server weights, in place, by the "weightScales" server state values
	 *
	 * Each weight is multiplied by the scale of its server and rounded up to an integer.
	 * Servers that have no scale in the server states keep their weight untouched and
	 * an error is logged for them.
	 *
	 * @param array &$weightByServer Map of (server index => weight)
	 * @param string|bool $domain
	 */
	final public function scaleLoads( array &$weightByServer, $domain ) {
		$serverIndexes = array_keys( $weightByServer );
		$states = $this->getServerStates( $serverIndexes, $domain );
		$newScalesByServer = $states['weightScales'];
		foreach ( $weightByServer as $i => $weight ) {
			if ( isset( $newScalesByServer[$i] ) ) {
				$weightByServer[$i] = (int)ceil( $weight * $newScalesByServer[$i] );
			} else { // server recently added to config?
				$host = $this->lb->getServerName( $i );
				$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
			}
		}
	}

	/**
	 * Get the "lagTimes" server state values for the given servers
	 *
	 * @param array $serverIndexes
	 * @param string|bool $domain
	 * @return array Map of (server index => lag value); a value is false when the
	 *   server could not be reached or its lag could not be determined
	 */
	final public function getLagTimes( array $serverIndexes, $domain ) {
		return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
	}

	/**
	 * Get the server states, preferring the server-local cache, then the shared cache,
	 * and only regenerating them via computeServerStates() when both are stale/missing
	 *
	 * @param array $serverIndexes
	 * @param string|bool $domain
	 * @return array Map with the keys "lagTimes", "weightScales", and "timestamp"
	 * @throws DBAccessError
	 */
	protected function getServerStates( array $serverIndexes, $domain ) {
		// Represent the cluster by the name of the master DB
		$cluster = $this->lb->getServerName( $this->lb->getWriterIndex() );

		// Randomize logical TTLs to reduce stampedes
		$ageStaleSec = mt_rand( 1, self::POLL_PERIOD_MS ) / 1e3;
		$minAsOfTime = $this->getCurrentTime() - $ageStaleSec;

		// (a) Check the local server cache
		$srvCacheKey = $this->getStatesCacheKey( $this->srvCache, $serverIndexes );
		$value = $this->srvCache->get( $srvCacheKey );
		if ( $value && $value['timestamp'] > $minAsOfTime ) {
			$this->replLogger->debug( __METHOD__ . ": used fresh '$cluster' cluster status" );

			return $value; // cache hit
		}

		// (b) Value is stale/missing; try to use/refresh the shared cache
		$scopedLock = $this->srvCache->getScopedLock( $srvCacheKey, 0, 10 );
		if ( !$scopedLock && $value ) {
			$this->replLogger->debug( __METHOD__ . ": used stale '$cluster' cluster status" );
			// (b1) Another thread on this server is already checking the shared cache
			return $value;
		}

		// (b2) This thread gets to check the shared cache or (b3) value is missing
		$staleValue = $value;
		$updated = false; // whether the regeneration callback ran
		$value = $this->wanCache->getWithSetCallback(
			$this->getStatesCacheKey( $this->wanCache, $serverIndexes ),
			self::TIME_TILL_REFRESH, // 1 second logical expiry
			function ( $oldValue, &$ttl ) use ( $serverIndexes, $domain, $staleValue, &$updated ) {
				// Sanity check for circular recursion in computeServerStates()/getWeightScale().
				// Mainly, connection attempts should use LoadBalancer::getServerConnection()
				// rather than something that will pick a server based on the server states.
				// The guard is held in a local so that it stays set until this callback returns.
				$loopGuard = $this->acquireServerStatesLoopGuard();
				if ( !$loopGuard ) {
					throw new RuntimeException(
						"Circular recursion detected while regenerating server states cache. " .
						"This may indicate improper connection handling in " . get_class( $this )
					);
				}

				$updated = true;

				return $this->computeServerStates(
					$serverIndexes,
					$domain,
					$oldValue ?: $staleValue // fallback to local cache stale value
				);
			},
			[
				// One thread can update at a time; others use the old value
				'lockTSE' => self::STATE_PRESERVE_TTL,
				'staleTTL' => self::STATE_PRESERVE_TTL,
				// If there is no shared stale value then use the local cache stale value;
				// When even that is not possible, then use the trivial value below.
				'busyValue' => $staleValue ?: $this->getPlaceholderServerStates( $serverIndexes )
			]
		);

		if ( $updated ) {
			$this->replLogger->info( __METHOD__ . ": regenerated '$cluster' cluster status" );
		} else {
			$this->replLogger->debug( __METHOD__ . ": used cached '$cluster' cluster status" );
		}

		// Backfill the local server cache
		if ( $scopedLock ) {
			$this->srvCache->set( $srvCacheKey, $value, self::STATE_PRESERVE_TTL );
		}

		return $value;
	}

	/**
	 * Regenerate the server states by probing each of the given servers
	 *
	 * @param array $serverIndexes
	 * @param string|bool $domain
	 * @param array|false $priorStates Previous server states (used for the moving average)
	 * @return array Map with the keys "lagTimes", "weightScales", and "timestamp"
	 * @throws DBAccessError
	 */
	protected function computeServerStates( array $serverIndexes, $domain, $priorStates ) {
		// Check if there is just a master DB (no replication involved)
		if ( $this->lb->getServerCount() <= 1 ) {
			return $this->getPlaceholderServerStates( $serverIndexes );
		}

		$priorScales = $priorStates ? $priorStates['weightScales'] : [];
		// The writer index does not change while iterating, so only look it up once
		$writerIndex = $this->lb->getWriterIndex();

		$lagTimes = [];
		$weightScales = [];
		foreach ( $serverIndexes as $i ) {
			$isMaster = ( $i == $writerIndex );
			// If the master DB has zero load, then typical read queries do not use it.
			// In that case, avoid connecting to it since this method might run in any
			// datacenter, and the master DB might be geographically remote.
			if ( $isMaster && $this->lb->getServerInfo( $i )['load'] <= 0 ) {
				$lagTimes[$i] = 0;
				// Callers only use this DB if they have *no choice* anyway (e.g. writes)
				$weightScales[$i] = 1.0;
				continue;
			}

			$host = $this->lb->getServerName( $i );
			# Handles with open transactions are avoided since they might be subject
			# to REPEATABLE-READ snapshots, which could affect the lag estimate query.
			$flags = ILoadBalancer::CONN_TRX_AUTOCOMMIT | ILoadBalancer::CONN_SILENCE_ERRORS;
			$conn = $this->lb->getAnyOpenConnection( $i, $flags );
			if ( $conn ) {
				$close = false; // already open
			} else {
				// Get a connection to this server without triggering other server connections
				$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
				$close = true; // new connection
			}

			// Get new weight scale using a moving average of the naïve and prior values
			$lastScale = $priorScales[$i] ?? 1.0;
			$naiveScale = $this->getWeightScale( $i, $conn ?: null );
			$newScale = $this->getNewScaleViaMovingAve(
				$lastScale,
				$naiveScale,
				$this->movingAveRatio
			);

			// Never let the scale go negative (sanity); no upper bound is enforced here
			$weightScales[$i] = max( $newScale, 0.0 );

			// Mark replication lag on this server as "false" if it is unreachable
			if ( !$conn ) {
				$lagTimes[$i] = $isMaster ? 0 : false;
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is unreachable",
					[ 'db_server' => $host ]
				);
				continue;
			}

			// Determine the amount of replication lag on this server
			try {
				$lagTimes[$i] = $conn->getLag();
			} catch ( DBError $e ) {
				// Mark the lag time as "false" if it cannot be queried
				$lagTimes[$i] = false;
			}

			if ( $lagTimes[$i] === false ) {
				$this->replLogger->error(
					__METHOD__ . ": host {db_server} is not replicating?",
					[ 'db_server' => $host ]
				);
			} elseif ( $lagTimes[$i] > $this->lagWarnThreshold ) {
				// Use the same "db_server" context key as the other messages of this method
				$this->replLogger->warning(
					"Server {db_server} has {lag} seconds of lag (>= {maxlag})",
					[
						'db_server' => $host,
						'lag' => $lagTimes[$i],
						'maxlag' => $this->lagWarnThreshold
					]
				);
			}

			if ( $close ) {
				# Close the connection to avoid sleeper connections piling up.
				# Note that the caller will pick one of these DBs and reconnect,
				# which is slightly inefficient, but this only matters for the lag
				# time cache miss case, which is far less common than cache hits.
				$this->lb->closeConnection( $conn );
			}
		}

		return [
			'lagTimes' => $lagTimes,
			'weightScales' => $weightScales,
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get trivial server states: no lag and full weight for every given server
	 *
	 * @param int[] $serverIndexes
	 * @return array Map with the keys "lagTimes", "weightScales", and "timestamp"
	 */
	private function getPlaceholderServerStates( array $serverIndexes ) {
		return [
			'lagTimes' => array_fill_keys( $serverIndexes, 0 ),
			'weightScales' => array_fill_keys( $serverIndexes, 1.0 ),
			'timestamp' => $this->getCurrentTime()
		];
	}

	/**
	 * Get the naive weight scale of a server: 1.0 if it is reachable and 0.0 otherwise
	 *
	 * @param int $index Server index
	 * @param IDatabase|null $conn Connection handle or null on connection failure
	 * @return float
	 * @since 1.28
	 */
	protected function getWeightScale( $index, ?IDatabase $conn = null ) {
		return $conn ? 1.0 : 0.0;
	}

	/**
	 * Get the moving average weight scale given a naive and the last iteration value
	 *
	 * One case of particular note is if a server totally cannot have its state queried.
	 * Ideally, the scale should be able to drop from 1.0 to a minuscule amount (say 0.001)
	 * fairly quickly. To get the time to reach 0.001, some calculations can be done:
	 *
	 * SCALE = $naiveScale * $movAveRatio + $lastScale * (1 - $movAveRatio)
	 * SCALE = 0 * $movAveRatio + $lastScale * (1 - $movAveRatio)
	 * SCALE = $lastScale * (1 - $movAveRatio)
	 *
	 * Given a starting weight scale of 1.0:
	 * 1.0 * (1 - $movAveRatio)^(# iterations) = 0.001
	 * ceil( log<1 - $movAveRatio>(0.001) ) = (# iterations)
	 * t = (# iterations) * (POLL_PERIOD + SHARED_CACHE_TTL)
	 * t = (# iterations) * (POLL_PERIOD_MS / 1e3 + SHARED_CACHE_TTL)
	 *
	 * If $movAveRatio is 0.5, then:
	 * t = ceil( log<0.5>(0.01) ) * 1.5 = 7 * 1.5 = 10.5 seconds [for 1% scale]
	 * t = ceil( log<0.5>(0.001) ) * 1.5 = 10 * 1.5 = 15 seconds [for 0.1% scale]
	 *
	 * If $movAveRatio is 0.8, then:
	 * t = ceil( log<0.2>(0.01) ) * 1.5 = 3 * 1.5 = 4.5 seconds [for 1% scale]
	 * t = ceil( log<0.2>(0.001) ) * 1.5 = 5 * 1.5 = 7.5 seconds [for 0.1% scale]
	 *
	 * Use of connection failure rate can greatly speed this process up
	 *
	 * @param float $lastScale Current moving average of scaling factors
	 * @param float $naiveScale New scaling factor
	 * @param float $movAveRatio Weight given to the new value
	 * @return float
	 * @since 1.35
	 */
	protected function getNewScaleViaMovingAve( $lastScale, $naiveScale, $movAveRatio ) {
		return $movAveRatio * $naiveScale + ( 1 - $movAveRatio ) * $lastScale;
	}

	/**
	 * Get the cache key for the server states of the given set of servers
	 *
	 * @param WANObjectCache|BagOStuff $cache
	 * @param array $serverIndexes
	 * @return string
	 */
	private function getStatesCacheKey( $cache, array $serverIndexes ) {
		// Sort (a local copy) so that the key does not depend on the order of the indexes
		sort( $serverIndexes );
		// Lag is per-server, not per-DB, so key on the master DB name
		return $cache->makeGlobalKey(
			'rdbms-server-states',
			self::VERSION,
			$this->lb->getServerName( $this->lb->getWriterIndex() ),
			implode( '-', $serverIndexes )
		);
	}

	/**
	 * Mark the server states as being regenerated by this instance
	 *
	 * @return ScopedCallback|null Guard that clears the mark when it is destroyed,
	 *   or null if the mark was already set (e.g. re-entrant regeneration)
	 */
	private function acquireServerStatesLoopGuard() {
		if ( $this->serverStatesKeyLocked ) {
			return null; // locked
		}

		$this->serverStatesKeyLocked = true; // lock

		return new ScopedCallback( function () {
			$this->serverStatesKeyLocked = false; // unlock
		} );
	}

	/**
	 * @return float UNIX timestamp
	 * @codeCoverageIgnore
	 */
	protected function getCurrentTime() {
		return $this->wallClockOverride ?: microtime( true );
	}

	/**
	 * @param float|null &$time Mock UNIX timestamp for testing
	 * @codeCoverageIgnore
	 */
	public function setMockTime( &$time ) {
		// Bound by reference so that later changes to $time by the caller are observed
		$this->wallClockOverride =& $time;
	}
}