1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 * @ingroup JobQueue
20 */
21use MediaWiki\MediaWikiServices;
22
/**
 * Job for pruning expired recent changes rows and for refreshing the
 * cached list of active users
 *
 * @ingroup JobQueue
 * @since 1.25
 */
class RecentChangesUpdateJob extends Job {
	/**
	 * @param Title $title Title the job is attached to
	 * @param array $params Job parameters; must contain a 'type' key.
	 *   run() accepts the values 'purge' and 'cacheUpdate'.
	 * @throws InvalidArgumentException If the 'type' parameter is missing
	 */
	public function __construct( Title $title, array $params ) {
		parent::__construct( 'recentChangesUpdate', $title, $params );

		if ( !isset( $params['type'] ) ) {
			// Same exception class that run() uses for a bad 'type'. It is still
			// an Exception, so pre-existing catch blocks keep working.
			throw new InvalidArgumentException( "Missing 'type' parameter." );
		}

		$this->executionFlags |= self::JOB_NO_EXPLICIT_TRX_ROUND;
		$this->removeDuplicates = true;
	}

	/**
	 * Create a job of type 'purge', which makes run() call purgeExpiredRows().
	 *
	 * @return RecentChangesUpdateJob
	 */
	final public static function newPurgeJob() {
		return new self(
			SpecialPage::getTitleFor( 'Recentchanges' ), [ 'type' => 'purge' ]
		);
	}

	/**
	 * Create a job of type 'cacheUpdate', which makes run() call updateActiveUsers().
	 *
	 * @return RecentChangesUpdateJob
	 * @since 1.26
	 */
	final public static function newCacheUpdateJob() {
		return new self(
			SpecialPage::getTitleFor( 'Recentchanges' ), [ 'type' => 'cacheUpdate' ]
		);
	}

	/**
	 * Dispatch to the task selected by the 'type' job parameter.
	 *
	 * @return bool Always true; failures surface as exceptions
	 * @throws InvalidArgumentException If 'type' is neither 'purge' nor 'cacheUpdate'
	 */
	public function run() {
		if ( $this->params['type'] === 'purge' ) {
			$this->purgeExpiredRows();
		} elseif ( $this->params['type'] === 'cacheUpdate' ) {
			$this->updateActiveUsers();
		} else {
			throw new InvalidArgumentException(
				"Invalid 'type' parameter '{$this->params['type']}'." );
		}

		return true;
	}

