<?php
/**
 * Redis-backed job queue code.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

use MediaWiki\Logger\LoggerFactory;
use Psr\Log\LoggerInterface;

/**
 * Class to handle job queues stored in Redis
 *
 * This is a faster and less resource-intensive job queue than JobQueueDB.
 * All data for a queue using this class is placed into one redis server.
 * The mediawiki/services/jobrunner background service must be set up and running.
 *
 * There are eight main redis keys (per queue) used to track jobs:
 *   - l-unclaimed  : A list of job IDs used for ready unclaimed jobs
 *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
 *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
 *   - z-delayed    : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
 *   - h-idBySha1   : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
 *   - h-sha1ById   : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
 *   - h-attempts   : A hash of (job ID => attempt count) used for job claiming/retries
 *   - h-data       : A hash of (job ID => serialized blobs) for job storage
 * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
 * If an ID appears in any of those lists, it should have a h-data entry for its ID.
 * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
 * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
 * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
 * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
 *
 * The following keys are used to track queue states:
 *   - s-queuesWithJobs : A set of all queues with non-abandoned jobs
 *
 * The background service takes care of undelaying, recycling, and pruning jobs as well as
 * removing s-queuesWithJobs entries as queues empty.
 *
 * Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication.
 * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
 * All the keys are prefixed with the relevant wiki ID information.
 *
 * This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations.
 * Additionally, it should be noted that redis has different persistence modes, such
 * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be
 * made on the servers based on what queues are using it and what tolerance they have.
 *
 * @ingroup JobQueue
 * @ingroup Redis
 * @since 1.22
 */
class JobQueueRedis extends JobQueue {
	/** @var RedisConnectionPool */
	protected $redisPool;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Server address */
	protected $server;
	/** @var string Compression method to use */
	protected $compression;

	private const MAX_PUSH_SIZE = 25; // avoid tying up the server

