<?php
/**
 * Job queue code for federated queues.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

/**
 * Class to handle enqueueing and running of background jobs for federated queues
 *
 * This class allows for queues to be partitioned into smaller queues.
 * A partition is defined by the configuration for a JobQueue instance.
 * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a
 * JobQueueFederated instance, which itself would consist of three JobQueueRedis
 * instances, each using their own redis server. This would allow for the jobs
 * to be split (evenly or based on weights) across multiple servers if a single
 * server becomes impractical or expensive. Different JobQueue classes can be mixed.
 *
 * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue
 * is inherited by the partition queues. Additional configuration defines what
 * section each wiki is in, what partition queues each section uses (and their weight),
 * and the JobQueue configuration for each partition. Some sections might only need a
 * single queue partition, like the sections for groups of small wikis.
 *
 * If used for performance, then $wgMainCacheType should be set to memcached/redis.
 * Note that "fifo" cannot be used for the ordering, since the data is distributed.
 * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also,
 * queue classes used by this should ignore down servers (with TTL) to avoid slowness.
 *
 * @ingroup JobQueue
 * @since 1.22
 */
class JobQueueFederated extends JobQueue {
	/** @var HashRing */
	protected $partitionRing;
	/** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */
	protected $partitionQueues = [];

	/** @var int Maximum number of partitions to try */
	protected $maxPartitionsTry;

	/**
	 * @param array $params Possible keys:
	 *  - sectionsByWiki      : A map of wiki IDs to section names.
	 *                          Wikis will default to using the section "default".
	 *  - partitionsBySection : Map of section names to maps of (partition name => weight).
	 *                          A section called 'default' must be defined if not all wikis
	 *                          have explicitly defined sections.
	 *  - configByPartition   : Map of queue partition names to configuration arrays.
	 *                          These configuration arrays are passed to JobQueue::factory().
	 *                          The options set here are overridden by those passed to
	 *                          the federated queue itself (e.g. 'order' and 'claimTTL').
	 *  - maxPartitionsTry    : Maximum number of times to attempt job insertion using
	 *                          different partition queues. This improves availability
	 *                          during failure, at the cost of added latency and somewhat
	 *                          less reliable job de-duplication mechanisms.
	 *                          Defaults to 2.
	 * @throws MWException
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$section = $params['sectionsByWiki'][$this->domain] ?? 'default';
		if ( !isset( $params['partitionsBySection'][$section] ) ) {
			throw new MWException( "No configuration for section '$section'." );
		}
		$this->maxPartitionsTry = $params['maxPartitionsTry'] ?? 2;
		// Get the full partition map
		$partitionMap = $params['partitionsBySection'][$section];
		arsort( $partitionMap, SORT_NUMERIC );
		// Get the config to merge into each partition queue config
		$baseConfig = $params;
		foreach ( [ 'class', 'sectionsByWiki', 'maxPartitionsTry',
			'partitionsBySection', 'configByPartition', ] as $o
		) {
			unset( $baseConfig[$o] ); // partition queue doesn't care about this
		}
		// Get the partition queue objects
		foreach ( $partitionMap as $partition => $w ) {
			if ( !isset( $params['configByPartition'][$partition] ) ) {
				throw new MWException( "No configuration for partition '$partition'." );
			}
			// Array union: keys already in $baseConfig win over the per-partition ones
			$this->partitionQueues[$partition] = JobQueue::factory(
				$baseConfig + $params['configByPartition'][$partition] );
		}
		// Ring of all partitions
		$this->partitionRing = new HashRing( $partitionMap );
	}

	/**
	 * @return string[] Orderings that can be configured for this queue
	 */
	protected function supportedOrders() {
		// No FIFO due to partitioning, though "rough timestamp order" is supported
		return [ 'undefined', 'random', 'timestamp' ];
	}

	/**
	 * @return string
	 */
	protected function optimalOrder() {
		return 'undefined'; // defer to the partitions
	}

	/**
	 * Delayed jobs are supported only if every partition queue supports them.
	 *
	 * @return bool
	 */
	protected function supportsDelayedJobs() {
		foreach ( $this->partitionQueues as $queue ) {
			if ( !$queue->supportsDelayedJobs() ) {
				return false;
			}
		}

		return true;
	}

