1<?php
2
3namespace Wikimedia\Rdbms;
4
5use InvalidArgumentException;
6use UnexpectedValueException;
7
8/**
9 * DBPrimaryPos class for MySQL/MariaDB
10 *
11 * Note that primary positions and sync logic here make some assumptions:
12 *  - Binlog-based usage assumes single-source replication and non-hierarchical replication.
13 *  - GTID-based usage allows getting/syncing with multi-source replication. It is assumed
14 *    that GTID sets are complete (e.g. include all domains on the server).
15 *
16 * @see https://mariadb.com/kb/en/library/gtid/
17 * @see https://dev.mysql.com/doc/refman/5.6/en/replication-gtids-concepts.html
18 * @since 1.37
19 */
class MySQLPrimaryPos implements DBPrimaryPos {
	/** @var string One of (BINARY_LOG, GTID_MYSQL, GTID_MARIA) */
	private $style;
	/** @var string|null Base name of all Binary Log files */
	private $binLog;
	/** @var array<int,int|string>|null Binary Log position tuple (index number, event number) */
	private $logPos;
	/** @var string[] Map of (server_uuid/gtid_domain_id => GTID) */
	private $gtids = [];
	/** @var string|null Active GTID domain ID; null means "do not filter by domain" */
	private $activeDomain;
	/** @var string|null ID of the server where DB writes originate; null means "do not filter" */
	private $activeServerId;
	/** @var string|null UUID of the server where DB writes originate; null means "do not filter" */
	private $activeServerUUID;
	/** @var float UNIX timestamp */
	private $asOfTime = 0.0;

	private const BINARY_LOG = 'binary-log';
	private const GTID_MARIA = 'gtid-maria';
	private const GTID_MYSQL = 'gtid-mysql';

	/** @var int Key name of the 6 digit binary log index number of a position tuple */
	public const CORD_INDEX = 0;
	/** @var int Key name of the 64 bit binary log event number of a position tuple */
	public const CORD_EVENT = 1;

	/**
	 * @param string $position One of (comma separated GTID list, <binlog file>/<64 bit integer>)
	 * @param float $asOfTime UNIX timestamp
	 * @throws InvalidArgumentException If $position is neither binlog coordinates nor a
	 *  non-empty list of well-formed GTIDs
	 */
	public function __construct( $position, $asOfTime ) {
		$this->init( $position, $asOfTime );
	}

	/**
	 * Parse a position string and populate the position fields of this instance
	 *
	 * A string of the form "<name>.<digits>/<digits>" is treated as binary log coordinates.
	 * Anything else is treated as a comma separated list of GTIDs, keeping one GTID per
	 * replication channel (gtid_domain_id or server_uuid).
	 *
	 * @param string $position
	 * @param float $asOfTime
	 * @throws InvalidArgumentException If a GTID is malformed or the GTID list is empty
	 */
	protected function init( $position, $asOfTime ) {
		$m = [];
		if ( preg_match( '!^(.+)\.(\d+)/(\d+)$!', $position, $m ) ) {
			$this->binLog = $m[1]; // ideally something like host name
			// The file index is stored as an integer (so any zero padding is dropped), while
			// the event number is kept as the matched digit string
			$this->logPos = [ self::CORD_INDEX => (int)$m[2], self::CORD_EVENT => $m[3] ];
			$this->style = self::BINARY_LOG;
		} else {
			$gtids = array_filter( array_map( 'trim', explode( ',', $position ) ) );
			foreach ( $gtids as $gtid ) {
				$components = self::parseGTID( $gtid );
				if ( !$components ) {
					throw new InvalidArgumentException( "Invalid GTID '$gtid'." );
				}

				list( $domain, $eventNumber ) = $components;
				if ( isset( $this->gtids[$domain] ) ) {
					// For MySQL, handle the case where some past issue caused a gap in the
					// executed GTID set, e.g. [last_purged+1,N-1] and [N+1,N+2+K]. Ignore the
					// gap by using the GTID with the highest ending event number.
					list( , $otherEventNumber ) = self::parseGTID( $this->gtids[$domain] );
					if ( $eventNumber > $otherEventNumber ) {
						$this->gtids[$domain] = $gtid;
					}
				} else {
					$this->gtids[$domain] = $gtid;
				}

				// parseGTID() yields the channel ID as a string for both GTID flavours, so a
				// type check cannot tell them apart. Classify by content instead: a
				// gtid_domain_id consists only of digits, whereas a server_uuid always
				// contains hyphens.
				$this->style = ctype_digit( (string)$domain )
					? self::GTID_MARIA // gtid_domain_id
					: self::GTID_MYSQL; // server_uuid
			}
			if ( !$this->gtids ) {
				throw new InvalidArgumentException( "GTID set cannot be empty." );
			}
		}

		$this->asOfTime = $asOfTime;
	}

