<?php
/**
 * Database-backed job queue code.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */
use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\ScopedCallback;

/**
 * Class to handle job queues stored in the DB
 *
 * @ingroup JobQueue
 * @since 1.21
 */
class JobQueueDB extends JobQueue {
	private const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	private const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
	private const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
	private const MAX_OFFSET = 255; // integer; maximum number of rows to skip

	/** @var IMaintainableDatabase|DBError|null */
	protected $conn;

	/** @var array|null Server configuration array */
	protected $server;
	/** @var string|null Name of an external DB cluster or null for the local DB cluster */
	protected $cluster;

	/**
	 * Additional parameters include:
	 *   - server  : Server configuration array for Database::factory. Overrides "cluster".
	 *   - cluster : The name of an external cluster registered via LBFactory.
	 *               If not specified, the primary DB cluster for the wiki will be used.
	 *               This can be overridden with a custom cluster so that DB handles will
	 *               be retrieved via LBFactory::getExternalLB() and getConnection().
	 * @param array $params
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );

		if ( isset( $params['server'] ) ) {
			$this->server = $params['server'];
		} elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		}
	}

	protected function supportedOrders() {
		return [ 'random', 'timestamp', 'fifo' ];
	}

	protected function optimalOrder() {
		return 'random';
	}

	/**
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 */
	protected function doIsEmpty() {
		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$found = $dbr->selectField( // unclaimed job
				'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return !$found;
	}

	/**
	 * @see JobQueue::doGetSize()
	 * @return int
	 */
	protected function doGetSize() {
		$key = $this->getCacheKey( 'size' );

		$size = $this->wanCache->get( $key );
		if ( is_int( $size ) ) {
			return $size;
		}

		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[ 'job_cmd' => $this->type, 'job_token' => '' ],
				__METHOD__
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );

		return $size;
	}

	/**
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'acquiredcount' );

		$count = $this->wanCache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
				__METHOD__
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	/**
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws MWException
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'abandonedcount' );

		$count = $this->wanCache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[
					'job_cmd' => $this->type,
					"job_token != {$dbr->addQuotes( '' )}",
					"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
				],
				__METHOD__
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	/**
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @throws DBError|Exception
	 * @return void
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// In general, there will be two cases here:
		// a) sqlite; DB connection is probably a regular round-aware handle.
		// If the connection is busy with a transaction, then defer the job writes
		// until right before the main round commit step. Any errors that bubble
		// up will rollback the main commit round.
		// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
		// No transaction is active nor will be started by writes, so enqueue the jobs
		// now so that any errors will show up immediately as the interface expects. Any
		// errors that bubble up will rollback the main commit round.
		$fname = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
				$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
			},
			$fname
		);
	}

	/**
	 * This function should *not* be called outside of JobQueueDB
	 *
	 * Jobs that ignore duplicates are de-duplicated both within the batch (by job_sha1)
	 * and against unclaimed rows already in the queue; the rest are inserted as-is.
	 *
	 * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
	 * @param IDatabase $dbw
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Bitfield of JobQueue::QOS_* constants
	 * @param string $method
	 * @throws JobQueueError On database failure (wrapping the DBError)
	 * @return void
	 */
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( $jobs === [] ) {
			return;
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job, $dbw );
			if ( $job->ignoreDuplicates() ) {
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}
		// Number of rows that would be inserted if nothing were already enqueued;
		// must be taken before already-enqueued duplicates are removed from $rowSet
		$candidateCount = count( $rowSet ) + count( $rowList );

