1<?php
2
3namespace MediaWiki\Page;
4
5use DBAccessObjectUtils;
6use EmptyIterator;
7use InvalidArgumentException;
8use Iterator;
9use LinkCache;
10use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
11use MalformedTitleException;
12use MediaWiki\Config\ServiceOptions;
13use MediaWiki\DAO\WikiAwareEntity;
14use MediaWiki\Linker\LinkTarget;
15use NamespaceInfo;
16use NullStatsdDataFactory;
17use stdClass;
18use TitleParser;
19use Wikimedia\Assert\Assert;
20use Wikimedia\Rdbms\IDatabase;
21use Wikimedia\Rdbms\ILoadBalancer;
22use Wikimedia\Rdbms\SelectQueryBuilder;
23
/**
 * Service for looking up pages by name, ID, title text or page reference,
 * and for creating query builders that select page records.
 *
 * If a LinkCache is supplied, lookups by name go through it. A LinkCache can
 * only be combined with the local wiki; see the constructor.
 *
 * @since 1.36
 *
 * @unstable
 */
class PageStore implements PageLookup {

	/** @var ServiceOptions */
	private $options;

	/** @var ILoadBalancer */
	private $dbLoadBalancer;

	/** @var NamespaceInfo */
	private $namespaceInfo;

	/** @var TitleParser */
	private $titleParser;

	/** @var LinkCache|null */
	private $linkCache;

	/** @var StatsdDataFactoryInterface */
	private $stats;

	/** @var string|false */
	private $wikiId;

	/**
	 * @internal for use by service wiring
	 */
	public const CONSTRUCTOR_OPTIONS = [
		'PageLanguageUseDB',
	];

	/**
	 * @param ServiceOptions $options Must provide all of self::CONSTRUCTOR_OPTIONS
	 * @param ILoadBalancer $dbLoadBalancer
	 * @param NamespaceInfo $namespaceInfo
	 * @param TitleParser $titleParser
	 * @param ?LinkCache $linkCache Must be null unless $wikiId is WikiAwareEntity::LOCAL
	 * @param ?StatsdDataFactoryInterface $stats If null, a NullStatsdDataFactory is used
	 * @param false|string $wikiId
	 *
	 * @throws InvalidArgumentException if a LinkCache is given together with a non-local wiki ID
	 */
	public function __construct(
		ServiceOptions $options,
		ILoadBalancer $dbLoadBalancer,
		NamespaceInfo $namespaceInfo,
		TitleParser $titleParser,
		?LinkCache $linkCache,
		?StatsdDataFactoryInterface $stats,
		$wikiId = WikiAwareEntity::LOCAL
	) {
		$options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );

		$this->options = $options;
		$this->dbLoadBalancer = $dbLoadBalancer;
		$this->namespaceInfo = $namespaceInfo;
		$this->titleParser = $titleParser;
		$this->wikiId = $wikiId;
		$this->linkCache = $linkCache;
		$this->stats = $stats ?? new NullStatsdDataFactory();