	/**
	 * Check whether all partitions are empty. Partitions that fail with a
	 * JobQueueError are logged and do not affect the result.
	 *
	 * @return bool
	 * @throws JobQueueError If every partition failed
	 */
	protected function doIsEmpty() {
		$empty = true;
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				// Short-circuits: once a partition is non-empty, the rest are not queried
				$empty = $empty && $queue->doIsEmpty();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $empty;
	}

	/**
	 * @return int Sum of doGetSize() over the partitions that responded
	 */
	protected function doGetSize() {
		return $this->getCrossPartitionSum( 'size', 'doGetSize' );
	}

	/**
	 * @return int Sum of doGetAcquiredCount() over the partitions that responded
	 */
	protected function doGetAcquiredCount() {
		return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
	}

	/**
	 * @return int Sum of doGetDelayedCount() over the partitions that responded
	 */
	protected function doGetDelayedCount() {
		return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
	}

	/**
	 * @return int Sum of doGetAbandonedCount() over the partitions that responded
	 */
	protected function doGetAbandonedCount() {
		return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
	}

	/**
	 * Call a counting method on every partition queue and add up the results.
	 * Partitions that fail with a JobQueueError are logged and skipped.
	 *
	 * @param string $type Label for the quantity being summed (not read by this method)
	 * @param string $method Name of the JobQueue method to call on each partition
	 * @return int
	 * @throws JobQueueError If every partition failed
	 */
	protected function getCrossPartitionSum( $type, $method ) {
		$count = 0;
		$failed = 0;
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$count += $queue->$method();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $count;
	}

	/**
	 * Insert jobs, retrying the ones that could not be inserted on other
	 * partitions, for at most $this->maxPartitionsTry rounds.
	 *
	 * @param array $jobs
	 * @param int $flags
	 * @throws JobQueueError If some jobs are still not inserted after the last round
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Local ring variable that may be changed to point to a new ring on failure
		$partitionRing = $this->partitionRing;
		// Try to insert the jobs and update $partitionRing on any failures.
		// Retry to insert any remaining jobs again, ignoring the bad partitions.
		$jobsLeft = $jobs;
		for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
			try {
				$partitionRing->getLiveLocationWeights();
			} catch ( UnexpectedValueException $e ) {
				break; // all servers down; nothing to insert to
			}
			$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
		}
		if ( count( $jobsLeft ) ) {
			throw new JobQueueError(
				"Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
		}
	}

	/**
	 * Make one attempt at inserting each of the given jobs into some live partition.
	 *
	 * @param array $jobs
	 * @param HashRing &$partitionRing
	 * @param int $flags
	 * @throws JobQueueError
	 * @return IJobSpecification[] List of Job object that could not be inserted
	 */
	protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
		$jobsLeft = [];

		// Because jobs are spread across partitions, per-job de-duplication needs
		// to use a consistent hash to avoid allowing duplicate jobs per partition.
		// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
		$uJobsByPartition = []; // (partition name => job list)
		/** @var Job $job */
		foreach ( $jobs as $key => $job ) {
			if ( $job->ignoreDuplicates() ) {
				$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
				$uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
				unset( $jobs[$key] );
			}
		}
		// Get the batches of jobs that are not de-duplicated
		if ( $flags & self::QOS_ATOMIC ) {
			// All or nothing. Skip the batch entirely when every job was de-duplicated,
			// so that a no-op push cannot fail and eject a partition from the ring.
			$nuJobBatches = $jobs ? [ $jobs ] : [];
		} else {
			// Split the jobs into batches and spread them out over servers if there
			// are many jobs. This helps keep the partitions even. Otherwise, send all
			// the jobs to a single partition queue to avoid the extra connections.
			$nuJobBatches = array_chunk( $jobs, 300 );
		}