	/**
	 * @return float UNIX timestamp given when this position was constructed or unserialized
	 */
	public function asOfTime() {
		return $this->asOfTime;
	}

	/**
	 * Check whether this position is at or past the given position
	 *
	 * When both positions have active GTID coordinates, every replication channel they have
	 * in common must have been reached, and there must be at least one such channel.
	 * Otherwise, the binary log coordinates are compared, which requires both positions to
	 * use the same binary log base name.
	 *
	 * @param DBPrimaryPos $pos
	 * @return bool
	 * @throws InvalidArgumentException If $pos is not a MySQLPrimaryPos
	 */
	public function hasReached( DBPrimaryPos $pos ) {
		if ( !( $pos instanceof self ) ) {
			throw new InvalidArgumentException( "Position not an instance of " . __CLASS__ );
		}

		// Prefer GTID comparisons, which work with multi-tier replication
		$thisPosByDomain = $this->getActiveGtidCoordinates();
		$thatPosByDomain = $pos->getActiveGtidCoordinates();
		if ( $thisPosByDomain && $thatPosByDomain ) {
			$comparisons = [];
			// Check that this has positions reaching those in $pos for all domains in common.
			// Event numbers are digit strings, which PHP compares numerically.
			foreach ( $thatPosByDomain as $domain => $thatPos ) {
				if ( isset( $thisPosByDomain[$domain] ) ) {
					$comparisons[] = ( $thatPos <= $thisPosByDomain[$domain] );
				}
			}
			// Check that $this has a GTID for at least one domain also in $pos; due to MariaDB
			// quirks, prior primary switch-overs may result in inactive garbage GTIDs that cannot
			// be cleaned up. Assume that the domains in both this and $pos cover the relevant
			// active channels.
			return ( $comparisons && !in_array( false, $comparisons, true ) );
		}

		// Fallback to the binlog file comparisons
		$thisBinPos = $this->getBinlogCoordinates();
		$thatBinPos = $pos->getBinlogCoordinates();
		if ( $thisBinPos && $thatBinPos && $thisBinPos['binlog'] === $thatBinPos['binlog'] ) {
			// Tuples are compared element by element: file index first, then event number
			return ( $thisBinPos['pos'] >= $thatBinPos['pos'] );
		}

		// Comparing totally different binlogs does not make sense
		return false;
	}

	/**
	 * Check whether this position and the given one refer to a common replication channel
	 *
	 * When both positions have active GTID coordinates, they match if they share at least
	 * one channel. Otherwise, they match if both have binary log coordinates with the same
	 * binary log base name.
	 *
	 * @param DBPrimaryPos $pos
	 * @return bool
	 * @throws InvalidArgumentException If $pos is not a MySQLPrimaryPos
	 */
	public function channelsMatch( DBPrimaryPos $pos ) {
		if ( !( $pos instanceof self ) ) {
			throw new InvalidArgumentException( "Position not an instance of " . __CLASS__ );
		}

		// Prefer GTID comparisons, which work with multi-tier replication
		$thisPosDomains = array_keys( $this->getActiveGtidCoordinates() );
		$thatPosDomains = array_keys( $pos->getActiveGtidCoordinates() );
		if ( $thisPosDomains && $thatPosDomains ) {
			// Check that $this has a GTID for at least one domain also in $pos; due to MariaDB
			// quirks, prior primary switch-overs may result in inactive garbage GTIDs that cannot
			// easily be cleaned up. Assume that the domains in both this and $pos cover the
			// relevant active channels.
			return array_intersect( $thatPosDomains, $thisPosDomains ) ? true : false;
		}

		// Fallback to the binlog file comparisons
		$thisBinPos = $this->getBinlogCoordinates();
		$thatBinPos = $pos->getBinlogCoordinates();

		return ( $thisBinPos && $thatBinPos && $thisBinPos['binlog'] === $thatBinPos['binlog'] );
	}

	/**
	 * @return string|null Base name of binary log files; null for GTID-based positions
	 * @since 1.31
	 */
	public function getLogName() {
		return $this->gtids ? null : $this->binLog;
	}