	/**
	 * @param array $params Possible keys:
	 *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
	 *                   Note that the serializer option is ignored as "none" is always used.
	 *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
	 *                   If a hostname is specified but no port, the standard port number
	 *                   6379 will be used. Required.
	 *   - compression : The type of compression to use; one of (none,gzip).
	 *   - daemonized  : Set to true if the redisJobRunnerService runs in the background.
	 *                   This will disable job recycling/undelaying from the MediaWiki side
	 *                   to avoid redundancy and out-of-sync configuration.
	 * @throws InvalidArgumentException
	 */
	public function __construct( array $params ) {
		parent::__construct( $params );
		$params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
		$this->server = $params['redisServer'];
		$this->compression = $params['compression'] ?? 'none';
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
		if ( empty( $params['daemonized'] ) ) {
			throw new InvalidArgumentException(
				"Non-daemonized mode is no longer supported. Please install the " .
				"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
		}
		$this->logger = LoggerFactory::getInstance( 'redis' );
	}

	/**
	 * @return string[] The order names this queue class accepts ('timestamp' and 'fifo')
	 */
	protected function supportedOrders() {
		return [ 'timestamp', 'fifo' ];
	}

	/**
	 * @return string The preferred order name for this queue class ('fifo')
	 */
	protected function optimalOrder() {
		return 'fifo';
	}

	/**
	 * @return bool Always true; delayed jobs are tracked via the z-delayed key
	 */
	protected function supportsDelayedJobs() {
		return true;
	}

	/**
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsEmpty() {
		return $this->doGetSize() == 0;
	}

	/**
	 * @see JobQueue::doGetSize()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetSize() {
		$conn = $this->getConnection();
		try {
			return $conn->lLen( $this->getQueueKey( 'l-unclaimed' ) );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAcquiredCount() {
		$conn = $this->getConnection();
		try {
			// Fetch both cardinalities in one round trip; the result is their sum
			$conn->multi( Redis::PIPELINE );
			$conn->zCard( $this->getQueueKey( 'z-claimed' ) );
			$conn->zCard( $this->getQueueKey( 'z-abandoned' ) );

			return array_sum( $conn->exec() );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetDelayedCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetDelayedCount() {
		$conn = $this->getConnection();
		try {
			return $conn->zCard( $this->getQueueKey( 'z-delayed' ) );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAbandonedCount() {
		$conn = $this->getConnection();
		try {
			return $conn->zCard( $this->getQueueKey( 'z-abandoned' ) );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @return void
	 * @throws JobQueueError
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Convert the jobs into field maps (de-duplicated against each other)
		$items = []; // (SHA1 or job ID => job fields map)
		foreach ( $jobs as $job ) {
			$item = $this->getNewJobFields( $job );
			if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
				$items[$item['sha1']] = $item;
			} else {
				$items[$item['uuid']] = $item;
			}
		}

		if ( $items === [] ) {
			return; // nothing to do
		}

		$conn = $this->getConnection();
		try {
			// Actually push the non-duplicate jobs into the queue...
			if ( $flags & self::QOS_ATOMIC ) {
				$batches = [ $items ]; // all or nothing
			} else {
				$batches = array_chunk( $items, self::MAX_PUSH_SIZE );
			}
			$failed = 0;
			$pushed = 0;
			foreach ( $batches as $itemBatch ) {
				$added = $this->pushBlobs( $conn, $itemBatch );
				if ( is_int( $added ) ) {
					$pushed += $added;
				} else {
					// A non-integer result means the whole batch was not inserted
					$failed += count( $itemBatch );
				}
			}
			$this->incrStats( 'inserts', $this->type, count( $items ) );
			$this->incrStats( 'inserts_actual', $this->type, $pushed );
			$this->incrStats( 'dupe_inserts', $this->type,
				count( $items ) - $failed - $pushed );
			if ( $failed > 0 ) {
				$err = "Could not insert {$failed} {$this->type} job(s).";
				wfDebugLog( 'JobQueueRedis', $err );
				// Caught just below and converted into a JobQueueError
				throw new RedisException( $err );
			}
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}
	}

	/**
	 * @param RedisConnRef $conn
	 * @param array[] $items List of results from JobQueueRedis::getNewJobFields()
	 * @return int Number of jobs inserted (duplicates are ignored)
	 * @throws RedisException
	 */
	protected function pushBlobs( RedisConnRef $conn, array $items ) {
		$args = [ $this->encodeQueueName() ];
		// Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
		foreach ( $items as $item ) {
			$args[] = (string)$item['uuid'];
			$args[] = (string)$item['sha1'];
			$args[] = (string)$item['rtimestamp'];
			$args[] = (string)$this->serialize( $item );
		}
		static $script =
		/** @lang Lua */
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
		-- First argument is the queue ID
		local queueId = ARGV[1]
		-- Next arguments all come in 4s (one per job)
		local variadicArgCount = #ARGV - 1
		if variadicArgCount % 4 ~= 0 then
			return redis.error_reply('Unmatched arguments')
		end
		-- Insert each job into this queue as needed
		local pushed = 0
		for i = 2,#ARGV,4 do
			local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
			if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
				if 1*rtimestamp > 0 then
					-- Insert into delayed queue (release time as score)
					redis.call('zAdd',kDelayed,rtimestamp,id)
				else
					-- Insert into unclaimed queue
					redis.call('lPush',kUnclaimed,id)
				end
				if sha1 ~= '' then
					redis.call('hSet',kSha1ById,id,sha1)
					redis.call('hSet',kIdBySha1,sha1,id)
				end
				redis.call('hSet',kData,id,blob)
				pushed = pushed + 1
			end
		end
		-- Mark this queue as having jobs
		redis.call('sAdd',kQwJobs,queueId)
		return pushed
LUA;
		return $conn->luaEval( $script,
			array_merge(
				[
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
					$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
					$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
					$this->getQueueKey( 'z-delayed' ), # KEYS[4]
					$this->getQueueKey( 'h-data' ), # KEYS[5]
					$this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
				],
				$args
			),
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * @see JobQueue::doPop()
	 * @return RunnableJob|bool
	 * @throws JobQueueError
	 */
	protected function doPop() {
		$job = false;

		$conn = $this->getConnection();
		try {
			do {
				$blob = $this->popAndAcquireBlob( $conn );
				if ( !is_string( $blob ) ) {
					break; // no jobs; nothing to do
				}

				$this->incrStats( 'pops', $this->type );
				$item = $this->unserialize( $blob );
				if ( $item === false ) {
					wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
					continue;
				}

				// If $item is invalid, the runner loop recycling will cleanup as needed
				$job = $this->getJobFromFields( $item ); // may be false
			} while ( !$job ); // job may be false if invalid
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		return $job;
	}

	/**
	 * Pop one job ID off l-unclaimed, mark it as claimed, and fetch its stored blob
	 *
	 * @param RedisConnRef $conn
	 * @return string|false Serialized job blob; non-string (false) if the queue had no job
	 * @throws RedisException
	 */
	protected function popAndAcquireBlob( RedisConnRef $conn ) {
		static $script =
		/** @lang Lua */
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
		local rTime = unpack(ARGV)
		-- Pop an item off the queue
		local id = redis.call('rPop',kUnclaimed)
		if not id then
			return false
		end
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',kSha1ById,id)
		if sha1 then redis.call('hDel',kIdBySha1,sha1) end
		redis.call('hDel',kSha1ById,id)
		-- Mark the jobs as claimed and return it
		redis.call('zAdd',kClaimed,rTime,id)
		redis.call('hIncrBy',kAttempts,id,1)
		return redis.call('hGet',kData,id)
LUA;
		return $conn->luaEval( $script,
			[
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'z-claimed' ), # KEYS[4]
				$this->getQueueKey( 'h-attempts' ), # KEYS[5]
				$this->getQueueKey( 'h-data' ), # KEYS[6]
				time(), # ARGV[1] (injected to be replication-safe)
			],
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * @see JobQueue::doAck()
	 * @param RunnableJob $job
	 * @return bool False if the job was no longer in z-claimed or its data was not deleted
	 * @throws UnexpectedValueException
	 * @throws JobQueueError
	 */
	protected function doAck( RunnableJob $job ) {
		$uuid = $job->getMetadata( 'uuid' );
		if ( $uuid === null ) {
			throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
		}

		$conn = $this->getConnection();
		try {
			static $script =
			/** @lang Lua */
<<<LUA
			local kClaimed, kAttempts, kData = unpack(KEYS)
			local id = unpack(ARGV)
			-- Unmark the job as claimed
			local removed = redis.call('zRem',kClaimed,id)
			-- Check if the job was recycled
			if removed == 0 then
				return 0
			end
			-- Delete the retry data
			redis.call('hDel',kAttempts,id)
			-- Delete the job data itself
			return redis.call('hDel',kData,id)
LUA;
			$res = $conn->luaEval( $script,
				[
					$this->getQueueKey( 'z-claimed' ), # KEYS[1]
					$this->getQueueKey( 'h-attempts' ), # KEYS[2]
					$this->getQueueKey( 'h-data' ), # KEYS[3]
					$uuid # ARGV[1]
				],
				3 # number of first argument(s) that are keys
			);

			if ( !$res ) {
				wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );

				return false;
			}

			$this->incrStats( 'acks', $this->type );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		return true;
	}

	/**
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return bool
	 * @throws JobQueueError
	 * @throws LogicException
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new LogicException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );

		$conn = $this->getConnection();
		try {
			$timestamp = $conn->get( $key ); // last known timestamp of such a root job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doIsRootJobOldDuplicate()
	 * @param IJobSpecification $job
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$conn = $this->getConnection();
		try {
			// Get the last time this root job was enqueued
			$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * @see JobQueue::doDelete()
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doDelete() {
		static $props = [ 'l-unclaimed', 'z-claimed', 'z-abandoned',
			'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ];

		$conn = $this->getConnection();
		try {
			$keys = [];
			foreach ( $props as $prop ) {
				$keys[] = $this->getQueueKey( $prop );
			}

			$ok = ( $conn->del( $keys ) !== false );
			// Also drop this queue from the global set of queues that have jobs
			$conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );

			return $ok;
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 * @throws JobQueueError
	 */
	public function getAllQueuedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllDelayedJobs()
	 * @return Iterator
	 * @throws JobQueueError
	 */
	public function getAllDelayedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator
	 * @throws JobQueueError
	 */
	public function getAllAcquiredJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllAbandonedJobs()
	 * @return Iterator
	 * @throws JobQueueError
	 */
	public function getAllAbandonedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * Build an iterator that maps job UUIDs to job objects, skipping non-object results
	 *
	 * @param RedisConnRef $conn
	 * @param array $uids List of job UUIDs
	 * @return MappedIterator
	 */
	protected function getJobIterator( RedisConnRef $conn, array $uids ) {
		return new MappedIterator(
			$uids,
			function ( $uid ) use ( $conn ) {
				return $this->getJobFromUidInternal( $uid, $conn );
			},
			[ 'accept' => static function ( $job ) {
				return is_object( $job );
			} ]
		);
	}

	/**
	 * @return string "RedisServer:" followed by the configured server address
	 */
	public function getCoalesceLocationInternal() {
		return "RedisServer:" . $this->server;
	}

	/**
	 * @param string[] $types Job types to check
	 * @return string[] The given types whose l-unclaimed list is non-empty
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
	}

	/**
	 * @param string[] $types Job types to check
	 * @return array Map of (job type => length of that type's l-unclaimed list)
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$sizes = []; // (type => size)
		$types = array_values( $types ); // reindex
		$conn = $this->getConnection();
		try {
			$conn->multi( Redis::PIPELINE );
			foreach ( $types as $type ) {
				$conn->lLen( $this->getQueueKey( 'l-unclaimed', $type ) );
			}
			$res = $conn->exec();
			if ( is_array( $res ) ) {
				// Results are matched to the re-indexed $types by position
				foreach ( $res as $i => $size ) {
					$sizes[$types[$i]] = $size;
				}
			}
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		return $sizes;
	}

	/**
	 * This function should not be called outside JobQueueRedis
	 *
	 * @param string $uid
	 * @param RedisConnRef|Redis $conn
	 * @return RunnableJob|bool Returns false if the job does not exist
	 * @throws JobQueueError
	 * @throws UnexpectedValueException
	 */
	public function getJobFromUidInternal( $uid, $conn ) {
		try {
			$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
			if ( $data === false ) {
				return false; // not found
			}
			$item = $this->unserialize( $data );
			if ( !is_array( $item ) ) { // this shouldn't happen
				throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." );
			}

			$params = $item['params'];
			$params += [ 'namespace' => $item['namespace'], 'title' => $item['title'] ];
			$job = $this->factoryJob( $item['type'], $params );
			$job->setMetadata( 'uuid', $item['uuid'] );
			$job->setMetadata( 'timestamp', $item['timestamp'] );
			// Add in attempt count for debugging at showJobs.php
			$job->setMetadata( 'attempts',
				$conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ) );

			return $job;
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}
	}

	/**
	 * @return array List of (wiki,type) tuples for queues with non-abandoned jobs
	 * @throws JobQueueConnectionError
	 * @throws JobQueueError
	 */
	public function getServerQueuesWithJobs() {
		$queues = [];

		$conn = $this->getConnection();
		try {
			$set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
			foreach ( $set as $queue ) {
				$queues[] = $this->decodeQueueName( $queue );
			}
		} catch ( RedisException $e ) {
			throw $this->handleErrorAndMakeException( $conn, $e );
		}

		return $queues;
	}

	/**
	 * @param IJobSpecification $job
	 * @return array
	 */
	protected function getNewJobFields( IJobSpecification $job ) {
		return [
			// Fields that describe the nature of the job
			'type' => $job->getType(),
			'namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
			'title' => $job->getParams()['title'] ?? '',
			'params' => $job->getParams(),
			// Some jobs cannot run until a "release timestamp"
			'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
			// Additional job metadata
			'uuid' => $this->idGenerator->newRawUUIDv4(),
			'sha1' => $job->ignoreDuplicates()
				? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
				: '',
			'timestamp' => time() // UNIX timestamp
		];
	}

	/**
	 * @param array $fields
	 * @return RunnableJob|bool
	 */
	protected function getJobFromFields( array $fields ) {
		$params = $fields['params'];
		$params += [ 'namespace' => $fields['namespace'], 'title' => $fields['title'] ];

		$job = $this->factoryJob( $fields['type'], $params );
		$job->setMetadata( 'uuid', $fields['uuid'] );
		$job->setMetadata( 'timestamp', $fields['timestamp'] );

		return $job;
	}

	/**
	 * @param array $fields
	 * @return string Serialized and possibly compressed version of $fields
	 */
	protected function serialize( array $fields ) {
		$blob = serialize( $fields );
		if ( $this->compression === 'gzip'
			&& strlen( $blob ) >= 1024
			&& function_exists( 'gzdeflate' )
		) {
			$object = (object)[ 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ];
			$blobz = serialize( $object );

			// Only keep the compressed form when it is actually smaller
			return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
		} else {
			return $blob;
		}
	}

	/**
	 * @param string $blob
	 * @return array|bool Unserialized version of $blob or false
	 */
	protected function unserialize( $blob ) {
		$fields = unserialize( $blob );
		if ( is_object( $fields ) ) {
			// An object wrapper is the compressed form written by serialize().
			// Anything missing the expected properties, using another encoding,
			// or failing to inflate is treated as undecodable.
			$inflated = false;
			if ( isset( $fields->enc, $fields->blob )
				&& $fields->enc === 'gzip'
				&& function_exists( 'gzinflate' )
			) {
				$inflated = gzinflate( $fields->blob );
			}
			$fields = is_string( $inflated ) ? unserialize( $inflated ) : false;
		}

		return is_array( $fields ) ? $fields : false;
	}

	/**
	 * Get a connection to the server that handles all sub-queues for this queue
	 *
	 * @return RedisConnRef|Redis
	 * @throws JobQueueConnectionError
	 */
	protected function getConnection() {
		$conn = $this->redisPool->getConnection( $this->server, $this->logger );
		if ( !$conn ) {
			throw new JobQueueConnectionError(
				"Unable to connect to redis server {$this->server}." );
		}

		return $conn;
	}

	/**
	 * Report the error to the connection pool and build the exception for the caller to throw
	 *
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 * @return JobQueueError
	 */
	protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleError( $conn, $e );
		return new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
	}

	/**
	 * @return string JSON
	 */
	private function encodeQueueName() {
		return json_encode( [ $this->type, $this->domain ] );
	}

	/**
	 * @param string $name JSON
	 * @return array (type, wiki)
	 */
	private function decodeQueueName( $name ) {
		return json_decode( $name );
	}

	/**
	 * @param string $name
	 * @return string
	 * @throws InvalidArgumentException If any key part has characters outside [a-zA-Z0-9_-]
	 */
	private function getGlobalKey( $name ) {
		$parts = [ 'global', 'jobqueue', $name ];
		foreach ( $parts as $part ) {
			// Anchored so that the *entire* part must consist of allowed characters;
			// an unanchored pattern would accept any part containing one valid character.
			if ( !preg_match( '/^[a-zA-Z0-9_-]+$/D', $part ) ) {
				throw new InvalidArgumentException( "Key part characters are out of range." );
			}
		}

		return implode( ':', $parts );
	}

	/**
	 * @param string $prop
	 * @param string|null $type Override this for sibling queues
	 * @return string
	 */
	private function getQueueKey( $prop, $type = null ) {
		$type = is_string( $type ) ? $type : $this->type;

		// Use wiki ID for b/c
		$keyspace = WikiMap::getWikiIdFromDbDomain( $this->domain );

		$parts = [ $keyspace, 'jobqueue', $type, $prop ];

		// Parts are typically ASCII, but encode for sanity to escape ":"
		return implode( ':', array_map( 'rawurlencode', $parts ) );
	}
}