		// Insert the de-duplicated jobs into the queues...
		foreach ( $uJobsByPartition as $partition => $jobBatch ) {
			if ( !$this->tryPartitionPush( $partitionRing, $partition, $jobBatch, $flags ) ) {
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		// Insert the jobs that are not de-duplicated into the queues...
		foreach ( $nuJobBatches as $jobBatch ) {
			$partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
			if ( $partition === false ) {
				// All live partitions are at 0 weight; there is no queue to index
				throw new JobQueueError( "Could not insert job(s), no partitions available." );
			}
			if ( !$this->tryPartitionPush( $partitionRing, $partition, $jobBatch, $flags ) ) {
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}

		return $jobsLeft;
	}

	/**
	 * Push one batch of jobs to one partition queue, always with QOS_ATOMIC.
	 * On failure the exception is logged and the partition is ejected from
	 * the live ring.
	 *
	 * @param HashRing $partitionRing Ring to eject the partition from on failure
	 * @param string|int $partition Partition name (key of $this->partitionQueues)
	 * @param array $jobBatch
	 * @param int $flags
	 * @return bool True if the batch was pushed, false if the push failed
	 * @throws JobQueueError If the push failed and ejecting the partition reported failure
	 */
	private function tryPartitionPush( HashRing $partitionRing, $partition, array $jobBatch, $flags ) {
		/** @var JobQueue $queue */
		$queue = $this->partitionQueues[$partition];
		try {
			$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );

			return true;
		} catch ( JobQueueError $e ) {
			$this->logException( $e );
		}
		if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
			throw new JobQueueError( "Could not insert job(s), no partitions available." );
		}

		return false;
	}

	/**
	 * Pop a job from a randomly picked live partition, moving on to another
	 * partition whenever the picked one fails or yields no job. The returned
	 * job has its 'QueuePartition' metadata set to the partition name.
	 *
	 * @return Job|bool The job, or false if no partition yielded one
	 * @throws JobQueueError If as many partitions failed as there are partition queues
	 */
	protected function doPop() {
		$partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)

		$failed = 0;
		while ( count( $partitionsTry ) ) {
			$partition = ArrayUtils::pickRandom( $partitionsTry );
			if ( $partition === false ) {
				break; // all partitions at 0 weight
			}

			/** @var JobQueue $queue */
			$queue = $this->partitionQueues[$partition];
			try {
				$job = $queue->pop();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
				$job = false;
			}
			if ( $job ) {
				$job->setMetadata( 'QueuePartition', $partition );

				return $job;
			} else {
				unset( $partitionsTry[$partition] );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return false;
	}

	/**
	 * Acknowledge a job on the partition named by its 'QueuePartition' metadata.
	 *
	 * @param RunnableJob $job
	 * @throws MWException If the job has no partition name or an unknown one
	 */
	protected function doAck( RunnableJob $job ) {
		$partition = $job->getMetadata( 'QueuePartition' );
		if ( $partition === null ) {
			throw new MWException( "The given job has no defined partition name." );
		}
		if ( !isset( $this->partitionQueues[$partition] ) ) {
			// Avoid a fatal "call to a member function on null" on a bad partition name
			throw new MWException( "The given job has an unknown partition name '$partition'." );
		}

		$this->partitionQueues[$partition]->ack( $job );
	}

	/**
	 * Ask the partition chosen by the root job signature whether the job is an
	 * old duplicate. If that partition fails, it is ejected from the live ring
	 * and the check is retried once on the newly chosen partition.
	 *
	 * @param IJobSpecification $job
	 * @return bool False if the partition failed and could not be ejected
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		$signature = $job->getRootJobParams()['rootJobSignature'];
		$partition = $this->partitionRing->getLiveLocation( $signature );
		try {
			return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
		} catch ( JobQueueError $e ) {
			$this->logException( $e );
			if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
				$partition = $this->partitionRing->getLiveLocation( $signature );
				return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
			}
		}

		return false;
	}

	/**
	 * Register the root job on the partition chosen by the root job signature.
	 * If that partition fails, it is ejected from the live ring and the call
	 * is retried once on the newly chosen partition.
	 *
	 * @param IJobSpecification $job
	 * @return bool False if the partition failed and could not be ejected
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$signature = $job->getRootJobParams()['rootJobSignature'];
		$partition = $this->partitionRing->getLiveLocation( $signature );
		try {
			return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
		} catch ( JobQueueError $e ) {
			$this->logException( $e );
			if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
				$partition = $this->partitionRing->getLiveLocation( $signature );
				return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
			}
		}

		return false;
	}

	/**
	 * Call doDelete() on every partition queue; failures are logged and skipped.
	 *
	 * @return bool Always true
	 * @throws JobQueueError If every partition failed
	 */
	protected function doDelete() {
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->doDelete();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );
		return true;
	}