	/**
	 * Delete recentchanges rows whose rc_timestamp is more than $wgRCMaxAge
	 * seconds in the past.
	 *
	 * Rows are removed in batches of at most $wgUpdateRowsPerQuery. After each
	 * batch the RecentChangesPurgeRows hook is run with the deleted rows and the
	 * method commits and waits for replication. The whole operation is guarded
	 * by a named lock; if the lock cannot be obtained the method returns
	 * without doing anything.
	 */
	protected function purgeExpiredRows() {
		global $wgRCMaxAge, $wgUpdateRowsPerQuery;

		$dbw = wfGetDB( DB_MASTER );
		$lockKey = $dbw->getDomainID() . ':recentchanges-prune';
		if ( !$dbw->lock( $lockKey, __METHOD__, 0 ) ) {
			// already in progress
			return;
		}

		try {
			$factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
			$cutoff = $dbw->timestamp( time() - $wgRCMaxAge );
			$rcQuery = RecentChange::getQueryInfo();
			do {
				$rcIds = [];
				$rows = [];
				$res = $dbw->select(
					$rcQuery['tables'],
					$rcQuery['fields'],
					[ 'rc_timestamp < ' . $dbw->addQuotes( $cutoff ) ],
					__METHOD__,
					[ 'LIMIT' => $wgUpdateRowsPerQuery ],
					$rcQuery['joins']
				);
				foreach ( $res as $row ) {
					$rcIds[] = $row->rc_id;
					// Full rows are kept only to hand them to the hook below
					$rows[] = $row;
				}
				if ( $rcIds ) {
					$dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ );
					Hooks::runner()->onRecentChangesPurgeRows( $rows );
					// There might be more, so try waiting for replica DBs
					if ( !$factory->commitAndWaitForReplication(
						__METHOD__, $ticket, [ 'timeout' => 3 ]
					) ) {
						// Another job will continue anyway
						break;
					}
				}
			} while ( $rcIds );
		} finally {
			// Release the lock even when a query or a hook handler threw, so
			// that this connection does not keep later purge jobs locked out.
			$dbw->unlock( $lockKey, __METHOD__ );
		}
	}

	/**
	 * Refresh the 'activeusers' entries in querycachetwo from recentchanges.
	 *
	 * Steps, performed under a named lock (the method returns early if the
	 * lock is already taken):
	 *  - read the last refresh time from querycache_info;
	 *  - collect registered users with recentchanges activity since then,
	 *    limited to the last $wgActiveUserDays days;
	 *  - insert those not already listed with a recent enough qcc_value;
	 *  - store the new refresh time in querycache_info.
	 * After the lock is released, entries older than $wgActiveUserDays days
	 * are deleted.
	 */
	protected function updateActiveUsers() {
		global $wgActiveUserDays;

		// Users that made edits within this many days are "active"
		$days = $wgActiveUserDays;
		// Pull in the full window of active users in this update
		$window = $wgActiveUserDays * 86400;

		$dbw = wfGetDB( DB_MASTER );
		$factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$ticket = $factory->getEmptyTransactionTicket( __METHOD__ );

		$lockKey = $dbw->getDomainID() . '-activeusers';
		if ( !$dbw->lock( $lockKey, __METHOD__, 0 ) ) {
			// Exclusive update (avoids duplicate entries)… it's usually fine to just
			// drop out here, if the Job is already running.
			return;
		}

		$nowUnix = time();
		// Oldest activity time (TS_UNIX) that still counts as "active"
		$activeCutoffUnix = $nowUnix - $days * 86400;

		try {
			// Long-running queries expected
			$dbw->setSessionOptions( [ 'connTimeout' => 900 ] );

			// Get the last-updated timestamp for the cache
			$cTime = $dbw->selectField( 'querycache_info',
				'qci_timestamp',
				[ 'qci_type' => 'activeusers' ],
				__METHOD__
			);
			// Cast so the arithmetic below works on integers, not numeric strings
			$cTimeUnix = $cTime ? (int)wfTimestamp( TS_UNIX, $cTime ) : 1;

			// Pick the date range to fetch from. This is normally from the last
			// update until the present time, but has a limited window for sanity.
			// If the window is limited, multiple runs are needed to fully populate it.
			$sTimestamp = max( $cTimeUnix, $activeCutoffUnix );
			$eTimestamp = min( $sTimestamp + $window, $nowUnix );

			// Get all the users active since the last update
			$actorQuery = ActorMigration::newMigration()->getJoin( 'rc_user' );
			$res = $dbw->select(
				[ 'recentchanges' ] + $actorQuery['tables'],
				[
					'rc_user_text' => $actorQuery['fields']['rc_user_text'],
					'lastedittime' => 'MAX(rc_timestamp)'
				],
				[
					$actorQuery['fields']['rc_user'] . ' > 0', // actual accounts
					'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata
					'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ),
					'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ),
					'rc_timestamp <= ' . $dbw->addQuotes( $dbw->timestamp( $eTimestamp ) )
				],
				__METHOD__,
				[
					'GROUP BY' => [ $actorQuery['fields']['rc_user_text'] ],
					'ORDER BY' => 'NULL' // avoid filesort
				],
				$actorQuery['joins']
			);
			// Map of user name => latest rc_timestamp in the fetched range
			$names = [];
			foreach ( $res as $row ) {
				$names[$row->rc_user_text] = $row->lastedittime;
			}

			// Find which of the recently active users are already accounted for
			if ( count( $names ) ) {
				$res = $dbw->select( 'querycachetwo',
					[ 'user_name' => 'qcc_title' ],
					[
						'qcc_type' => 'activeusers',
						'qcc_namespace' => NS_USER,
						// PHP turns numeric-looking array keys into integers;
						// cast back so every name is sent as a string
						'qcc_title' => array_map( 'strval', array_keys( $names ) ),
						'qcc_value >= ' . $dbw->addQuotes( $activeCutoffUnix ), // TS_UNIX
					],
					__METHOD__
				);
				// Note: In order for this to be actually consistent, we would need
				// to update these rows with the new lastedittime.
				foreach ( $res as $row ) {
					unset( $names[$row->user_name] );
				}
			}

			// Insert the users that need to be added to the list
			if ( count( $names ) ) {
				$newRows = [];
				foreach ( $names as $name => $lastEditTime ) {
					$newRows[] = [
						'qcc_type' => 'activeusers',
						'qcc_namespace' => NS_USER,
						// Same integer-key concern as in the lookup above
						'qcc_title' => (string)$name,
						'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ),
						'qcc_namespacetwo' => 0, // unused
						'qcc_titletwo' => '' // unused
					];
				}
				foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) {
					$dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ );
					$factory->commitAndWaitForReplication( __METHOD__, $ticket );
				}
			}

			// If a transaction was already started, it might have an old
			// snapshot, so kludge the timestamp range back as needed.
			$asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() );

			// Touch the data freshness timestamp
			$dbw->replace(
				'querycache_info',
				'qci_type',
				[ 'qci_type' => 'activeusers',
					'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ], // not always $now
				__METHOD__
			);
		} finally {
			// Release the lock even when one of the queries above threw, so
			// that this connection does not keep later cache updates locked out.
			$dbw->unlock( $lockKey, __METHOD__ );
		}

		// Rotate out users that have not edited in too long (according to old data set).
		// This intentionally stays after the unlock, matching the existing ordering.
		$dbw->delete( 'querycachetwo',
			[
				'qcc_type' => 'activeusers',
				'qcc_value < ' . $dbw->addQuotes( $activeCutoffUnix ) // TS_UNIX
			],
			__METHOD__
		);
	}
}
245