<?php
/**
 * Adds blobs from a given external storage cluster to the blob_tracking table.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup Maintenance
 */

use MediaWiki\MediaWikiServices;
use MediaWiki\Revision\SlotRecord;
use Wikimedia\Rdbms\DBConnectionError;

require __DIR__ . '/../CommandLineInc.php';

if ( count( $args ) < 1 ) {
	echo "Usage: php trackBlobs.php <cluster> [... <cluster>]\n";
	echo "Adds blobs from a given ES cluster to the blob_tracking table\n";
	echo "Automatically deletes the tracking table and starts from the start again when restarted.\n";

	exit( 1 );
}
$tracker = new TrackBlobs( $args );
$tracker->run();
echo "All done.\n";

/**
 * Records, for a set of external storage clusters, which blobs are referenced
 * by revisions or by otherwise unreferenced text rows, and (when the gmp
 * extension is available) which blob rows are referenced by neither.
 */
class TrackBlobs {
	/** @var string[] Names of the external storage clusters to scan */
	public $clusters;
	/** @var string|null Lazily built SQL condition matching old_text pointers into $clusters */
	public $textClause;
	/** @var bool|null True when the gmp extension is loaded and orphan blobs can be searched for */
	public $doBlobOrphans;
	/** @var array Map of cluster name => GMP bitfield with one bit set per tracked blob ID */
	public $trackedBlobs = [];

	/** @var int Number of rows fetched per query */
	public $batchSize = 1000;
	/** @var int Number of batches between progress reports */
	public $reportingInterval = 10;

	/**
	 * @param string[] $clusters Names of the external storage clusters to scan
	 */
	public function __construct( $clusters ) {
		$this->clusters = $clusters;
		if ( !extension_loaded( 'gmp' ) ) {
			echo "Warning: the gmp extension is needed to find orphan blobs\n";

			return;
		}

		$this->doBlobOrphans = true;
		foreach ( $clusters as $cluster ) {
			$this->trackedBlobs[$cluster] = gmp_init( 0 );
		}
	}

	/**
	 * Run every pass in order: integrity check, table reset, revision scan,
	 * orphan text scan and, when gmp is available, the orphan blob scan.
	 */
	public function run() {
		$this->checkIntegrity();
		$this->initTrackingTable();
		$this->trackRevisions();
		$this->trackOrphanText();
		if ( $this->doBlobOrphans ) {
			$this->findOrphanBlobs();
		}
	}

	/**
	 * Abort the script (exit status 1) if the text table still contains
	 * HistoryBlobStub objects that are not stored externally.
	 */
	private function checkIntegrity() {
		echo "Doing integrity check...\n";
		$dbr = wfGetDB( DB_REPLICA );

		// Scan for HistoryBlobStub objects in the text table (T22757)
		$stubFound = $dbr->selectField( 'text', '1',
			'old_flags LIKE \'%object%\' AND old_flags NOT LIKE \'%external%\' ' .
			'AND LOWER(CONVERT(LEFT(old_text,22) USING latin1)) = \'o:15:"historyblobstub"\'',
			__METHOD__
		);

		if ( $stubFound ) {
			echo "Integrity check failed: found HistoryBlobStub objects in your text table.\n" .
				"This script could destroy these objects if it continued. Run resolveStubs.php\n" .
				"to fix this.\n";
			exit( 1 );
		}

		echo "Integrity check OK\n";
	}

	/**
	 * Drop any tracking tables left by a previous run and recreate them by
	 * sourcing blob_tracking.sql.
	 */
	private function initTrackingTable() {
		$dbw = wfGetDB( DB_MASTER );
		// Check each table on its own: a previous run may have left only one of
		// them behind, and an unconditional DROP of a missing table would fail.
		// NOTE(review): presumably blob_tracking.sql creates both tables -- confirm.
		foreach ( [ 'blob_tracking', 'blob_orphans' ] as $table ) {
			if ( $dbw->tableExists( $table, __METHOD__ ) ) {
				$dbw->query( 'DROP TABLE ' . $dbw->tableName( $table ), __METHOD__ );
			}
		}
		$dbw->sourceFile( __DIR__ . '/blob_tracking.sql' );
	}

	/**
	 * Build (once) and return the SQL condition selecting text rows whose
	 * old_text starts with "DB://<cluster>/" for any of the configured clusters.
	 *
	 * @return string
	 */
	private function getTextClause() {
		if ( !$this->textClause ) {
			$dbr = wfGetDB( DB_REPLICA );
			$alternatives = [];
			foreach ( $this->clusters as $cluster ) {
				$alternatives[] = 'old_text' . $dbr->buildLike( "DB://$cluster/", $dbr->anyString() );
			}
			$this->textClause = implode( ' OR ', $alternatives );
		}

		return $this->textClause;
	}