	/**
	 * @return array<int,int|string>|null Tuple of (binary log file number, 64 bit event number);
	 *  null for GTID-based positions
	 * @since 1.31
	 */
	public function getLogPosition() {
		return $this->gtids ? null : $this->logPos;
	}

	/**
	 * @return string|null Name of the binary log file for this position, built from the base
	 *  name and the integer file index; null for GTID-based positions
	 * @since 1.31
	 */
	public function getLogFile() {
		// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
		return $this->gtids ? null : "{$this->binLog}.{$this->logPos[self::CORD_INDEX]}";
	}

	/**
	 * @return array<string,string> Map of (server_uuid/gtid_domain_id => GTID)
	 * @since 1.31
	 */
	public function getGTIDs() {
		return $this->gtids;
	}

	/**
	 * Set the GTID domain known to be used in new commits on a replication stream of interest
	 *
	 * This makes getRelevantActiveGTIDs() filter out GTIDs from other domains
	 *
	 * @see MySQLPrimaryPos::getRelevantActiveGTIDs()
	 * @see https://mariadb.com/kb/en/library/gtid/#gtid_domain_id
	 *
	 * @param string|int|null $id @@gtid_domain_id of the active replication stream;
	 *  null removes the filter
	 * @return MySQLPrimaryPos This instance (since 1.34)
	 * @since 1.31
	 */
	public function setActiveDomain( $id ) {
		// Keep null as "no filter"; casting it would give '' and thus exclude every domain
		$this->activeDomain = ( $id === null ) ? null : (string)$id;

		return $this;
	}

	/**
	 * Set the server ID known to be used in new commits on a replication stream of interest
	 *
	 * This makes getRelevantActiveGTIDs() filter out GTIDs from other origin servers
	 *
	 * @see MySQLPrimaryPos::getRelevantActiveGTIDs()
	 *
	 * @param string|int|null $id @@server_id of the server where writes originate;
	 *  null removes the filter
	 * @return MySQLPrimaryPos This instance (since 1.34)
	 * @since 1.31
	 */
	public function setActiveOriginServerId( $id ) {
		// Keep null as "no filter"; casting it would give '' and thus exclude every server
		$this->activeServerId = ( $id === null ) ? null : (string)$id;

		return $this;
	}

	/**
	 * Set the server UUID known to be used in new commits on a replication stream of interest
	 *
	 * This makes getRelevantActiveGTIDs() filter out GTIDs from other origin servers
	 *
	 * @see MySQLPrimaryPos::getRelevantActiveGTIDs()
	 *
	 * @param string|null $id @@server_uuid of the server where writes originate;
	 *  null removes the filter
	 * @return MySQLPrimaryPos This instance (since 1.34)
	 * @since 1.31
	 */
	public function setActiveOriginServerUUID( $id ) {
		$this->activeServerUUID = $id;

		return $this;
	}

	/**
	 * @param MySQLPrimaryPos $pos
	 * @param MySQLPrimaryPos $refPos
	 * @return string[] List of active GTIDs from $pos that have domains in $refPos
	 * @since 1.34
	 */
	public static function getRelevantActiveGTIDs( MySQLPrimaryPos $pos, MySQLPrimaryPos $refPos ) {
		// Keep the GTIDs of $pos whose channel is both active in $pos and present in $refPos
		return array_values( array_intersect_key(
			$pos->gtids,
			$pos->getActiveGtidCoordinates(),
			$refPos->gtids
		) );
	}

	/**
	 * Get the event numbers of the GTIDs that pass the active domain/origin server filters
	 *
	 * @see https://mariadb.com/kb/en/mariadb/gtid
	 * @see https://dev.mysql.com/doc/refman/5.6/en/replication-gtids-concepts.html
	 * @return array<string|int,string> Map of (server_uuid/gtid_domain_id => event number);
	 *  event numbers are the digit strings returned by parseGTID()
	 */
	protected function getActiveGtidCoordinates() {
		$gtidInfos = [];

		foreach ( $this->gtids as $gtid ) {
			list( $domain, $pos, $server ) = self::parseGTID( $gtid );

			$ignore = false;
			// Filter out GTIDs from non-active replication domains
			if ( $this->style === self::GTID_MARIA && $this->activeDomain !== null ) {
				$ignore = $ignore || ( $domain !== $this->activeDomain );
			}
			// Likewise for GTIDs from non-active replication origin servers
			if ( $this->style === self::GTID_MARIA && $this->activeServerId !== null ) {
				$ignore = $ignore || ( $server !== $this->activeServerId );
			} elseif ( $this->style === self::GTID_MYSQL && $this->activeServerUUID !== null ) {
				$ignore = $ignore || ( $server !== $this->activeServerUUID );
			}

			if ( !$ignore ) {
				$gtidInfos[$domain] = $pos;
			}
		}

		return $gtidInfos;
	}

