1 /*-------------------------------------------------------------------------
2  *
3  * pglogical_compat.c
4  *              compatibility functions (mainly with different PG versions)
5  *
6  * Copyright (c) 2015, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *              pglogical_compat.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "funcapi.h"
17 #include "miscadmin.h"
18 
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/xact.h"
23 
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_type.h"
28 
29 #include "utils/array.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/lsyscache.h"
33 #include "utils/memutils.h"
34 #include "utils/pg_lsn.h"
35 #include "utils/rel.h"
36 #include "utils/snapmgr.h"
37 #include "utils/syscache.h"
38 #include "utils/tqual.h"
39 
40 #include "postmaster/bgworker_internals.h"
41 
42 #include "pglogical_compat.h"
43 #include "replication/origin.h"
44 #include "access/commit_ts.h"
45 
46 #define InvalidRepNodeId 0
47 
48 XLogRecPtr XactLastCommitEnd = 0;
49 
50 RepOriginId replorigin_session_origin = InvalidRepNodeId;
51 XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
52 TimestampTz replorigin_session_origin_timestamp = 0;
53 
54 bool track_commit_timestamp = false;
55 
56 #define Natts_pg_replication_origin				3
57 #define Anum_pg_replication_origin_roident		1
58 #define Anum_pg_replication_origin_roname		2
59 #define Anum_pg_replication_origin_roremote_lsn	3
60 
61 static Oid ReplicationOriginRelationId = InvalidOid;
62 static Oid ReplicationOriginIdentIndex = InvalidOid;
63 static Oid ReplicationOriginNameIndex = InvalidOid;
64 
65 /*
66  * Replay progress of a single remote node.
67  */
68 typedef struct ReplicationState
69 {
70 	/*
71 	 * Local identifier for the remote node.
72 	 */
73 	RepOriginId roident;
74 
75 	/*
76 	 * Location of the latest commit from the remote side.
77 	 */
78 	XLogRecPtr	remote_lsn;
79 
80 	/*
81 	 * Remember the local lsn of the commit record so we can XLogFlush() to it
82 	 * during a checkpoint so we know the commit record actually is safe on
83 	 * disk.
84 	 */
85 	XLogRecPtr	local_lsn;
86 
87 	/*
88 	 * Slot is setup in backend?
89 	 */
90 	pid_t		acquired_by;
91 
92 	/*
93 	 * Lock protecting remote_lsn and local_lsn.
94 	 */
95 /*	LWLock		lock;*/
96 } ReplicationState;
97 
98 static ReplicationState *session_replication_state = NULL;
99 
100 static void session_origin_xact_cb(XactEvent event, void *arg);
101 static void ensure_replication_origin_relid(void);
102 
103 
104 /*
105  * Create a replication origin.
106  *
107  * Needs to be called in a transaction.
108  */
109 RepOriginId
replorigin_create(char * roname)110 replorigin_create(char *roname)
111 {
112 	Oid			roident;
113 	HeapTuple	tuple = NULL;
114 	Relation	rel;
115 	SnapshotData SnapshotDirty;
116 	SysScanDesc scan;
117 	ScanKeyData key;
118 
119 	Assert(IsTransactionState());
120 
121 	ensure_replication_origin_relid();
122 
123 	/*
124 	 * We need the numeric replication origin to be 16bit wide, so we cannot
125 	 * rely on the normal oid allocation. Instead we simply scan
126 	 * pg_replication_origin for the first unused id. That's not particularly
127 	 * efficient, but this should be a fairly infrequent operation - we can
128 	 * easily spend a bit more code on this when it turns out it needs to be
129 	 * faster.
130 	 *
131 	 * We handle concurrency by taking an exclusive lock (allowing reads!)
132 	 * over the table for the duration of the search. Because we use a "dirty
133 	 * snapshot" we can read rows that other in-progress sessions have
134 	 * written, even though they would be invisible with normal snapshots. Due
135 	 * to the exclusive lock there's no danger that new rows can appear while
136 	 * we're checking.
137 	 */
138 	InitDirtySnapshot(SnapshotDirty);
139 
140 	rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
141 
142 	for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
143 	{
144 		bool		nulls[Natts_pg_replication_origin];
145 		Datum		values[Natts_pg_replication_origin];
146 		bool		collides;
147 
148 		CHECK_FOR_INTERRUPTS();
149 
150 		ScanKeyInit(&key,
151 					Anum_pg_replication_origin_roident,
152 					BTEqualStrategyNumber, F_OIDEQ,
153 					ObjectIdGetDatum(roident));
154 
155 		scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
156 								  true /* indexOK */ ,
157 								  &SnapshotDirty,
158 								  1, &key);
159 
160 		collides = HeapTupleIsValid(systable_getnext(scan));
161 
162 		systable_endscan(scan);
163 
164 		if (!collides)
165 		{
166 			/*
167 			 * Ok, found an unused roident, insert the new row and do a CCI,
168 			 * so our callers can look it up if they want to.
169 			 */
170 			memset(&nulls, 0, sizeof(nulls));
171 
172 			values[Anum_pg_replication_origin_roident - 1] =
173 				ObjectIdGetDatum(roident);
174 			values[Anum_pg_replication_origin_roname - 1] =
175 				CStringGetTextDatum(roname);
176 			values[Anum_pg_replication_origin_roremote_lsn - 1] =
177 				LSNGetDatum(InvalidXLogRecPtr);
178 
179 			tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
180 			simple_heap_insert(rel, tuple);
181 			CatalogUpdateIndexes(rel, tuple);
182 			CommandCounterIncrement();
183 			break;
184 		}
185 	}
186 
187 	/* now release lock again,	*/
188 	heap_close(rel, ExclusiveLock);
189 
190 	if (tuple == NULL)
191 		ereport(ERROR,
192 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
193 				 errmsg("could not find free replication origin OID")));
194 
195 	heap_freetuple(tuple);
196 	return roident;
197 }
198 
199 /*
200  * Drop replication origin.
201  *
202  * Needs to be called in a transaction.
203  */
204 void
pgl_replorigin_drop(RepOriginId roident)205 pgl_replorigin_drop(RepOriginId roident)
206 {
207 	HeapTuple	tuple = NULL;
208 	Relation	rel;
209 	SnapshotData SnapshotDirty;
210 	SysScanDesc scan;
211 	ScanKeyData key;
212 
213 	Assert(IsTransactionState());
214 
215 	ensure_replication_origin_relid();
216 
217 	InitDirtySnapshot(SnapshotDirty);
218 
219 	rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
220 
221 	/* Find and delete tuple from name table */
222 	ScanKeyInit(&key,
223 				Anum_pg_replication_origin_roident,
224 				BTEqualStrategyNumber, F_OIDEQ,
225 				ObjectIdGetDatum(roident));
226 
227 	scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
228 							  true /* indexOK */,
229 							  &SnapshotDirty,
230 							  1, &key);
231 
232 	tuple = systable_getnext(scan);
233 
234 	if (HeapTupleIsValid(tuple))
235 		simple_heap_delete(rel, &tuple->t_self);
236 
237 	systable_endscan(scan);
238 
239 	CommandCounterIncrement();
240 
241 	/* now release lock again,	*/
242 	heap_close(rel, ExclusiveLock);
243 }
244 
245 void
replorigin_drop_by_name(char * name,bool missing_ok,bool nowait)246 replorigin_drop_by_name(char *name, bool missing_ok, bool nowait)
247 {
248 	RepOriginId	originid;
249 
250 	originid = replorigin_by_name(name, missing_ok);
251 	if (originid != InvalidRepOriginId)
252 		replorigin_drop(originid, nowait);
253 }
254 
255 RepOriginId
replorigin_by_name(char * name,bool missing_ok)256 replorigin_by_name(char *name, bool missing_ok)
257 {
258 	HeapTuple	tuple = NULL;
259 	Relation	rel;
260 	Snapshot	snap;
261 	SysScanDesc scan;
262 	ScanKeyData key;
263 	Oid			roident = InvalidOid;
264 
265 	ensure_replication_origin_relid();
266 
267 	snap = RegisterSnapshot(GetLatestSnapshot());
268 	rel = heap_open(ReplicationOriginRelationId, RowExclusiveLock);
269 
270 	ScanKeyInit(&key,
271 				Anum_pg_replication_origin_roname,
272 				BTEqualStrategyNumber, F_TEXTEQ,
273 				CStringGetTextDatum(name));
274 
275 	scan = systable_beginscan(rel, ReplicationOriginNameIndex,
276 							  true /* indexOK */,
277 							  snap,
278 							  1, &key);
279 
280 	tuple = systable_getnext(scan);
281 
282 	if (HeapTupleIsValid(tuple))
283 	{
284 		Datum		values[Natts_pg_replication_origin];
285 		bool		nulls[Natts_pg_replication_origin];
286 
287 		heap_deform_tuple(tuple, RelationGetDescr(rel),
288 						  values, nulls);
289 		roident = DatumGetObjectId(values[Anum_pg_replication_origin_roident - 1]);
290 	}
291 	else if (!missing_ok)
292 		elog(ERROR, "cache lookup failed for replication origin named %s",
293 			 name);
294 
295 	systable_endscan(scan);
296 	UnregisterSnapshot(snap);
297 	heap_close(rel, RowExclusiveLock);
298 
299 	return roident;
300 }
301 
302 void
replorigin_session_setup(RepOriginId node)303 replorigin_session_setup(RepOriginId node)
304 {
305 	Relation		rel;
306 	SysScanDesc		scan;
307 	ScanKeyData		key;
308 	HeapTuple		tuple;
309 	XLogRecPtr		remote_lsn = InvalidXLogRecPtr,
310 					local_lsn = InvalidXLogRecPtr;
311     MemoryContext   oldcontext;
312 
313 	Assert(node != InvalidRepNodeId);
314 
315 	if (session_replication_state != NULL)
316 		ereport(ERROR,
317 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
318 		errmsg("cannot setup replication origin when one is already setup")));
319 
320 	ensure_replication_origin_relid();
321 
322 	rel = heap_open(ReplicationOriginRelationId, RowExclusiveLock);
323 
324 	ScanKeyInit(&key,
325 				Anum_pg_replication_origin_roident,
326 				BTEqualStrategyNumber, F_OIDEQ,
327 				ObjectIdGetDatum(node));
328 
329 	scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
330 							  true, NULL, 1, &key);
331 	tuple = systable_getnext(scan);
332 
333 	if (HeapTupleIsValid(tuple))
334 	{
335 		Datum		values[Natts_pg_replication_origin];
336 		bool		nulls[Natts_pg_replication_origin];
337 
338 		heap_deform_tuple(tuple, RelationGetDescr(rel),
339 						  values, nulls);
340 		remote_lsn =
341 			DatumGetLSN(values[Anum_pg_replication_origin_roremote_lsn - 1]);
342 		local_lsn = XactLastCommitEnd;
343 	}
344 
345 	systable_endscan(scan);
346 	heap_close(rel, RowExclusiveLock);
347 
348 	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
349 	session_replication_state = (ReplicationState *) palloc(sizeof(ReplicationState));
350 	session_replication_state->roident = node;
351 	session_replication_state->remote_lsn = remote_lsn;
352 	session_replication_state->local_lsn = local_lsn;
353 	MemoryContextSwitchTo(oldcontext);
354 
355 	RegisterXactCallback(session_origin_xact_cb, NULL);
356 }
357 
358 void
replorigin_session_reset(void)359 replorigin_session_reset(void)
360 {
361 	ReplicationState *local_replication_state = session_replication_state;
362 
363 	if (session_replication_state == NULL)
364 		ereport(ERROR,
365 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
366 				 errmsg("no replication origin is configured")));
367 
368 	UnregisterXactCallback(session_origin_xact_cb, NULL);
369 
370 	session_replication_state->acquired_by = 0;
371 	session_replication_state = NULL;
372 
373 	pfree(local_replication_state);
374 }
375 
376 /*
377  * Ask the machinery about the point up to which we successfully replayed
378  * changes from an already setup replication origin.
379  */
380 XLogRecPtr
replorigin_session_get_progress(bool flush)381 replorigin_session_get_progress(bool flush)
382 {
383 	XLogRecPtr	remote_lsn;
384 	XLogRecPtr	local_lsn;
385 
386 	Assert(session_replication_state != NULL);
387 
388 	remote_lsn = session_replication_state->remote_lsn;
389 	local_lsn = session_replication_state->local_lsn;
390 
391 	if (flush && local_lsn != InvalidXLogRecPtr)
392 		XLogFlush(local_lsn);
393 
394 	return remote_lsn;
395 }
396 
397 void
replorigin_advance(RepOriginId node,XLogRecPtr remote_commit,XLogRecPtr local_commit,bool go_backward,bool wal_log)398 replorigin_advance(RepOriginId node,
399 				   XLogRecPtr remote_commit,
400 				   XLogRecPtr local_commit,
401 				   bool go_backward, bool wal_log)
402 {
403 	HeapTuple	tuple = NULL;
404 	Relation	rel;
405 	SnapshotData SnapshotDirty;
406 	SysScanDesc scan;
407 	ScanKeyData key;
408 
409 	Assert(node != InvalidRepOriginId);
410 	Assert(IsTransactionState());
411 
412 	if (node == DoNotReplicateId)
413 		return;
414 
415 	ensure_replication_origin_relid();
416 
417 	InitDirtySnapshot(SnapshotDirty);
418 
419 	rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
420 
421 	/* Find and delete tuple from name table */
422 	ScanKeyInit(&key,
423 				Anum_pg_replication_origin_roident,
424 				BTEqualStrategyNumber, F_OIDEQ,
425 				ObjectIdGetDatum(node));
426 
427 	scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
428 							  true /* indexOK */,
429 							  &SnapshotDirty,
430 							  1, &key);
431 
432 	tuple = systable_getnext(scan);
433 
434 	if (HeapTupleIsValid(tuple))
435 	{
436 		HeapTuple	newtuple;
437 		Datum		values[Natts_pg_replication_origin];
438 		bool		nulls[Natts_pg_replication_origin];
439 
440 		heap_deform_tuple(tuple, RelationGetDescr(rel),
441 							  values, nulls);
442 
443 		values[Anum_pg_replication_origin_roremote_lsn - 1] =
444 			LSNGetDatum(remote_commit);
445 
446 		newtuple = heap_form_tuple(RelationGetDescr(rel),
447 								   values, nulls);
448 		simple_heap_update(rel, &tuple->t_self, newtuple);
449 		CatalogUpdateIndexes(rel, newtuple);
450 	}
451 
452 	systable_endscan(scan);
453 
454 	CommandCounterIncrement();
455 
456 	/* now release lock again,	*/
457 	heap_close(rel, ExclusiveLock);
458 
459 	return;
460 }
461 
462 static void
replorigin_session_advance(XLogRecPtr remote_commit,XLogRecPtr local_commit)463 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
464 {
465 	Assert(session_replication_state != NULL);
466 	Assert(session_replication_state->roident != InvalidRepOriginId);
467 
468 	if (session_replication_state->local_lsn < local_commit)
469 		session_replication_state->local_lsn = local_commit;
470 	if (session_replication_state->remote_lsn < remote_commit)
471 		session_replication_state->remote_lsn = remote_commit;
472 
473 	replorigin_advance(session_replication_state->roident, remote_commit,
474 					   local_commit, false, true);
475 }
476 
477 static void
session_origin_xact_cb(XactEvent event,void * arg)478 session_origin_xact_cb(XactEvent event, void *arg)
479 {
480 	if (event == XACT_EVENT_PRE_COMMIT &&
481 		session_replication_state != NULL &&
482 		replorigin_session_origin != InvalidRepOriginId &&
483 		replorigin_session_origin != DoNotReplicateId)
484 	{
485 		replorigin_session_advance(replorigin_session_origin_lsn,
486 								   XactLastCommitEnd);
487 	}
488 }
489 
490 static void
ensure_replication_origin_relid(void)491 ensure_replication_origin_relid(void)
492 {
493 	if (ReplicationOriginRelationId == InvalidOid)
494 	{
495 		Oid	schema_oid = get_namespace_oid("pglogical_origin", true);
496 
497 		if (schema_oid == InvalidOid)
498 			ereport(ERROR,
499 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
500 					 errmsg("pglogical_origin extension not found")));
501 
502 		ReplicationOriginRelationId =
503 			get_relname_relid("replication_origin", schema_oid);
504 		ReplicationOriginIdentIndex =
505 			get_relname_relid("replication_origin_roident_index", schema_oid);
506 		ReplicationOriginNameIndex =
507 			get_relname_relid("replication_origin_roname_index", schema_oid);
508 	}
509 }
510 
511 /*
512  * Connect background worker to a database using OIDs.
513  */
514 void
BackgroundWorkerInitializeConnectionByOid(Oid dboid,Oid useroid)515 BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
516 {
517 	BackgroundWorker *worker = MyBgworkerEntry;
518 
519 	/* XXX is this the right errcode? */
520 	if (!(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION))
521 		ereport(FATAL,
522 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
523 				 errmsg("database connection requirement not indicated during registration")));
524 
525 	InitPostgres(NULL, dboid, NULL, NULL);
526 
527 	/* it had better not gotten out of "init" mode yet */
528 	if (!IsInitProcessingMode())
529 		ereport(ERROR,
530 				(errmsg("invalid processing mode in background worker")));
531 	SetProcessingMode(NormalProcessing);
532 }
533 
534 bool
TransactionIdGetCommitTsData(TransactionId xid,TimestampTz * ts,RepOriginId * nodeid)535 TransactionIdGetCommitTsData(TransactionId xid,
536 							 TimestampTz *ts, RepOriginId *nodeid)
537 {
538 	elog(ERROR, "TransactionIdGetCommitTsData is not implemented yet");
539 	return false;
540 }
541 
542 
543 /*
544  * Auxiliary function to return a TEXT array out of a list of C-strings.
545  */
546 ArrayType *
strlist_to_textarray(List * list)547 strlist_to_textarray(List *list)
548 {
549 	ArrayType  *arr;
550 	Datum	   *datums;
551 	int			j = 0;
552 	ListCell   *cell;
553 	MemoryContext memcxt;
554 	MemoryContext oldcxt;
555 
556 	memcxt = AllocSetContextCreate(CurrentMemoryContext,
557 								   "strlist to array",
558 								   ALLOCSET_DEFAULT_MINSIZE,
559 								   ALLOCSET_DEFAULT_INITSIZE,
560 								   ALLOCSET_DEFAULT_MAXSIZE);
561 	oldcxt = MemoryContextSwitchTo(memcxt);
562 
563 	datums = palloc(sizeof(text *) * list_length(list));
564 	foreach(cell, list)
565 	{
566 		char	   *name = lfirst(cell);
567 
568 		datums[j++] = CStringGetTextDatum(name);
569 	}
570 
571 	MemoryContextSwitchTo(oldcxt);
572 
573 	arr = construct_array(datums, list_length(list),
574 						  TEXTOID, -1, false, 'i');
575 	MemoryContextDelete(memcxt);
576 
577 	return arr;
578 }
579 
580 
581 LWLockPadded *
GetNamedLWLockTranche(const char * tranche_name)582 GetNamedLWLockTranche(const char *tranche_name)
583 {
584     LWLock     *lock = LWLockAssign();
585 
586     return (LWLockPadded *)lock;
587 }
588 
/*
 * RequestNamedLWLockTranche
 *		Compatibility shim: reserve add-in LWLocks at startup.
 *
 * tranche_name is ignored.  Exactly one lock is expected, because
 * GetNamedLWLockTranche() in this file hands back a single lock.
 */