		$atomic = (bool)( $flags & self::QOS_ATOMIC );
		if ( $atomic ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->select( 'job', 'job_sha1',
					[
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
						'job_token' => '' // unclaimed
					],
					$method
				);
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->insert( 'job', $rowBatch, $method );
			}
			$this->incrStats( 'inserts', $this->type, count( $rows ) );
			$this->incrStats( 'dupe_inserts', $this->type, $candidateCount - count( $rows ) );
		} catch ( DBError $e ) {
			if ( $atomic ) {
				try {
					// Do not leave the atomic section (and possibly its transaction) open
					// on what may be a shared connection handle
					$dbw->cancelAtomic( $method );
				} catch ( DBError $cancelError ) {
					// Keep reporting the original failure rather than this secondary one
				}
			}
			throw $this->getDBException( $e );
		}
		if ( $atomic ) {
			$dbw->endAtomic( $method );
		}
	}

	/**
	 * @see JobQueue::doPop()
	 * @return RunnableJob|bool
	 */
	protected function doPop() {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$job = false; // job popped off
		try {
			$uuid = wfRandomString( 32 ); // pop attempt
			// Try to reserve a row in the DB...
			if ( in_array( $this->order, [ 'fifo', 'timestamp' ], true ) ) {
				$row = $this->claimOldest( $uuid );
			} else { // random first
				$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
				$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
				$row = $this->claimRandom( $uuid, $rand, $gte );
			}
			// Check if we found a row to reserve...
			if ( $row ) {
				$this->incrStats( 'pops', $this->type );
				// Get the job object from the row...
				$job = $this->jobFromRow( $row );
			}

			if ( !$job || mt_rand( 0, 9 ) == 0 ) {
				// Handled jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $job;
	}

	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * @param string $uuid 32 char hex string
	 * @param int $rand Random unsigned integer (31 bits)
	 * @param bool $gte Search for job_random >= $rand (otherwise job_random <= $rand)
	 * @return stdClass|bool Row|false
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$ineq = $gte ? '>=' : '<=';
				$dir = $gte ? 'ASC' : 'DESC';
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
					__METHOD__,
					[ 'ORDER BY' => "job_random {$dir}" ]
				);
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					],
					__METHOD__,
					[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
				);
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->wanCache->set( $this->getCacheKey( 'small' ), 1, self::CACHE_TTL_SHORT );
					continue; // use job_random
				}
			}

			if ( !$row ) {
				break;
			}

			$dbw->update( 'job', // update by PK
				[
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts = job_attempts+1' ],
				[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
				__METHOD__
			);
			// This might get raced out by another runner when claiming the previously
			// selected row. The use of job_random should minimize this problem, however.
			if ( !$dbw->affectedRows() ) {
				$row = false; // raced out
			}
		} while ( !$row );

		return $row;
	}

	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * @param string $uuid 32 char hex string
	 * @return stdClass|bool Row|false
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Postgres has no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
					"job_token = {$dbw->addQuotes( $uuid ) }, " .
					"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
					"job_attempts = job_attempts+1 " .
					"WHERE ( " .
					"job_cmd = {$dbw->addQuotes( $this->type )} " .
					"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$dbw->update( 'job',
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_id = (' .
						$dbw->selectSQLText( 'job', 'job_id',
							[ 'job_cmd' => $this->type, 'job_token' => '' ],
							__METHOD__,
							[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
						')'
					],
					__METHOD__
				);
			}

			if ( !$dbw->affectedRows() ) {
				break;
			}

			// Fetch any row that we just reserved...
			$row = $dbw->selectRow( 'job', self::selectFields(),
				[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
			);
			if ( !$row ) { // raced out by duplicate job removal
				wfDebug( "Row deleted as duplicate by another process." );
			}
		} while ( !$row );

		return $row;
	}

	/**
	 * @see JobQueue::doAck()
	 * @param RunnableJob $job
	 * @throws MWException
	 */
	protected function doAck( RunnableJob $job ) {
		$id = $job->getMetadata( 'id' );
		if ( $id === null ) {
			throw new MWException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		try {
			// Delete a row with a single DELETE without holding row locks over RTTs...
			$dbw->delete(
				'job',
				[ 'job_cmd' => $this->type, 'job_id' => $id ],
				__METHOD__
			);

			$this->incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	/**
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @throws MWException
	 * @return bool
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Since the insert
		// is deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		$dbw->onTransactionCommitOrIdle(
			function () use ( $job ) {
				parent::doDeduplicateRootJob( $job );
			},
			__METHOD__
		);

		return true;
	}

	/**
	 * @see JobQueue::doDelete()
	 * @return bool
	 */
	protected function doDelete() {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		try {
			$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return true;
	}

	/**
	 * @see JobQueue::doWaitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
		if ( $this->server ) {
			return; // not using LBFactory instance
		}

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->waitForReplication( [
			'domain' => $this->domain,
			'cluster' => is_string( $this->cluster ) ? $this->cluster : false
		] );
	}

	/**
	 * Purge every cached counter that this class stores in WAN cache
	 * (see doGetSize(), doGetAcquiredCount() and doGetAbandonedCount()).
	 *
	 * @return void
	 */
	protected function doFlushCaches() {
		foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
			$this->wanCache->delete( $this->getCacheKey( $type ) );
		}
	}

	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 */
	public function getAllQueuedJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
	}

	/**
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator
	 */
	public function getAllAcquiredJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
	}

	/**
	 * @see JobQueue::getAllAbandonedJobs()
	 * @return Iterator
	 */
	public function getAllAbandonedJobs() {
		return $this->getJobIterator( [
			'job_cmd' => $this->getType(),
			"job_token > ''",
			"job_attempts >= " . intval( $this->maxTries )
		] );
	}

	/**
	 * @param array $conds Query conditions
	 * @return Iterator
	 */
	protected function getJobIterator( array $conds ) {
		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			return new MappedIterator(
				$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
				function ( $row ) {
					return $this->jobFromRow( $row );
				}
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	public function getCoalesceLocationInternal() {
		if ( $this->server ) {
			return null; // not using the LBFactory instance
		}

		return is_string( $this->cluster )
			? "DBCluster:{$this->cluster}:{$this->domain}"
			: "LBFactory:{$this->domain}";
	}

	/**
	 * @param string[] $types Job types to check
	 * @return string[] Subset of $types that have at least one row in the job table
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		// @note: this does not check whether the jobs are claimed or not.
		// This is useful so JobQueueGroup::pop() also sees queues that only
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
		// failed jobs so that they can be popped again for that edge case.
		$res = $dbr->select( 'job', 'DISTINCT job_cmd',
			[ 'job_cmd' => $types ], __METHOD__ );

		$types = [];
		foreach ( $res as $row ) {
			$types[] = $row->job_cmd;
		}

		return $types;
	}

	/**
	 * @param string[] $types Job types to check
	 * @return int[] Map of (job type => row count), claimed rows included;
	 *  types with no rows are absent from the map
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );

		$res = $dbr->select( 'job', [ 'job_cmd', 'count' => 'COUNT(*)' ],
			[ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );

		$sizes = [];
		foreach ( $res as $row ) {
			$sizes[$row->job_cmd] = (int)$row->count;
		}

		return $sizes;
	}

	/**
	 * Recycle or destroy any jobs that have been claimed for too long
	 *
	 * Runs under a named DB lock so that only one process does this per job type at a
	 * time; the lock is always released, even when one of the queries fails.
	 *
	 * @return int Number of jobs recycled/deleted
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );

		try {
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			try {
				// Remove claims on jobs acquired for too long if enabled...
				if ( $this->claimTTL > 0 ) {
					$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
					// Get the IDs of jobs that have been claimed but not finished after too long.
					// These jobs can be recycled into the queue by expiring the claim. Selecting
					// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
					$res = $dbw->select( 'job', 'job_id',
						[
							'job_cmd' => $this->type,
							"job_token != {$dbw->addQuotes( '' )}", // was acquired
							"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
							"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
						__METHOD__
					);
					$ids = array_map(
						static function ( $o ) {
							return $o->job_id;
						}, iterator_to_array( $res )
					);
					if ( count( $ids ) ) {
						// Reset job_token for these jobs so that other runners will pick them up.
						// Set the timestamp to the current time, as it is useful to know that the job
						// was already tried before (the timestamp becomes the "released" time).
						$dbw->update( 'job',
							[
								'job_token' => '',
								'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
							],
							[ 'job_id' => $ids, "job_token != ''" ],
							__METHOD__
						);
						$affected = $dbw->affectedRows();
						$count += $affected;
						$this->incrStats( 'recycles', $this->type, $affected );
					}
				}

				// Just destroy any stale jobs...
				$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
				$conds = [
					'job_cmd' => $this->type,
					"job_token != {$dbw->addQuotes( '' )}", // was acquired
					"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
				];
				if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
					$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
				}
				// Get the IDs of jobs that are considered stale and should be removed. Selecting
				// the IDs first means that the DELETE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
				$ids = array_map(
					static function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'abandons', $this->type, $affected );
				}
			} finally {
				// Release the lock even on failure; otherwise it stays held for the
				// lifetime of this (possibly long-lived) connection
				$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $count;
	}

	/**
	 * @param IJobSpecification $job
	 * @param IDatabase $db
	 * @return array
	 */
	protected function insertFields( IJobSpecification $job, IDatabase $db ) {
		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
			'job_title' => $job->getParams()['title'] ?? '',
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];
	}

	/**
	 * @throws JobQueueConnectionError
	 * @return IDatabase
	 */
	protected function getReplicaDB() {
		try {
			return $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	/**
	 * @throws JobQueueConnectionError
	 * @return IMaintainableDatabase
	 */
	protected function getMasterDB() {
		try {
			return $this->getDB( DB_MASTER );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	/**
	 * @param int $index (DB_REPLICA/DB_MASTER)
	 * @return IMaintainableDatabase
	 */
	protected function getDB( $index ) {
		if ( $this->server ) {
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn;
			} elseif ( $this->conn instanceof DBError ) {
				throw $this->conn;
			}

			try {
				$this->conn = Database::factory( $this->server['type'], $this->server );
			} catch ( DBError $e ) {
				$this->conn = $e;
				throw $e;
			}

			return $this->conn;
		} else {
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
				// Keep a separate connection to avoid contention and deadlocks;
				// However, SQLite has the opposite behavior due to DB-level locking.
				$flags = $lb::CONN_TRX_AUTOCOMMIT;
			} else {
				// Job insertion will be deferred until the PRESEND stage to reduce contention.
				$flags = 0;
			}

			return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
		}
	}

	/**
	 * @param IDatabase $db
	 * @return ScopedCallback
	 */
	private function getScopedNoTrxFlag( IDatabase $db ) {
		$autoTrx = $db->getFlag( DBO_TRX ); // get current setting
		$db->clearFlag( DBO_TRX ); // make each query its own transaction

		return new ScopedCallback( static function () use ( $db, $autoTrx ) {
			if ( $autoTrx ) {
				$db->setFlag( DBO_TRX ); // restore old setting
			}
		} );
	}

	/**
	 * @param string $property
	 * @return string
	 */
	private function getCacheKey( $property ) {
		$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$cluster,
			$this->type,
			$property
		);
	}

	/**
	 * @param array|bool $params
	 * @return string
	 */
	protected static function makeBlob( $params ) {
		if ( $params !== false ) {
			return serialize( $params );
		} else {
			return '';
		}
	}

	/**
	 * @param stdClass $row
	 * @return RunnableJob|null
	 * @throws UnexpectedValueException If job_params does not unserialize to an array
	 */
	protected function jobFromRow( $row ) {
		// job_params is written by makeBlob() (PHP serialize(), or '' for no params)
		$params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
		if ( !is_array( $params ) ) { // this shouldn't happen
			throw new UnexpectedValueException(
				"Could not unserialize job with ID '{$row->job_id}'." );
		}

		$params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
		$job = $this->factoryJob( $row->job_cmd, $params );
		$job->setMetadata( 'id', $row->job_id );
		$job->setMetadata( 'timestamp', $row->job_timestamp );

		return $job;
	}

	/**
	 * @param DBError $e
	 * @return JobQueueError
	 */
	protected function getDBException( DBError $e ) {
		return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
	}

	/**
	 * Return the list of job fields that should be selected.
	 * @since 1.23
	 * @return array
	 */
	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
}