	/**
	 * Split a single GTID into its components
	 *
	 * All components are returned as the strings captured from $id.
	 *
	 * @param string $id GTID
	 * @return string[]|null (domain ID, event number, source server ID) for MariaDB,
	 * (source server UUID, event number, source server UUID) for MySQL, or null
	 * if $id matches neither format
	 */
	protected static function parseGTID( $id ) {
		$m = [];
		if ( preg_match( '!^(\d+)-(\d+)-(\d+)$!', $id, $m ) ) {
			// MariaDB style: "<32 bit domain ID>-<32 bit server id>-<64 bit event number>"
			$channelId = $m[1];
			$originServerId = $m[2];
			$eventNumber = $m[3];
		} elseif ( preg_match( '!^(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}):(?:\d+-|)(\d+)$!', $id, $m ) ) {
			// MySQL style: "<server UUID>:<64 bit event number>[-<64 bit event number>]".
			// Normally, the first number should reflect the point (gtid_purged) where older
			// binary logs were purged to save space. When doing comparisons, it may as well
			// be 1 in that case. Assume that this is generally the situation.
			$channelId = $m[1];
			$originServerId = $m[1];
			$eventNumber = $m[2];
		} else {
			return null;
		}

		return [ $channelId, $eventNumber, $originServerId ];
	}

	/**
	 * @see https://dev.mysql.com/doc/refman/5.7/en/show-master-status.html
	 * @see https://dev.mysql.com/doc/refman/5.7/en/show-slave-status.html
	 * @return array|bool Map of (binlog:<string>, pos:(<integer>, <integer>)) or false
	 */
	protected function getBinlogCoordinates() {
		return ( $this->binLog !== null && $this->logPos !== null )
			? [ 'binlog' => $this->binLog, 'pos' => $this->logPos ]
			: false;
	}

	/**
	 * @return string Serialized map holding the position string, the active
	 *  domain/server ID/server UUID filters, and the "as of" timestamp
	 */
	public function serialize() {
		return serialize( [
			'position' => $this->__toString(),
			'activeDomain' => $this->activeDomain,
			'activeServerId' => $this->activeServerId,
			'activeServerUUID' => $this->activeServerUUID,
			'asOfTime' => $this->asOfTime
		] );
	}

	/**
	 * Restore this instance from the output of serialize()
	 *
	 * @param string $serialized
	 * @throws UnexpectedValueException If the payload is not a map with a string 'position'
	 *  entry and an 'asOfTime' entry
	 * @throws InvalidArgumentException If the stored position string cannot be parsed
	 */
	public function unserialize( $serialized ) {
		// serialize() only ever stores scalars and null, so never instantiate objects here
		$data = unserialize( $serialized, [ 'allowed_classes' => false ] );
		if (
			!is_array( $data ) ||
			!is_string( $data['position'] ?? null ) ||
			!array_key_exists( 'asOfTime', $data )
		) {
			throw new UnexpectedValueException( __METHOD__ . ": cannot unserialize position" );
		}

		$this->init( $data['position'], $data['asOfTime'] );
		// Only re-apply the filters that were actually set, leaving the others as null
		if ( isset( $data['activeDomain'] ) ) {
			$this->setActiveDomain( $data['activeDomain'] );
		}
		if ( isset( $data['activeServerId'] ) ) {
			$this->setActiveOriginServerId( $data['activeServerId'] );
		}
		if ( isset( $data['activeServerUUID'] ) ) {
			$this->setActiveOriginServerUUID( $data['activeServerUUID'] );
		}
	}

	/**
	 * @return string GTID set or <binary log file>/<position> (e.g db1034-bin.000976/843431247)
	 */
	public function __toString() {
		return $this->gtids
			? implode( ',', $this->gtids )
			// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
			: $this->getLogFile() . "/{$this->logPos[self::CORD_EVENT]}";
	}
}
367
/**
 * Deprecated alias, renamed as of MediaWiki 1.37
 *
 * Registers the name Wikimedia\Rdbms\MySQLMasterPos as an alias of MySQLPrimaryPos, so
 * that references to that name resolve to this class.
 *
 * @deprecated since 1.37
 */
class_alias( MySQLPrimaryPos::class, 'Wikimedia\\Rdbms\\MySQLMasterPos' );
374