1 /*-------------------------------------------------------------------------
2 *
3 * pg_subscription.c
4 * replication subscriptions
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/catalog/pg_subscription.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/indexing.h"
23 #include "catalog/pg_subscription.h"
24 #include "catalog/pg_subscription_rel.h"
25 #include "catalog/pg_type.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "storage/lmgr.h"
29 #include "utils/array.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/lsyscache.h"
33 #include "utils/pg_lsn.h"
34 #include "utils/rel.h"
35 #include "utils/syscache.h"
36
37 static List *textarray_to_stringlist(ArrayType *textarray);
38
39 /*
40 * Fetch the subscription from the syscache.
41 */
42 Subscription *
GetSubscription(Oid subid,bool missing_ok)43 GetSubscription(Oid subid, bool missing_ok)
44 {
45 HeapTuple tup;
46 Subscription *sub;
47 Form_pg_subscription subform;
48 Datum datum;
49 bool isnull;
50
51 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
52
53 if (!HeapTupleIsValid(tup))
54 {
55 if (missing_ok)
56 return NULL;
57
58 elog(ERROR, "cache lookup failed for subscription %u", subid);
59 }
60
61 subform = (Form_pg_subscription) GETSTRUCT(tup);
62
63 sub = (Subscription *) palloc(sizeof(Subscription));
64 sub->oid = subid;
65 sub->dbid = subform->subdbid;
66 sub->name = pstrdup(NameStr(subform->subname));
67 sub->owner = subform->subowner;
68 sub->enabled = subform->subenabled;
69 sub->binary = subform->subbinary;
70 sub->stream = subform->substream;
71
72 /* Get conninfo */
73 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
74 tup,
75 Anum_pg_subscription_subconninfo,
76 &isnull);
77 Assert(!isnull);
78 sub->conninfo = TextDatumGetCString(datum);
79
80 /* Get slotname */
81 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
82 tup,
83 Anum_pg_subscription_subslotname,
84 &isnull);
85 if (!isnull)
86 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
87 else
88 sub->slotname = NULL;
89
90 /* Get synccommit */
91 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
92 tup,
93 Anum_pg_subscription_subsynccommit,
94 &isnull);
95 Assert(!isnull);
96 sub->synccommit = TextDatumGetCString(datum);
97
98 /* Get publications */
99 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
100 tup,
101 Anum_pg_subscription_subpublications,
102 &isnull);
103 Assert(!isnull);
104 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
105
106 ReleaseSysCache(tup);
107
108 return sub;
109 }
110
111 /*
112 * Return number of subscriptions defined in given database.
113 * Used by dropdb() to check if database can indeed be dropped.
114 */
115 int
CountDBSubscriptions(Oid dbid)116 CountDBSubscriptions(Oid dbid)
117 {
118 int nsubs = 0;
119 Relation rel;
120 ScanKeyData scankey;
121 SysScanDesc scan;
122 HeapTuple tup;
123
124 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
125
126 ScanKeyInit(&scankey,
127 Anum_pg_subscription_subdbid,
128 BTEqualStrategyNumber, F_OIDEQ,
129 ObjectIdGetDatum(dbid));
130
131 scan = systable_beginscan(rel, InvalidOid, false,
132 NULL, 1, &scankey);
133
134 while (HeapTupleIsValid(tup = systable_getnext(scan)))
135 nsubs++;
136
137 systable_endscan(scan);
138
139 table_close(rel, NoLock);
140
141 return nsubs;
142 }
143
144 /*
145 * Free memory allocated by subscription struct.
146 */
147 void
FreeSubscription(Subscription * sub)148 FreeSubscription(Subscription *sub)
149 {
150 pfree(sub->name);
151 pfree(sub->conninfo);
152 if (sub->slotname)
153 pfree(sub->slotname);
154 list_free_deep(sub->publications);
155 pfree(sub);
156 }
157
158 /*
159 * get_subscription_oid - given a subscription name, look up the OID
160 *
161 * If missing_ok is false, throw an error if name not found. If true, just
162 * return InvalidOid.
163 */
164 Oid
get_subscription_oid(const char * subname,bool missing_ok)165 get_subscription_oid(const char *subname, bool missing_ok)
166 {
167 Oid oid;
168
169 oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
170 MyDatabaseId, CStringGetDatum(subname));
171 if (!OidIsValid(oid) && !missing_ok)
172 ereport(ERROR,
173 (errcode(ERRCODE_UNDEFINED_OBJECT),
174 errmsg("subscription \"%s\" does not exist", subname)));
175 return oid;
176 }
177
178 /*
179 * get_subscription_name - given a subscription OID, look up the name
180 *
181 * If missing_ok is false, throw an error if name not found. If true, just
182 * return NULL.
183 */
184 char *
get_subscription_name(Oid subid,bool missing_ok)185 get_subscription_name(Oid subid, bool missing_ok)
186 {
187 HeapTuple tup;
188 char *subname;
189 Form_pg_subscription subform;
190
191 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
192
193 if (!HeapTupleIsValid(tup))
194 {
195 if (!missing_ok)
196 elog(ERROR, "cache lookup failed for subscription %u", subid);
197 return NULL;
198 }
199
200 subform = (Form_pg_subscription) GETSTRUCT(tup);
201 subname = pstrdup(NameStr(subform->subname));
202
203 ReleaseSysCache(tup);
204
205 return subname;
206 }
207
208 /*
209 * Convert text array to list of strings.
210 *
211 * Note: the resulting list of strings is pallocated here.
212 */
213 static List *
textarray_to_stringlist(ArrayType * textarray)214 textarray_to_stringlist(ArrayType *textarray)
215 {
216 Datum *elems;
217 int nelems,
218 i;
219 List *res = NIL;
220
221 deconstruct_array(textarray,
222 TEXTOID, -1, false, TYPALIGN_INT,
223 &elems, NULL, &nelems);
224
225 if (nelems == 0)
226 return NIL;
227
228 for (i = 0; i < nelems; i++)
229 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
230
231 return res;
232 }
233
234 /*
235 * Add new state record for a subscription table.
236 */
237 void
AddSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)238 AddSubscriptionRelState(Oid subid, Oid relid, char state,
239 XLogRecPtr sublsn)
240 {
241 Relation rel;
242 HeapTuple tup;
243 bool nulls[Natts_pg_subscription_rel];
244 Datum values[Natts_pg_subscription_rel];
245
246 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
247
248 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
249
250 /* Try finding existing mapping. */
251 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
252 ObjectIdGetDatum(relid),
253 ObjectIdGetDatum(subid));
254 if (HeapTupleIsValid(tup))
255 elog(ERROR, "subscription table %u in subscription %u already exists",
256 relid, subid);
257
258 /* Form the tuple. */
259 memset(values, 0, sizeof(values));
260 memset(nulls, false, sizeof(nulls));
261 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
262 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
263 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
264 if (sublsn != InvalidXLogRecPtr)
265 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
266 else
267 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
268
269 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
270
271 /* Insert tuple into catalog. */
272 CatalogTupleInsert(rel, tup);
273
274 heap_freetuple(tup);
275
276 /* Cleanup. */
277 table_close(rel, NoLock);
278 }
279
280 /*
281 * Update the state of a subscription table.
282 */
283 void
UpdateSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)284 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
285 XLogRecPtr sublsn)
286 {
287 Relation rel;
288 HeapTuple tup;
289 bool nulls[Natts_pg_subscription_rel];
290 Datum values[Natts_pg_subscription_rel];
291 bool replaces[Natts_pg_subscription_rel];
292
293 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
294
295 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
296
297 /* Try finding existing mapping. */
298 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
299 ObjectIdGetDatum(relid),
300 ObjectIdGetDatum(subid));
301 if (!HeapTupleIsValid(tup))
302 elog(ERROR, "subscription table %u in subscription %u does not exist",
303 relid, subid);
304
305 /* Update the tuple. */
306 memset(values, 0, sizeof(values));
307 memset(nulls, false, sizeof(nulls));
308 memset(replaces, false, sizeof(replaces));
309
310 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
311 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
312
313 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
314 if (sublsn != InvalidXLogRecPtr)
315 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
316 else
317 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
318
319 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
320 replaces);
321
322 /* Update the catalog. */
323 CatalogTupleUpdate(rel, &tup->t_self, tup);
324
325 /* Cleanup. */
326 table_close(rel, NoLock);
327 }
328
329 /*
330 * Get state of subscription table.
331 *
332 * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
333 */
334 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn)335 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
336 {
337 HeapTuple tup;
338 char substate;
339 bool isnull;
340 Datum d;
341 Relation rel;
342
343 /*
344 * This is to avoid the race condition with AlterSubscription which tries
345 * to remove this relstate.
346 */
347 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
348
349 /* Try finding the mapping. */
350 tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
351 ObjectIdGetDatum(relid),
352 ObjectIdGetDatum(subid));
353
354 if (!HeapTupleIsValid(tup))
355 {
356 table_close(rel, AccessShareLock);
357 *sublsn = InvalidXLogRecPtr;
358 return SUBREL_STATE_UNKNOWN;
359 }
360
361 /* Get the state. */
362 substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
363
364 /* Get the LSN */
365 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
366 Anum_pg_subscription_rel_srsublsn, &isnull);
367 if (isnull)
368 *sublsn = InvalidXLogRecPtr;
369 else
370 *sublsn = DatumGetLSN(d);
371
372 /* Cleanup */
373 ReleaseSysCache(tup);
374
375 table_close(rel, AccessShareLock);
376
377 return substate;
378 }
379
380 /*
381 * Drop subscription relation mapping. These can be for a particular
382 * subscription, or for a particular relation, or both.
383 */
384 void
RemoveSubscriptionRel(Oid subid,Oid relid)385 RemoveSubscriptionRel(Oid subid, Oid relid)
386 {
387 Relation rel;
388 TableScanDesc scan;
389 ScanKeyData skey[2];
390 HeapTuple tup;
391 int nkeys = 0;
392
393 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
394
395 if (OidIsValid(subid))
396 {
397 ScanKeyInit(&skey[nkeys++],
398 Anum_pg_subscription_rel_srsubid,
399 BTEqualStrategyNumber,
400 F_OIDEQ,
401 ObjectIdGetDatum(subid));
402 }
403
404 if (OidIsValid(relid))
405 {
406 ScanKeyInit(&skey[nkeys++],
407 Anum_pg_subscription_rel_srrelid,
408 BTEqualStrategyNumber,
409 F_OIDEQ,
410 ObjectIdGetDatum(relid));
411 }
412
413 /* Do the search and delete what we found. */
414 scan = table_beginscan_catalog(rel, nkeys, skey);
415 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
416 {
417 Form_pg_subscription_rel subrel;
418
419 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
420
421 /*
422 * We don't allow to drop the relation mapping when the table
423 * synchronization is in progress unless the caller updates the
424 * corresponding subscription as well. This is to ensure that we don't
425 * leave tablesync slots or origins in the system when the
426 * corresponding table is dropped.
427 */
428 if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
429 {
430 ereport(ERROR,
431 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
432 errmsg("could not drop relation mapping for subscription \"%s\"",
433 get_subscription_name(subrel->srsubid, false)),
434 errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
435 get_rel_name(relid), subrel->srsubstate),
436
437 /*
438 * translator: first %s is a SQL ALTER command and second %s is a
439 * SQL DROP command
440 */
441 errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
442 "ALTER SUBSCRIPTION ... ENABLE",
443 "DROP SUBSCRIPTION ...")));
444 }
445
446 CatalogTupleDelete(rel, &tup->t_self);
447 }
448 table_endscan(scan);
449
450 table_close(rel, RowExclusiveLock);
451 }
452
453
454 /*
455 * Get all relations for subscription.
456 *
457 * Returned list is palloc'ed in current memory context.
458 */
459 List *
GetSubscriptionRelations(Oid subid)460 GetSubscriptionRelations(Oid subid)
461 {
462 List *res = NIL;
463 Relation rel;
464 HeapTuple tup;
465 ScanKeyData skey[1];
466 SysScanDesc scan;
467
468 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
469
470 ScanKeyInit(&skey[0],
471 Anum_pg_subscription_rel_srsubid,
472 BTEqualStrategyNumber, F_OIDEQ,
473 ObjectIdGetDatum(subid));
474
475 scan = systable_beginscan(rel, InvalidOid, false,
476 NULL, 1, skey);
477
478 while (HeapTupleIsValid(tup = systable_getnext(scan)))
479 {
480 Form_pg_subscription_rel subrel;
481 SubscriptionRelState *relstate;
482 Datum d;
483 bool isnull;
484
485 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
486
487 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
488 relstate->relid = subrel->srrelid;
489 relstate->state = subrel->srsubstate;
490 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
491 Anum_pg_subscription_rel_srsublsn, &isnull);
492 if (isnull)
493 relstate->lsn = InvalidXLogRecPtr;
494 else
495 relstate->lsn = DatumGetLSN(d);
496
497 res = lappend(res, relstate);
498 }
499
500 /* Cleanup */
501 systable_endscan(scan);
502 table_close(rel, AccessShareLock);
503
504 return res;
505 }
506
507 /*
508 * Get all relations for subscription that are not in a ready state.
509 *
510 * Returned list is palloc'ed in current memory context.
511 */
512 List *
GetSubscriptionNotReadyRelations(Oid subid)513 GetSubscriptionNotReadyRelations(Oid subid)
514 {
515 List *res = NIL;
516 Relation rel;
517 HeapTuple tup;
518 int nkeys = 0;
519 ScanKeyData skey[2];
520 SysScanDesc scan;
521
522 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
523
524 ScanKeyInit(&skey[nkeys++],
525 Anum_pg_subscription_rel_srsubid,
526 BTEqualStrategyNumber, F_OIDEQ,
527 ObjectIdGetDatum(subid));
528
529 ScanKeyInit(&skey[nkeys++],
530 Anum_pg_subscription_rel_srsubstate,
531 BTEqualStrategyNumber, F_CHARNE,
532 CharGetDatum(SUBREL_STATE_READY));
533
534 scan = systable_beginscan(rel, InvalidOid, false,
535 NULL, nkeys, skey);
536
537 while (HeapTupleIsValid(tup = systable_getnext(scan)))
538 {
539 Form_pg_subscription_rel subrel;
540 SubscriptionRelState *relstate;
541 Datum d;
542 bool isnull;
543
544 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
545
546 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
547 relstate->relid = subrel->srrelid;
548 relstate->state = subrel->srsubstate;
549 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
550 Anum_pg_subscription_rel_srsublsn, &isnull);
551 if (isnull)
552 relstate->lsn = InvalidXLogRecPtr;
553 else
554 relstate->lsn = DatumGetLSN(d);
555
556 res = lappend(res, relstate);
557 }
558
559 /* Cleanup */
560 systable_endscan(scan);
561 table_close(rel, AccessShareLock);
562
563 return res;
564 }
565