<?php
/**
 * PHP memory-backed job queue code.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

/**
 * Class to handle job queues stored in PHP memory for testing
 *
 * JobQueueGroup does not remember every queue instance, so statically track it here
 *
 * All state lives in the static $data array, keyed by job type and domain, so
 * every instance created for the same type/domain pair sees the same jobs.
 *
 * @ingroup JobQueue
 * @since 1.27
 */
class JobQueueMemory extends JobQueue {
	/**
	 * Queue contents, laid out as [type][domain][field] where field is
	 * 'unclaimed' or 'claimed' and each field holds a list of job specifications.
	 *
	 * @var array[]
	 */
	protected static $data = [];

	/**
	 * Overrides the 'wanCache' parameter with a WANObjectCache wrapping a fresh
	 * HashBagOStuff before handing the parameters to the parent constructor.
	 *
	 * @param array $params
	 */
	public function __construct( array $params ) {
		$params['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] );

		parent::__construct( $params );
	}

	/**
	 * @see JobQueue::doBatchPush
	 *
	 * Jobs that ask for de-duplication are stored under the SHA-1 of their
	 * serialized de-duplication info, so a second identical job is dropped while
	 * the first is still unclaimed. All other jobs are simply appended.
	 *
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		$unclaimed =& $this->getQueueData( 'unclaimed', [] );

		foreach ( $jobs as $job ) {
			if ( !$job->ignoreDuplicates() ) {
				$unclaimed[] = $job;
				continue;
			}

			$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
			if ( !isset( $unclaimed[$sha1] ) ) {
				$unclaimed[$sha1] = $job;
			}
		}
	}

	/**
	 * @see JobQueue::supportedOrders
	 *
	 * @return string[]
	 */
	protected function supportedOrders() {
		return [ 'random', 'timestamp', 'fifo' ];
	}

	/**
	 * @see JobQueue::optimalOrder
	 *
	 * @return string
	 */
	protected function optimalOrder() {
		return 'fifo';
	}

	/**
	 * @see JobQueue::doIsEmpty
	 *
	 * @return bool True when there are no unclaimed jobs
	 */
	protected function doIsEmpty() {
		return $this->doGetSize() === 0;
	}

	/**
	 * @see JobQueue::doGetSize
	 *
	 * @return int Number of unclaimed jobs
	 */
	protected function doGetSize() {
		$unclaimed = $this->getQueueData( 'unclaimed' );

		// The field may not exist yet (null) or may be an empty array
		return $unclaimed ? count( $unclaimed ) : 0;
	}

	/**
	 * @see JobQueue::doGetAcquiredCount
	 *
	 * @return int Number of claimed (popped but not yet acknowledged) jobs
	 */
	protected function doGetAcquiredCount() {
		$claimed = $this->getQueueData( 'claimed' );

		// The field may not exist yet (null) or may be an empty array
		return $claimed ? count( $claimed ) : 0;
	}

	/**
	 * @see JobQueue::doPop
	 *
	 * Moves one specification from the unclaimed list to the claimed list and
	 * returns a job built from it. The key the specification received in the
	 * claimed list is recorded on the job as 'claimId' metadata so that doAck()
	 * can find it again.
	 *
	 * @return RunnableJob|bool False when there is nothing to pop
	 */
	protected function doPop() {
		if ( $this->doGetSize() === 0 ) {
			return false;
		}

		$unclaimed =& $this->getQueueData( 'unclaimed' );
		$claimed =& $this->getQueueData( 'claimed', [] );

		if ( $this->order === 'random' ) {
			$key = array_rand( $unclaimed );
		} else {
			// Any other order takes the oldest remaining entry
			reset( $unclaimed );
			$key = key( $unclaimed );
		}

		$spec = $unclaimed[$key];
		unset( $unclaimed[$key] );
		$claimed[] = $spec;

		$job = $this->jobFromSpecInternal( $spec );

		// The entry just appended is the last one; its key is the claim ID
		end( $claimed );
		$job->setMetadata( 'claimId', key( $claimed ) );

		return $job;
	}

	/**
	 * @see JobQueue::doAck
	 *
	 * Removes the claimed entry identified by the job's 'claimId' metadata.
	 * Does nothing when no jobs are claimed or the job carries no claim ID.
	 *
	 * @param RunnableJob $job
	 */
	protected function doAck( RunnableJob $job ) {
		if ( $this->doGetAcquiredCount() === 0 ) {
			return;
		}

		$claimId = $job->getMetadata( 'claimId' );
		if ( $claimId === null ) {
			// NOTE(review): assumes getMetadata() yields null for a field that was
			// never set (e.g. a job not obtained via doPop()) - confirm against
			// RunnableJob. Claim IDs are always integer keys, so a null offset
			// could never match; bail out rather than use null as an array offset.
			return;
		}

		$claimed =& $this->getQueueData( 'claimed' );
		unset( $claimed[$claimId] );
	}

	/**
	 * Drops all stored data for this queue's type/domain pair, and the type
	 * bucket itself once it no longer holds any domain.
	 *
	 * @inheritDoc
	 */
	protected function doDelete() {
		if ( !isset( self::$data[$this->type][$this->domain] ) ) {
			return;
		}

		unset( self::$data[$this->type][$this->domain] );
		if ( !self::$data[$this->type] ) {
			unset( self::$data[$this->type] );
		}
	}

	/**
	 * @see JobQueue::getAllQueuedJobs
	 *
	 * @return Iterator of Job objects.
	 */
	public function getAllQueuedJobs() {
		$unclaimed = $this->getQueueData( 'unclaimed' );
		if ( !$unclaimed ) {
			return new ArrayIterator( [] );
		}

		// Specifications are converted to jobs via the callback given to MappedIterator
		return new MappedIterator(
			$unclaimed,
			function ( $value ) {
				return $this->jobFromSpecInternal( $value );
			}
		);
	}

	/**
	 * @see JobQueue::getAllAcquiredJobs
	 *
	 * @return Iterator of Job objects.
	 */
	public function getAllAcquiredJobs() {
		$claimed = $this->getQueueData( 'claimed' );
		if ( !$claimed ) {
			return new ArrayIterator( [] );
		}

		// Specifications are converted to jobs via the callback given to MappedIterator
		return new MappedIterator(
			$claimed,
			function ( $value ) {
				return $this->jobFromSpecInternal( $value );
			}
		);
	}

	/**
	 * Build a job from a stored specification using its type and parameters.
	 *
	 * @param IJobSpecification $spec
	 * @return RunnableJob
	 */
	public function jobFromSpecInternal( IJobSpecification $spec ) {
		return $this->factoryJob( $spec->getType(), $spec->getParams() );
	}

	/**
	 * Get a reference to one field of this queue's static storage.
	 *
	 * If the field is not set and $init is non-null, the field is created with
	 * $init and a reference to the stored value is returned. If the field is not
	 * set and $init is null, nothing is created and the returned reference points
	 * at the local null, so writes through it do not reach the static storage.
	 *
	 * @param string $field 'unclaimed' or 'claimed'
	 * @param mixed|null $init Initial value to store when the field is missing
	 *
	 * @return mixed
	 */
	private function &getQueueData( $field, $init = null ) {
		if ( !isset( self::$data[$this->type][$this->domain][$field] ) ) {
			if ( $init === null ) {
				return $init;
			}

			self::$data[$this->type][$this->domain][$field] = $init;
		}

		return self::$data[$this->type][$this->domain][$field];
	}
}