		if ( $wikiId !== WikiAwareEntity::LOCAL && $linkCache ) {
			// LinkCache currently doesn't support cross-wiki PageReferences.
			// Once it does, this check can go away. At that point, LinkCache should
			// probably also no longer be optional.
			throw new InvalidArgumentException( "Can't use LinkCache with pages from $wikiId" );
		}
	}

	/**
	 * Increment a counter, with the metric name prefixed by "PageStore.".
	 *
	 * @param string $metric
	 */
	private function incrementStats( string $metric ) {
		$this->stats->increment( "PageStore.{$metric}" );
	}

	/**
	 * Get the page identity a link target points to.
	 *
	 * Links into NS_MEDIA are mapped to NS_FILE. If no existing page is found,
	 * a PageIdentityValue with page ID 0 is returned instead of null.
	 *
	 * @param LinkTarget $link Must not be external or relative, and must not
	 *        be in a virtual (negative) namespace.
	 * @param int $queryFlags
	 *
	 * @return ProperPageIdentity
	 */
	public function getPageForLink(
		LinkTarget $link,
		int $queryFlags = self::READ_NORMAL
	): ProperPageIdentity {
		Assert::parameter( !$link->isExternal(), '$link', 'must not be external' );
		Assert::parameter( $link->getDBkey() !== '', '$link', 'must not be relative' );

		$ns = $link->getNamespace();

		// Map Media links to File namespace
		if ( $ns === NS_MEDIA ) {
			$ns = NS_FILE;
		}

		Assert::parameter( $ns >= 0, '$link', 'namespace must not be virtual' );

		$page = $this->getPageByName( $ns, $link->getDBkey(), $queryFlags );

		if ( !$page ) {
			$page = new PageIdentityValue( 0, $ns, $link->getDBkey(), $this->wikiId );
		}

		return $page;
	}

	/**
	 * Look up an existing page by namespace and DB key.
	 *
	 * Goes through the LinkCache if one was supplied to the constructor,
	 * otherwise queries the database directly.
	 *
	 * @param int $namespace Must not be negative
	 * @param string $dbKey Must not be empty and must not contain spaces
	 * @param int $queryFlags
	 *
	 * @return ExistingPageRecord|null
	 */
	public function getPageByName(
		int $namespace,
		string $dbKey,
		int $queryFlags = self::READ_NORMAL
	): ?ExistingPageRecord {
		Assert::parameter( $dbKey !== '', '$dbKey', 'must not be empty' );
		// strpos() returns 0 for a leading space, which is falsy; compare against
		// false explicitly so that a space at the very start is rejected as well.
		Assert::parameter( strpos( $dbKey, ' ' ) === false, '$dbKey', 'must not contain spaces' );
		Assert::parameter( $namespace >= 0, '$namespace', 'must not be virtual' );

		if ( $this->linkCache ) {
			return $this->getPageByNameViaLinkCache( $namespace, $dbKey, $queryFlags );
		}

		$conds = [
			'page_namespace' => $namespace,
			'page_title' => $dbKey,
		];

		return $this->loadPageFromConditions( $conds, $queryFlags );
	}

	/**
	 * Look up an existing page by namespace and DB key, using the LinkCache.
	 *
	 * Records one "PageStore.LinkCache.*" counter per call, describing whether
	 * the row came from the cache ("hit") or from the fetch callback ("miss"),
	 * and whether it was usable.
	 *
	 * @param int $namespace
	 * @param string $dbKey
	 * @param int $queryFlags
	 *
	 * @return ExistingPageRecord|null
	 */
	private function getPageByNameViaLinkCache(
		int $namespace,
		string $dbKey,
		int $queryFlags = self::READ_NORMAL
	): ?ExistingPageRecord {
		$conds = [
			'page_namespace' => $namespace,
			'page_title' => $dbKey,
		];

		// Only trust a cached "bad link" entry for plain reads.
		if ( $queryFlags === self::READ_NORMAL && $this->linkCache->isBadLink( $conds ) ) {
			$this->incrementStats( "LinkCache.hit.bad.early" );
			return null;
		}

		$caller = __METHOD__;
		// Flipped to 'miss' by the callback below if LinkCache has to fetch the row.
		$hitOrMiss = 'hit';

		// Try to get the row from LinkCache, providing a callback to fetch it if it's not cached.
		// When getGoodLinkRow() returns, LinkCache should have an entry for the row, good or bad.
		$row = $this->linkCache->getGoodLinkRow(
			$namespace,
			$dbKey,
			function ( IDatabase $dbr, $ns, $dbkey, array $options )
				use ( $conds, $caller, &$hitOrMiss )
			{
				$hitOrMiss = 'miss';

				return $this->newSelectQueryBuilder( $dbr )
					->fields( $this->getSelectFields() )
					->conds( $conds )
					->options( $options )
					->caller( $caller )
					->fetchRow();
			},
			$queryFlags
		);

		if ( $row ) {
			try {
				$page = $this->newPageRecordFromRow( $row );

				// We were able to use the row we got from link cache.
				$this->incrementStats( "LinkCache.{$hitOrMiss}.good" );
			} catch ( InvalidArgumentException $e ) {
				// The cached row was incomplete or corrupt,
				// just keep going and load from the database.
				$page = $this->loadPageFromConditions( $conds, $queryFlags );

				if ( $page ) {
					// PageSelectQueryBuilder should have added the full row to the LinkCache now.
					$this->incrementStats( "LinkCache.{$hitOrMiss}.incomplete.loaded" );
				} else {
					// If we get here, an incomplete row was cached, but we failed to
					// load the full row from the database. This should only happen
					// if the page was deleted under our feet, which should be very rare.
					// Update the LinkCache to reflect the new situation.
					$this->linkCache->addBadLinkObj( $conds );
					$this->incrementStats( "LinkCache.{$hitOrMiss}.incomplete.missing" );
				}
			}
		} else {
			$this->incrementStats( "LinkCache.{$hitOrMiss}.bad.late" );
			$page = null;
		}

		return $page;
	}

	/**
	 * Parse a title text and get the page identity it refers to.
	 *
	 * Returns null if the text cannot be parsed, or if the parsed title is
	 * rejected by getPageForLink().
	 *
	 * @since 1.37
	 *
	 * @param string $text
	 * @param int $defaultNamespace Namespace to assume per default (usually NS_MAIN)
	 * @param int $queryFlags
	 *
	 * @return ProperPageIdentity|null
	 */
	public function getPageByText(
		string $text,
		int $defaultNamespace = NS_MAIN,
		int $queryFlags = self::READ_NORMAL
	): ?ProperPageIdentity {
		try {
			$title = $this->titleParser->parseTitle( $text, $defaultNamespace );
			return $this->getPageForLink( $title, $queryFlags );
		} catch ( MalformedTitleException | InvalidArgumentException $e ) {
			// Note that even some well-formed links are still invalid parameters
			// for getPageForLink(), e.g. interwiki links or special pages.
			return null;
		}
	}

	/**
	 * Parse a title text and get the record of the existing page it refers to.
	 *
	 * Returns null if getPageByText() returns null for the text, or if the
	 * page it identifies is not found.
	 *
	 * @since 1.37
	 *
	 * @param string $text
	 * @param int $defaultNamespace Namespace to assume per default (usually NS_MAIN)
	 * @param int $queryFlags
	 *
	 * @return ExistingPageRecord|null
	 */
	public function getExistingPageByText(
		string $text,
		int $defaultNamespace = NS_MAIN,
		int $queryFlags = self::READ_NORMAL
	): ?ExistingPageRecord {
		$pageIdentity = $this->getPageByText( $text, $defaultNamespace, $queryFlags );
		if ( !$pageIdentity ) {
			return null;
		}
		return $this->getPageByReference( $pageIdentity, $queryFlags );
	}

	/**
	 * Look up an existing page by its page ID. The LinkCache is not consulted.
	 *
	 * @param int $pageId Must be greater than zero
	 * @param int $queryFlags
	 *
	 * @return ExistingPageRecord|null
	 */
	public function getPageById(
		int $pageId,
		int $queryFlags = self::READ_NORMAL
	): ?ExistingPageRecord {
		Assert::parameter( $pageId > 0, '$pageId', 'must be greater than zero' );

		$conds = [
			'page_id' => $pageId,
		];

		// XXX: no caching needed?

		return $this->loadPageFromConditions( $conds, $queryFlags );
	}

	/**
	 * Get the record of the existing page that a page reference points to.
	 *
	 * If $page already is an ExistingPageRecord and READ_NORMAL was requested,
	 * it is returned as-is. Otherwise the page is loaded by ID if $page is a
	 * PageIdentity that reports to exist, and by namespace and DB key if not.
	 *
	 * @param PageReference $page Must belong to this store's wiki and must not
	 *        be in a virtual (negative) namespace.
	 * @param int $queryFlags
	 *
	 * @return ExistingPageRecord|null The page's PageRecord, or null if the page was not found.
	 */
	public function getPageByReference(
		PageReference $page,
		int $queryFlags = self::READ_NORMAL
	): ?ExistingPageRecord {
		$page->assertWiki( $this->wikiId );
		Assert::parameter( $page->getNamespace() >= 0, '$page', 'namespace must not be virtual' );

		if ( $page instanceof ExistingPageRecord && $queryFlags === self::READ_NORMAL ) {
			return $page;
		}

		if ( $page instanceof PageIdentity ) {
			Assert::parameter( $page->canExist(), '$page', 'Must be a proper page' );

			if ( $page->exists() ) {
				// if we have a page ID, use it
				$id = $page->getId( $this->wikiId );
				return $this->getPageById( $id, $queryFlags );
			}
		}

		return $this->getPageByName( $page->getNamespace(), $page->getDBkey(), $queryFlags );
	}

	/**
	 * Fetch a single page record matching the given conditions, using a
	 * query builder from newSelectQueryBuilder().
	 *
	 * @param array $conds
	 * @param int $queryFlags
	 *
	 * @return ExistingPageRecord|null
	 */
	private function loadPageFromConditions(
		array $conds,
		int $queryFlags = self::READ_NORMAL
	): ?ExistingPageRecord {
		$queryBuilder = $this->newSelectQueryBuilder( $queryFlags )
			->conds( $conds )
			->caller( __METHOD__ );

		// @phan-suppress-next-line PhanTypeMismatchReturnSuperType
		return $queryBuilder->fetchPageRecord();
	}

	/**
	 * Wrap a database row in a PageStoreRecord bound to this store's wiki ID.
	 *
	 * @internal
	 *
	 * @param stdClass $row
	 *
	 * @return ExistingPageRecord
	 */
	public function newPageRecordFromRow( stdClass $row ): ExistingPageRecord {
		return new PageStoreRecord(
			$row,
			$this->wikiId
		);
	}

	/**
	 * Get the fields to select when loading page rows.
	 *
	 * Includes 'page_lang' only if the PageLanguageUseDB option is set, and
	 * always includes the fields reported by LinkCache::getSelectFields().
	 *
	 * @internal
	 *
	 * @return string[]
	 */
	public function getSelectFields(): array {
		$fields = [
			'page_id',
			'page_namespace',
			'page_title',
			'page_is_redirect',
			'page_is_new',
			'page_touched',
			'page_links_updated',
			'page_latest',
			'page_len',
			'page_content_model'
		];

		if ( $this->options->get( 'PageLanguageUseDB' ) ) {
			$fields[] = 'page_lang';
		}

		// Since we are putting rows into LinkCache, we need to include all fields
		// that LinkCache needs.
		$fields = array_unique(
			array_merge( $fields, LinkCache::getSelectFields() )
		);

		return $fields;
	}

	/**
	 * Create a query builder for selecting pages.
	 *
	 * @unstable
	 *
	 * @param IDatabase|int|null $dbOrFlags The database connection to use, or a READ_XXX constant
	 *        indicating what kind of database connection to use.
	 *
	 * @return PageSelectQueryBuilder
	 */
	public function newSelectQueryBuilder( $dbOrFlags = self::READ_NORMAL ): SelectQueryBuilder {
		if ( $dbOrFlags instanceof IDatabase ) {
			// An explicit connection is used as-is, with no extra query options.
			$db = $dbOrFlags;
			$options = [];
		} else {
			// Translate the READ_XXX flags into a connection mode and query options.
			[ $mode, $options ] = DBAccessObjectUtils::getDBOptions( $dbOrFlags );
			$db = $this->getDBConnectionRef( $mode );
		}

		$queryBuilder = new PageSelectQueryBuilder( $db, $this, $this->linkCache );
		$queryBuilder->options( $options );

		return $queryBuilder;
	}

	/**
	 * Get a connection to the database of this store's wiki.
	 *
	 * @param int $mode DB_PRIMARY or DB_REPLICA
	 * @return IDatabase
	 */
	private function getDBConnectionRef( int $mode = DB_REPLICA ): IDatabase {
		return $this->dbLoadBalancer->getConnectionRef( $mode, [], $this->wikiId );
	}

	/**
	 * Get all subpages of this page.
	 * Will return an empty list if the namespace doesn't support subpages.
	 *
	 * Subpages are selected by title prefix (the page's DB key followed by "/")
	 * within the page's namespace, ordered by title.
	 *
	 * @param PageIdentity $page
	 * @param int $limit Maximum number of subpages to fetch
	 *
	 * @return Iterator<ExistingPageRecord>
	 */
	public function getSubpages( PageIdentity $page, int $limit ): Iterator {
		if ( !$this->namespaceInfo->hasSubpages( $page->getNamespace() ) ) {
			return new EmptyIterator();
		}

		return $this->newSelectQueryBuilder()
			->whereTitlePrefix( $page->getNamespace(), $page->getDBkey() . '/' )
			->orderByTitle()
			->options( [ 'LIMIT' => $limit ] )
			->caller( __METHOD__ )
			->fetchPageRecords();
	}

}
452