	/**
	 * Call waitForBackups() on every partition queue; failures are logged and skipped.
	 *
	 * @throws JobQueueError If every partition failed
	 */
	protected function doWaitForBackups() {
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$queue->waitForBackups();
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );
	}

	/**
	 * Call doFlushCaches() on every partition queue.
	 */
	protected function doFlushCaches() {
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$queue->doFlushCaches();
		}
	}

	/**
	 * @return AppendIterator Chains getAllQueuedJobs() of every partition queue
	 */
	public function getAllQueuedJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllQueuedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return AppendIterator Chains getAllDelayedJobs() of every partition queue
	 */
	public function getAllDelayedJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllDelayedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return AppendIterator Chains getAllAcquiredJobs() of every partition queue
	 */
	public function getAllAcquiredJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllAcquiredJobs() );
		}

		return $iterator;
	}

	/**
	 * @return AppendIterator Chains getAllAbandonedJobs() of every partition queue
	 */
	public function getAllAbandonedJobs() {
		$iterator = new AppendIterator();

		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllAbandonedJobs() );
		}

		return $iterator;
	}

	/**
	 * @return string Identifier built from the queue domain and a hash of the partition names
	 */
	public function getCoalesceLocationInternal() {
		return "JobQueueFederated:wiki:{$this->domain}" .
			sha1( serialize( array_keys( $this->partitionQueues ) ) );
	}

	/**
	 * Merge the doGetSiblingQueuesWithJobs() results of the partition queues.
	 * Partitions that fail with a JobQueueError are logged and skipped.
	 *
	 * @param array $types
	 * @return array|null Merged list without duplicates; null if any partition
	 *  returned a non-array result
	 * @throws JobQueueError If every partition failed
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$result = [];

		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
				if ( is_array( $nonEmpty ) ) {
					$result = array_unique( array_merge( $result, $nonEmpty ) );
				} else {
					return null; // not supported on all partitions; bail
				}
				if ( count( $result ) == count( $types ) ) {
					break; // short-circuit
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return array_values( $result );
	}

	/**
	 * Add up the doGetSiblingQueueSizes() results of the partition queues per type.
	 * Partitions that fail with a JobQueueError are logged and skipped.
	 *
	 * @param array $types
	 * @return array|null Map of (type => summed size); null if any partition
	 *  returned a non-array result
	 * @throws JobQueueError If every partition failed
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$result = [];
		$failed = 0;
		/** @var JobQueue $queue */
		foreach ( $this->partitionQueues as $queue ) {
			try {
				$sizes = $queue->doGetSiblingQueueSizes( $types );
				if ( is_array( $sizes ) ) {
					foreach ( $sizes as $type => $size ) {
						$result[$type] = ( $result[$type] ?? 0 ) + $size;
					}
				} else {
					return null; // not supported on all partitions; bail
				}
			} catch ( JobQueueError $e ) {
				++$failed;
				$this->logException( $e );
			}
		}
		$this->throwErrorIfAllPartitionsDown( $failed );

		return $result;
	}

	/**
	 * Write the exception message and stack trace to the 'JobQueueFederated' debug log.
	 *
	 * @param Exception $e
	 */
	protected function logException( Exception $e ) {
		wfDebugLog( 'JobQueueFederated', $e->getMessage() . "\n" . $e->getTraceAsString() );
	}

	/**
	 * Throw an error if no partitions available
	 *
	 * @param int $down The number of partitions that are down
	 * @return void
	 * @throws JobQueueError
	 */
	protected function throwErrorIfAllPartitionsDown( $down ) {
		if ( $down >= count( $this->partitionQueues ) ) {
			throw new JobQueueError( 'No queue partitions available.' );
		}
	}
}