1 /*-------------------------------------------------------------------------
2 *
3 * pglogical_compat.c
4 * compatibility functions (mainly with different PG versions)
5 *
6 * Copyright (c) 2015, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * pglogical_compat.c
10 *
11 *-------------------------------------------------------------------------
12 */
13
14 #include "postgres.h"
15
16 #include "funcapi.h"
17 #include "miscadmin.h"
18
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/xact.h"
23
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_type.h"
28
29 #include "utils/array.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/lsyscache.h"
33 #include "utils/memutils.h"
34 #include "utils/pg_lsn.h"
35 #include "utils/rel.h"
36 #include "utils/snapmgr.h"
37 #include "utils/syscache.h"
38 #include "utils/tqual.h"
39
40 #include "postmaster/bgworker_internals.h"
41
42 #include "pglogical_compat.h"
43 #include "replication/origin.h"
44 #include "access/commit_ts.h"
45
/*
 * Value used in this file for "no replication origin" (see the Assert in
 * replorigin_session_setup() and the initializer of
 * replorigin_session_origin below).
 */
#define InvalidRepNodeId 0

/*
 * End LSN of the last local commit.  This file only initializes it to 0 and
 * reads it (replorigin_session_setup(), session_origin_xact_cb()); nothing
 * here advances it.  NOTE(review): confirm whether another module updates it.
 */
XLogRecPtr XactLastCommitEnd = 0;

/*
 * Per-session origin variables, named after core PostgreSQL's replication
 * origin API.  session_origin_xact_cb() reads replorigin_session_origin and
 * replorigin_session_origin_lsn when a transaction is about to commit.
 */
RepOriginId replorigin_session_origin = InvalidRepNodeId;
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/* Stand-in for the core setting of the same name; never changed here. */
bool track_commit_timestamp = false;

/*
 * Column layout of pglogical_origin.replication_origin, used to index the
 * values[]/nulls[] arrays passed to heap_form_tuple()/heap_deform_tuple():
 * (roident, roname, roremote_lsn).
 */
#define Natts_pg_replication_origin 3
#define Anum_pg_replication_origin_roident 1
#define Anum_pg_replication_origin_roname 2
#define Anum_pg_replication_origin_roremote_lsn 3

/*
 * OIDs of the origin table and its roident / roname indexes.  They are
 * resolved lazily by ensure_replication_origin_relid() and then cached for
 * the rest of the backend's life.
 */
static Oid ReplicationOriginRelationId = InvalidOid;
static Oid ReplicationOriginIdentIndex = InvalidOid;
static Oid ReplicationOriginNameIndex = InvalidOid;
64
/*
 * Replay progress of a single remote node.
 *
 * In this file the only instance is the backend-local object pointed to by
 * session_replication_state, allocated by replorigin_session_setup().
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr remote_lsn;

	/*
	 * LSN of the local commit record, remembered so that
	 * replorigin_session_get_progress() can XLogFlush() up to it and thereby
	 * make sure that commit record is actually safe on disk.
	 */
	XLogRecPtr local_lsn;

	/*
	 * Process that has acquired this state (a pid_t);
	 * replorigin_session_reset() sets it back to 0 before freeing the state.
	 */
	pid_t acquired_by;

	/*
	 * There is no lock field: the state is only reachable through the
	 * file-static, backend-local session_replication_state pointer.
	 */
} ReplicationState;
97
/*
 * Origin state of the current session, or NULL when none is set up.
 * Allocated in CacheMemoryContext by replorigin_session_setup() and freed by
 * replorigin_session_reset().
 */
static ReplicationState *session_replication_state = NULL;