	/**
	 * Split a "DB://cluster/id[/hash]" pointer into its components.
	 *
	 * @param string $text Contents of an old_text field
	 * @return array|bool Array with keys 'cluster', 'id' (int) and 'hash'
	 *   (string, or null when the pointer has no hash part); false if $text
	 *   is not a well-formed pointer
	 */
	private function interpretPointer( $text ) {
		if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) {
			return false;
		}

		return [
			'cluster' => $m[1],
			'id' => intval( $m[2] ),
			// The trailing group is absent from $m when the pointer has no hash
			'hash' => $m[3] ?? null
		];
	}

	/**
	 * Scan the revision table for rows stored in the specified clusters
	 */
	private function trackRevisions() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_REPLICA );
		$services = MediaWikiServices::getInstance();
		$slotRoleStore = $services->getSlotRoleStore();
		$lbFactory = $services->getDBLoadBalancerFactory();

		$startId = 0;
		$endId = (int)$dbr->selectField( 'revision', 'MAX(rev_id)', '', __METHOD__ );
		$batchesDone = 0;
		$rowsInserted = 0;

		echo "Finding revisions...\n";

		$tables = [ 'revision', 'slots', 'content', 'text' ];
		$fields = [ 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ];
		$options = [
			'ORDER BY' => 'rev_id',
			'LIMIT' => $this->batchSize
		];
		// Join each revision's main slot to its text row through the "tt:<old_id>"
		// content address, then keep only external pointers into our clusters.
		$conds = [
			'rev_id=slot_revision_id',
			'slot_role_id=' . $slotRoleStore->getId( SlotRecord::MAIN ),
			'content_id=slot_content_id',
			'SUBSTRING(content_address, 1, 3)=' . $dbr->addQuotes( 'tt:' ),
			'SUBSTRING(content_address, 4)=old_id',
			$this->getTextClause(),
			'old_flags ' . $dbr->buildLike( $dbr->anyString(), 'external', $dbr->anyString() ),
		];

		while ( true ) {
			$res = $dbr->select( $tables,
				$fields,
				array_merge( [
					'rev_id > ' . $dbr->addQuotes( $startId ),
				], $conds ),
				__METHOD__,
				$options
			);
			if ( !$res->numRows() ) {
				break;
			}

			$insertBatch = [];
			foreach ( $res as $row ) {
				// Advance the cursor even for rows that end up being skipped
				$startId = (int)$row->rev_id;
				$info = $this->interpretPointer( $row->old_text );
				if ( !$info ) {
					echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
					continue;
				}
				if ( !in_array( $info['cluster'], $this->clusters, true ) ) {
					echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
					continue;
				}
				$insertBatch[] = [
					'bt_page' => $row->rev_page,
					'bt_rev_id' => $row->rev_id,
					'bt_text_id' => $row->old_id,
					'bt_cluster' => $info['cluster'],
					'bt_blob_id' => $info['id'],
					'bt_cgz_hash' => $info['hash']
				];
				if ( $this->doBlobOrphans ) {
					gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
				}
			}
			$dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
			$rowsInserted += count( $insertBatch );

			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
				$lbFactory->waitForReplication();
			}
		}
		echo "Found $rowsInserted revisions\n";
	}

	/**
	 * Scan the text table for orphan text
	 * Orphan text here does not imply DB corruption -- deleted text tracked by the
	 * archive table counts as orphan for our purposes.
	 */
	private function trackOrphanText() {
		# Wait until the blob_tracking table is available in the replica DB
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_REPLICA );
		$pos = $dbw->getMasterPos();
		// Only wait when a position is available; passing a falsy value on to
		// masterPosWait() is not meaningful.
		if ( $pos ) {
			$dbr->masterPosWait( $pos, 100000 );
		}

		$textClause = $this->getTextClause();
		$startId = 0;
		$endId = (int)$dbr->selectField( 'text', 'MAX(old_id)', '', __METHOD__ );
		$rowsInserted = 0;
		$batchesDone = 0;
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

		echo "Finding orphan text...\n";

		# Scan the text table for orphan text
		while ( true ) {
			// LEFT JOIN + "bt_text_id IS NULL" selects the text rows that the
			// revision pass did not register in blob_tracking
			$res = $dbr->select( [ 'text', 'blob_tracking' ],
				[ 'old_id', 'old_flags', 'old_text' ],
				[
					'old_id>' . $dbr->addQuotes( $startId ),
					$textClause,
					'old_flags ' . $dbr->buildLike( $dbr->anyString(), 'external', $dbr->anyString() ),
					'bt_text_id IS NULL'
				],
				__METHOD__,
				[
					'ORDER BY' => 'old_id',
					'LIMIT' => $this->batchSize
				],
				[ 'blob_tracking' => [ 'LEFT JOIN', 'bt_text_id=old_id' ] ]
			);

			if ( !$res->numRows() ) {
				break;
			}

			$insertBatch = [];
			foreach ( $res as $row ) {
				// Advance the cursor even for rows that end up being skipped
				$startId = (int)$row->old_id;
				$info = $this->interpretPointer( $row->old_text );
				if ( !$info ) {
					echo "Invalid DB:// URL in old_id {$row->old_id}\n";
					continue;
				}
				if ( !in_array( $info['cluster'], $this->clusters, true ) ) {
					echo "Invalid cluster returned in SQL query\n";
					continue;
				}

				// Orphan text has no page or revision, hence the zeroes
				$insertBatch[] = [
					'bt_page' => 0,
					'bt_rev_id' => 0,
					'bt_text_id' => $row->old_id,
					'bt_cluster' => $info['cluster'],
					'bt_blob_id' => $info['id'],
					'bt_cgz_hash' => $info['hash']
				];
				if ( $this->doBlobOrphans ) {
					gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
				}
			}
			$dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );

			$rowsInserted += count( $insertBatch );
			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
				$lbFactory->waitForReplication();
			}
		}
		echo "Found $rowsInserted orphan text rows\n";
	}

	/**
	 * Scan the blobs table for rows not registered in blob_tracking (and thus not
	 * registered in the text table).
	 *
	 * Orphan blobs are indicative of DB corruption. They are inaccessible and
	 * should probably be deleted.
	 */
	private function findOrphanBlobs() {
		if ( !extension_loaded( 'gmp' ) ) {
			echo "Can't find orphan blobs, need bitfield support provided by GMP.\n";

			return;
		}

		$dbw = wfGetDB( DB_MASTER );
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

		foreach ( $this->clusters as $cluster ) {
			echo "Searching for orphan blobs in $cluster...\n";
			$lb = $lbFactory->getExternalLB( $cluster );
			try {
				$extDB = $lb->getMaintenanceConnectionRef( DB_REPLICA );
			} catch ( DBConnectionError $e ) {
				if ( strpos( $e->getMessage(), 'Unknown database' ) !== false ) {
					echo "No database on $cluster\n";
				} else {
					echo "Error on $cluster: " . $e->getMessage() . "\n";
				}
				continue;
			}
			// Fall back to the table name "blobs" when the connection has no override
			$table = $extDB->getLBInfo( 'blobs table' );
			if ( $table === null ) {
				$table = 'blobs';
			}
			if ( !$extDB->tableExists( $table, __METHOD__ ) ) {
				echo "No blobs table on cluster $cluster\n";
				continue;
			}
			$startId = 0;
			$batchesDone = 0;
			$actualBlobs = gmp_init( 0 );
			$endId = (int)$extDB->selectField( $table, 'MAX(blob_id)', '', __METHOD__ );

			// Build a bitmap of actual blob rows
			while ( true ) {
				$res = $extDB->select( $table,
					[ 'blob_id' ],
					[ 'blob_id > ' . $extDB->addQuotes( $startId ) ],
					__METHOD__,
					[ 'LIMIT' => $this->batchSize, 'ORDER BY' => 'blob_id' ]
				);

				if ( !$res->numRows() ) {
					break;
				}

				foreach ( $res as $row ) {
					$startId = (int)$row->blob_id;
					gmp_setbit( $actualBlobs, $startId );
				}

				++$batchesDone;
				if ( $batchesDone >= $this->reportingInterval ) {
					$batchesDone = 0;
					echo "$startId / $endId\n";
				}
			}

			// Find actual blobs that weren't tracked by the previous passes
			// This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B
			$orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) );

			// Traverse the orphan list
			$insertBatch = [];
			$id = 0;
			$numOrphans = 0;
			while ( true ) {
				// Index of the next set bit at or after $id; -1 when none remain
				$id = gmp_scan1( $orphans, $id );
				if ( $id == -1 ) {
					break;
				}
				$insertBatch[] = [
					'bo_cluster' => $cluster,
					'bo_blob_id' => $id
				];
				// Flush as soon as a full batch has accumulated
				if ( count( $insertBatch ) >= $this->batchSize ) {
					$dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
					$insertBatch = [];
				}

				++$id;
				++$numOrphans;
			}
			if ( $insertBatch ) {
				$dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
			}
			echo "Found $numOrphans orphan(s) in $cluster\n";
		}
	}
}