void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
	Assert(num_lwlocks == 1);

	RequestAddinLWLocks(num_lwlocks);
}
596 
597 /*
598  * CatalogTupleInsert - do heap and indexing work for a new catalog tuple
599  *
600  * Insert the tuple data in "tup" into the specified catalog relation.
601  * The Oid of the inserted tuple is returned.
602  *
603  * This is a convenience routine for the common case of inserting a single
604  * tuple in a system catalog; it inserts a new heap tuple, keeping indexes
605  * current.  Avoid using it for multiple tuples, since opening the indexes
606  * and building the index info structures is moderately expensive.
607  * (Use CatalogTupleInsertWithInfo in such cases.)
608  */
609 Oid
CatalogTupleInsert(Relation heapRel,HeapTuple tup)610 CatalogTupleInsert(Relation heapRel, HeapTuple tup)
611 {
612 	CatalogIndexState indstate;
613 	Oid			oid;
614 
615 	indstate = CatalogOpenIndexes(heapRel);
616 
617 	oid = simple_heap_insert(heapRel, tup);
618 
619 	CatalogIndexInsert(indstate, tup);
620 	CatalogCloseIndexes(indstate);
621 
622 	return oid;
623 }
624 
625 /*
626  * CatalogTupleUpdate - do heap and indexing work for updating a catalog tuple
627  *
628  * Update the tuple identified by "otid", replacing it with the data in "tup".
629  *
630  * This is a convenience routine for the common case of updating a single
631  * tuple in a system catalog; it updates one heap tuple, keeping indexes
632  * current.  Avoid using it for multiple tuples, since opening the indexes
633  * and building the index info structures is moderately expensive.
634  * (Use CatalogTupleUpdateWithInfo in such cases.)
635  */
636 void
CatalogTupleUpdate(Relation heapRel,ItemPointer otid,HeapTuple tup)637 CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
638 {
639 	CatalogIndexState indstate;
640 
641 	indstate = CatalogOpenIndexes(heapRel);
642 
643 	simple_heap_update(heapRel, otid, tup);
644 
645 	CatalogIndexInsert(indstate, tup);
646 	CatalogCloseIndexes(indstate);
647 }
648 
649 
650 /*
651  * CatalogTupleDelete - do heap and indexing work for deleting a catalog tuple
652  *
653  * Delete the tuple identified by "tid" in the specified catalog.
654  *
655  * With Postgres heaps, there is no index work to do at deletion time;
656  * cleanup will be done later by VACUUM.  However, callers of this function
657  * shouldn't have to know that; we'd like a uniform abstraction for all
658  * catalog tuple changes.  Hence, provide this currently-trivial wrapper.
659  *
660  * The abstraction is a bit leaky in that we don't provide an optimized
661  * CatalogTupleDeleteWithInfo version, because there is currently nothing to
662  * optimize.  If we ever need that, rather than touching a lot of call sites,
663  * it might be better to do something about caching CatalogIndexState.
664  */
665 void
CatalogTupleDelete(Relation heapRel,ItemPointer tid)666 CatalogTupleDelete(Relation heapRel, ItemPointer tid)
667 {
668 	simple_heap_delete(heapRel, tid);
669 }
670