/* Pre-commit hook that advances the session's origin progress. */
static void session_origin_xact_cb(XactEvent event, void *arg);
/* Lazily resolves and caches the origin table/index OIDs. */
static void ensure_replication_origin_relid(void);
102
103
104 /*
105 * Create a replication origin.
106 *
107 * Needs to be called in a transaction.
108 */
109 RepOriginId
replorigin_create(char * roname)110 replorigin_create(char *roname)
111 {
112 Oid roident;
113 HeapTuple tuple = NULL;
114 Relation rel;
115 SnapshotData SnapshotDirty;
116 SysScanDesc scan;
117 ScanKeyData key;
118
119 Assert(IsTransactionState());
120
121 ensure_replication_origin_relid();
122
123 /*
124 * We need the numeric replication origin to be 16bit wide, so we cannot
125 * rely on the normal oid allocation. Instead we simply scan
126 * pg_replication_origin for the first unused id. That's not particularly
127 * efficient, but this should be a fairly infrequent operation - we can
128 * easily spend a bit more code on this when it turns out it needs to be
129 * faster.
130 *
131 * We handle concurrency by taking an exclusive lock (allowing reads!)
132 * over the table for the duration of the search. Because we use a "dirty
133 * snapshot" we can read rows that other in-progress sessions have
134 * written, even though they would be invisible with normal snapshots. Due
135 * to the exclusive lock there's no danger that new rows can appear while
136 * we're checking.
137 */
138 InitDirtySnapshot(SnapshotDirty);
139
140 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
141
142 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
143 {
144 bool nulls[Natts_pg_replication_origin];
145 Datum values[Natts_pg_replication_origin];
146 bool collides;
147
148 CHECK_FOR_INTERRUPTS();
149
150 ScanKeyInit(&key,
151 Anum_pg_replication_origin_roident,
152 BTEqualStrategyNumber, F_OIDEQ,
153 ObjectIdGetDatum(roident));
154
155 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
156 true /* indexOK */ ,
157 &SnapshotDirty,
158 1, &key);
159
160 collides = HeapTupleIsValid(systable_getnext(scan));
161
162 systable_endscan(scan);
163
164 if (!collides)
165 {
166 /*
167 * Ok, found an unused roident, insert the new row and do a CCI,
168 * so our callers can look it up if they want to.
169 */
170 memset(&nulls, 0, sizeof(nulls));
171
172 values[Anum_pg_replication_origin_roident - 1] =
173 ObjectIdGetDatum(roident);
174 values[Anum_pg_replication_origin_roname - 1] =
175 CStringGetTextDatum(roname);
176 values[Anum_pg_replication_origin_roremote_lsn - 1] =
177 LSNGetDatum(InvalidXLogRecPtr);
178
179 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
180 simple_heap_insert(rel, tuple);
181 CatalogUpdateIndexes(rel, tuple);
182 CommandCounterIncrement();
183 break;
184 }
185 }
186
187 /* now release lock again, */
188 heap_close(rel, ExclusiveLock);
189
190 if (tuple == NULL)
191 ereport(ERROR,
192 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
193 errmsg("could not find free replication origin OID")));
194
195 heap_freetuple(tuple);
196 return roident;
197 }
198
199 /*
200 * Drop replication origin.
201 *
202 * Needs to be called in a transaction.
203 */
204 void
pgl_replorigin_drop(RepOriginId roident)205 pgl_replorigin_drop(RepOriginId roident)
206 {
207 HeapTuple tuple = NULL;
208 Relation rel;
209 SnapshotData SnapshotDirty;
210 SysScanDesc scan;
211 ScanKeyData key;
212
213 Assert(IsTransactionState());
214
215 ensure_replication_origin_relid();
216
217 InitDirtySnapshot(SnapshotDirty);
218
219 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
220
221 /* Find and delete tuple from name table */
222 ScanKeyInit(&key,
223 Anum_pg_replication_origin_roident,
224 BTEqualStrategyNumber, F_OIDEQ,
225 ObjectIdGetDatum(roident));
226
227 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
228 true /* indexOK */,
229 &SnapshotDirty,
230 1, &key);
231
232 tuple = systable_getnext(scan);
233
234 if (HeapTupleIsValid(tuple))
235 simple_heap_delete(rel, &tuple->t_self);
236
237 systable_endscan(scan);
238
239 CommandCounterIncrement();
240
241 /* now release lock again, */
242 heap_close(rel, ExclusiveLock);
243 }
244
245 void
replorigin_drop_by_name(char * name,bool missing_ok,bool nowait)246 replorigin_drop_by_name(char *name, bool missing_ok, bool nowait)
247 {
248 RepOriginId originid;
249
250 originid = replorigin_by_name(name, missing_ok);
251 if (originid != InvalidRepOriginId)
252 replorigin_drop(originid, nowait);
253 }
254
255 RepOriginId
replorigin_by_name(char * name,bool missing_ok)256 replorigin_by_name(char *name, bool missing_ok)
257 {
258 HeapTuple tuple = NULL;
259 Relation rel;
260 Snapshot snap;
261 SysScanDesc scan;
262 ScanKeyData key;
263 Oid roident = InvalidOid;
264
265 ensure_replication_origin_relid();
266
267 snap = RegisterSnapshot(GetLatestSnapshot());
268 rel = heap_open(ReplicationOriginRelationId, RowExclusiveLock);
269
270 ScanKeyInit(&key,
271 Anum_pg_replication_origin_roname,
272 BTEqualStrategyNumber, F_TEXTEQ,
273 CStringGetTextDatum(name));
274
275 scan = systable_beginscan(rel, ReplicationOriginNameIndex,
276 true /* indexOK */,
277 snap,
278 1, &key);
279
280 tuple = systable_getnext(scan);
281
282 if (HeapTupleIsValid(tuple))
283 {
284 Datum values[Natts_pg_replication_origin];
285 bool nulls[Natts_pg_replication_origin];
286
287 heap_deform_tuple(tuple, RelationGetDescr(rel),
288 values, nulls);
289 roident = DatumGetObjectId(values[Anum_pg_replication_origin_roident - 1]);
290 }
291 else if (!missing_ok)
292 elog(ERROR, "cache lookup failed for replication origin named %s",
293 name);
294
295 systable_endscan(scan);
296 UnregisterSnapshot(snap);
297 heap_close(rel, RowExclusiveLock);
298
299 return roident;
300 }
301
302 void
replorigin_session_setup(RepOriginId node)303 replorigin_session_setup(RepOriginId node)
304 {
305 Relation rel;
306 SysScanDesc scan;
307 ScanKeyData key;
308 HeapTuple tuple;
309 XLogRecPtr remote_lsn = InvalidXLogRecPtr,
310 local_lsn = InvalidXLogRecPtr;
311 MemoryContext oldcontext;
312
313 Assert(node != InvalidRepNodeId);
314
315 if (session_replication_state != NULL)
316 ereport(ERROR,
317 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
318 errmsg("cannot setup replication origin when one is already setup")));
319
320 ensure_replication_origin_relid();
321
322 rel = heap_open(ReplicationOriginRelationId, RowExclusiveLock);
323
324 ScanKeyInit(&key,
325 Anum_pg_replication_origin_roident,
326 BTEqualStrategyNumber, F_OIDEQ,
327 ObjectIdGetDatum(node));
328
329 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
330 true, NULL, 1, &key);
331 tuple = systable_getnext(scan);
332
333 if (HeapTupleIsValid(tuple))
334 {
335 Datum values[Natts_pg_replication_origin];
336 bool nulls[Natts_pg_replication_origin];
337
338 heap_deform_tuple(tuple, RelationGetDescr(rel),
339 values, nulls);
340 remote_lsn =
341 DatumGetLSN(values[Anum_pg_replication_origin_roremote_lsn - 1]);
342 local_lsn = XactLastCommitEnd;
343 }
344
345 systable_endscan(scan);
346 heap_close(rel, RowExclusiveLock);
347
348 oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
349 session_replication_state = (ReplicationState *) palloc(sizeof(ReplicationState));
350 session_replication_state->roident = node;
351 session_replication_state->remote_lsn = remote_lsn;
352 session_replication_state->local_lsn = local_lsn;
353 MemoryContextSwitchTo(oldcontext);
354
355 RegisterXactCallback(session_origin_xact_cb, NULL);
356 }
357
358 void
replorigin_session_reset(void)359 replorigin_session_reset(void)
360 {
361 ReplicationState *local_replication_state = session_replication_state;
362
363 if (session_replication_state == NULL)
364 ereport(ERROR,
365 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
366 errmsg("no replication origin is configured")));
367
368 UnregisterXactCallback(session_origin_xact_cb, NULL);
369
370 session_replication_state->acquired_by = 0;
371 session_replication_state = NULL;
372
373 pfree(local_replication_state);
374 }
375
376 /*
377 * Ask the machinery about the point up to which we successfully replayed
378 * changes from an already setup replication origin.
379 */
380 XLogRecPtr
replorigin_session_get_progress(bool flush)381 replorigin_session_get_progress(bool flush)
382 {
383 XLogRecPtr remote_lsn;
384 XLogRecPtr local_lsn;
385
386 Assert(session_replication_state != NULL);
387
388 remote_lsn = session_replication_state->remote_lsn;
389 local_lsn = session_replication_state->local_lsn;
390
391 if (flush && local_lsn != InvalidXLogRecPtr)
392 XLogFlush(local_lsn);
393
394 return remote_lsn;
395 }
396
397 void
replorigin_advance(RepOriginId node,XLogRecPtr remote_commit,XLogRecPtr local_commit,bool go_backward,bool wal_log)398 replorigin_advance(RepOriginId node,
399 XLogRecPtr remote_commit,
400 XLogRecPtr local_commit,
401 bool go_backward, bool wal_log)
402 {
403 HeapTuple tuple = NULL;
404 Relation rel;
405 SnapshotData SnapshotDirty;
406 SysScanDesc scan;
407 ScanKeyData key;
408
409 Assert(node != InvalidRepOriginId);
410 Assert(IsTransactionState());
411
412 if (node == DoNotReplicateId)
413 return;
414
415 ensure_replication_origin_relid();
416
417 InitDirtySnapshot(SnapshotDirty);
418
419 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
420
421 /* Find and delete tuple from name table */
422 ScanKeyInit(&key,
423 Anum_pg_replication_origin_roident,
424 BTEqualStrategyNumber, F_OIDEQ,
425 ObjectIdGetDatum(node));
426
427 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
428 true /* indexOK */,
429 &SnapshotDirty,
430 1, &key);
431
432 tuple = systable_getnext(scan);
433
434 if (HeapTupleIsValid(tuple))
435 {
436 HeapTuple newtuple;
437 Datum values[Natts_pg_replication_origin];
438 bool nulls[Natts_pg_replication_origin];
439
440 heap_deform_tuple(tuple, RelationGetDescr(rel),
441 values, nulls);
442
443 values[Anum_pg_replication_origin_roremote_lsn - 1] =
444 LSNGetDatum(remote_commit);
445
446 newtuple = heap_form_tuple(RelationGetDescr(rel),
447 values, nulls);
448 simple_heap_update(rel, &tuple->t_self, newtuple);
449 CatalogUpdateIndexes(rel, newtuple);
450 }
451
452 systable_endscan(scan);
453
454 CommandCounterIncrement();
455
456 /* now release lock again, */
457 heap_close(rel, ExclusiveLock);
458
459 return;
460 }
461
462 static void
replorigin_session_advance(XLogRecPtr remote_commit,XLogRecPtr local_commit)463 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
464 {
465 Assert(session_replication_state != NULL);
466 Assert(session_replication_state->roident != InvalidRepOriginId);
467
468 if (session_replication_state->local_lsn < local_commit)
469 session_replication_state->local_lsn = local_commit;
470 if (session_replication_state->remote_lsn < remote_commit)
471 session_replication_state->remote_lsn = remote_commit;
472
473 replorigin_advance(session_replication_state->roident, remote_commit,
474 local_commit, false, true);
475 }
476
477 static void
session_origin_xact_cb(XactEvent event,void * arg)478 session_origin_xact_cb(XactEvent event, void *arg)
479 {
480 if (event == XACT_EVENT_PRE_COMMIT &&
481 session_replication_state != NULL &&
482 replorigin_session_origin != InvalidRepOriginId &&
483 replorigin_session_origin != DoNotReplicateId)
484 {
485 replorigin_session_advance(replorigin_session_origin_lsn,
486 XactLastCommitEnd);
487 }
488 }
489
490 static void
ensure_replication_origin_relid(void)491 ensure_replication_origin_relid(void)
492 {
493 if (ReplicationOriginRelationId == InvalidOid)
494 {
495 Oid schema_oid = get_namespace_oid("pglogical_origin", true);
496
497 if (schema_oid == InvalidOid)
498 ereport(ERROR,
499 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
500 errmsg("pglogical_origin extension not found")));
501
502 ReplicationOriginRelationId =
503 get_relname_relid("replication_origin", schema_oid);
504 ReplicationOriginIdentIndex =
505 get_relname_relid("replication_origin_roident_index", schema_oid);
506 ReplicationOriginNameIndex =
507 get_relname_relid("replication_origin_roname_index", schema_oid);
508 }
509 }
510
511 /*
512 * Connect background worker to a database using OIDs.
513 */
514 void
BackgroundWorkerInitializeConnectionByOid(Oid dboid,Oid useroid)515 BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
516 {
517 BackgroundWorker *worker = MyBgworkerEntry;
518
519 /* XXX is this the right errcode? */
520 if (!(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION))
521 ereport(FATAL,
522 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
523 errmsg("database connection requirement not indicated during registration")));
524
525 InitPostgres(NULL, dboid, NULL, NULL);
526
527 /* it had better not gotten out of "init" mode yet */
528 if (!IsInitProcessingMode())
529 ereport(ERROR,
530 (errmsg("invalid processing mode in background worker")));
531 SetProcessingMode(NormalProcessing);
532 }
533
534 bool
TransactionIdGetCommitTsData(TransactionId xid,TimestampTz * ts,RepOriginId * nodeid)535 TransactionIdGetCommitTsData(TransactionId xid,
536 TimestampTz *ts, RepOriginId *nodeid)
537 {
538 elog(ERROR, "TransactionIdGetCommitTsData is not implemented yet");
539 return false;
540 }
541
542
543 /*
544 * Auxiliary function to return a TEXT array out of a list of C-strings.
545 */
546 ArrayType *
strlist_to_textarray(List * list)547 strlist_to_textarray(List *list)
548 {
549 ArrayType *arr;
550 Datum *datums;
551 int j = 0;
552 ListCell *cell;
553 MemoryContext memcxt;
554 MemoryContext oldcxt;
555
556 memcxt = AllocSetContextCreate(CurrentMemoryContext,
557 "strlist to array",
558 ALLOCSET_DEFAULT_MINSIZE,
559 ALLOCSET_DEFAULT_INITSIZE,
560 ALLOCSET_DEFAULT_MAXSIZE);
561 oldcxt = MemoryContextSwitchTo(memcxt);
562
563 datums = palloc(sizeof(text *) * list_length(list));
564 foreach(cell, list)
565 {
566 char *name = lfirst(cell);
567
568 datums[j++] = CStringGetTextDatum(name);
569 }
570
571 MemoryContextSwitchTo(oldcxt);
572
573 arr = construct_array(datums, list_length(list),
574 TEXTOID, -1, false, 'i');
575 MemoryContextDelete(memcxt);
576
577 return arr;
578 }
579
580
581 LWLockPadded *
GetNamedLWLockTranche(const char * tranche_name)582 GetNamedLWLockTranche(const char *tranche_name)
583 {
584 LWLock *lock = LWLockAssign();
585
586 return (LWLockPadded *)lock;
587 }
588
/*
 * Compatibility shim: reserve num_lwlocks add-in LWLocks.
 *
 * tranche_name is ignored.  Only tranches of exactly one lock are supported,
 * which is checked by an assertion.
 */
void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
	/* GetNamedLWLockTranche() can hand out only a single lock. */
	Assert(num_lwlocks == 1);

	RequestAddinLWLocks(num_lwlocks);
}
596
597 /*
598 * CatalogTupleInsert - do heap and indexing work for a new catalog tuple
599 *
600 * Insert the tuple data in "tup" into the specified catalog relation.
601 * The Oid of the inserted tuple is returned.
602 *
603 * This is a convenience routine for the common case of inserting a single
604 * tuple in a system catalog; it inserts a new heap tuple, keeping indexes
605 * current. Avoid using it for multiple tuples, since opening the indexes
606 * and building the index info structures is moderately expensive.
607 * (Use CatalogTupleInsertWithInfo in such cases.)
608 */
609 Oid
CatalogTupleInsert(Relation heapRel,HeapTuple tup)610 CatalogTupleInsert(Relation heapRel, HeapTuple tup)
611 {
612 CatalogIndexState indstate;
613 Oid oid;
614
615 indstate = CatalogOpenIndexes(heapRel);
616
617 oid = simple_heap_insert(heapRel, tup);
618
619 CatalogIndexInsert(indstate, tup);
620 CatalogCloseIndexes(indstate);
621
622 return oid;
623 }
624
625 /*
626 * CatalogTupleUpdate - do heap and indexing work for updating a catalog tuple
627 *
628 * Update the tuple identified by "otid", replacing it with the data in "tup".
629 *
630 * This is a convenience routine for the common case of updating a single
631 * tuple in a system catalog; it updates one heap tuple, keeping indexes
632 * current. Avoid using it for multiple tuples, since opening the indexes
633 * and building the index info structures is moderately expensive.
634 * (Use CatalogTupleUpdateWithInfo in such cases.)
635 */
636 void
CatalogTupleUpdate(Relation heapRel,ItemPointer otid,HeapTuple tup)637 CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
638 {
639 CatalogIndexState indstate;
640
641 indstate = CatalogOpenIndexes(heapRel);
642
643 simple_heap_update(heapRel, otid, tup);
644
645 CatalogIndexInsert(indstate, tup);
646 CatalogCloseIndexes(indstate);
647 }
648
649
650 /*
651 * CatalogTupleDelete - do heap and indexing work for deleting a catalog tuple
652 *
653 * Delete the tuple identified by "tid" in the specified catalog.
654 *
655 * With Postgres heaps, there is no index work to do at deletion time;
656 * cleanup will be done later by VACUUM. However, callers of this function
657 * shouldn't have to know that; we'd like a uniform abstraction for all
658 * catalog tuple changes. Hence, provide this currently-trivial wrapper.
659 *
660 * The abstraction is a bit leaky in that we don't provide an optimized
661 * CatalogTupleDeleteWithInfo version, because there is currently nothing to
662 * optimize. If we ever need that, rather than touching a lot of call sites,
663 * it might be better to do something about caching CatalogIndexState.
664 */
665 void
CatalogTupleDelete(Relation heapRel,ItemPointer tid)666 CatalogTupleDelete(Relation heapRel, ItemPointer tid)
667 {
668 simple_heap_delete(heapRel, tid);
669 }
670