1 /*-------------------------------------------------------------------------
2 *
3 * pg_subscription.c
4 * replication subscriptions
5 *
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/catalog/pg_subscription.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/indexing.h"
23 #include "catalog/pg_subscription.h"
24 #include "catalog/pg_subscription_rel.h"
25 #include "catalog/pg_type.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "storage/lmgr.h"
29 #include "utils/array.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/pg_lsn.h"
33 #include "utils/rel.h"
34 #include "utils/syscache.h"
35
36 static List *textarray_to_stringlist(ArrayType *textarray);
37
38 /*
39 * Fetch the subscription from the syscache.
40 */
41 Subscription *
GetSubscription(Oid subid,bool missing_ok)42 GetSubscription(Oid subid, bool missing_ok)
43 {
44 HeapTuple tup;
45 Subscription *sub;
46 Form_pg_subscription subform;
47 Datum datum;
48 bool isnull;
49
50 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
51
52 if (!HeapTupleIsValid(tup))
53 {
54 if (missing_ok)
55 return NULL;
56
57 elog(ERROR, "cache lookup failed for subscription %u", subid);
58 }
59
60 subform = (Form_pg_subscription) GETSTRUCT(tup);
61
62 sub = (Subscription *) palloc(sizeof(Subscription));
63 sub->oid = subid;
64 sub->dbid = subform->subdbid;
65 sub->name = pstrdup(NameStr(subform->subname));
66 sub->owner = subform->subowner;
67 sub->enabled = subform->subenabled;
68
69 /* Get conninfo */
70 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
71 tup,
72 Anum_pg_subscription_subconninfo,
73 &isnull);
74 Assert(!isnull);
75 sub->conninfo = TextDatumGetCString(datum);
76
77 /* Get slotname */
78 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
79 tup,
80 Anum_pg_subscription_subslotname,
81 &isnull);
82 if (!isnull)
83 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
84 else
85 sub->slotname = NULL;
86
87 /* Get synccommit */
88 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
89 tup,
90 Anum_pg_subscription_subsynccommit,
91 &isnull);
92 Assert(!isnull);
93 sub->synccommit = TextDatumGetCString(datum);
94
95 /* Get publications */
96 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
97 tup,
98 Anum_pg_subscription_subpublications,
99 &isnull);
100 Assert(!isnull);
101 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
102
103 ReleaseSysCache(tup);
104
105 return sub;
106 }
107
108 /*
109 * Return number of subscriptions defined in given database.
110 * Used by dropdb() to check if database can indeed be dropped.
111 */
112 int
CountDBSubscriptions(Oid dbid)113 CountDBSubscriptions(Oid dbid)
114 {
115 int nsubs = 0;
116 Relation rel;
117 ScanKeyData scankey;
118 SysScanDesc scan;
119 HeapTuple tup;
120
121 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
122
123 ScanKeyInit(&scankey,
124 Anum_pg_subscription_subdbid,
125 BTEqualStrategyNumber, F_OIDEQ,
126 ObjectIdGetDatum(dbid));
127
128 scan = systable_beginscan(rel, InvalidOid, false,
129 NULL, 1, &scankey);
130
131 while (HeapTupleIsValid(tup = systable_getnext(scan)))
132 nsubs++;
133
134 systable_endscan(scan);
135
136 table_close(rel, NoLock);
137
138 return nsubs;
139 }
140
141 /*
142 * Free memory allocated by subscription struct.
143 */
144 void
FreeSubscription(Subscription * sub)145 FreeSubscription(Subscription *sub)
146 {
147 pfree(sub->name);
148 pfree(sub->conninfo);
149 if (sub->slotname)
150 pfree(sub->slotname);
151 list_free_deep(sub->publications);
152 pfree(sub);
153 }
154
155 /*
156 * get_subscription_oid - given a subscription name, look up the OID
157 *
158 * If missing_ok is false, throw an error if name not found. If true, just
159 * return InvalidOid.
160 */
161 Oid
get_subscription_oid(const char * subname,bool missing_ok)162 get_subscription_oid(const char *subname, bool missing_ok)
163 {
164 Oid oid;
165
166 oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
167 MyDatabaseId, CStringGetDatum(subname));
168 if (!OidIsValid(oid) && !missing_ok)
169 ereport(ERROR,
170 (errcode(ERRCODE_UNDEFINED_OBJECT),
171 errmsg("subscription \"%s\" does not exist", subname)));
172 return oid;
173 }
174
175 /*
176 * get_subscription_name - given a subscription OID, look up the name
177 *
178 * If missing_ok is false, throw an error if name not found. If true, just
179 * return NULL.
180 */
181 char *
get_subscription_name(Oid subid,bool missing_ok)182 get_subscription_name(Oid subid, bool missing_ok)
183 {
184 HeapTuple tup;
185 char *subname;
186 Form_pg_subscription subform;
187
188 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
189
190 if (!HeapTupleIsValid(tup))
191 {
192 if (!missing_ok)
193 elog(ERROR, "cache lookup failed for subscription %u", subid);
194 return NULL;
195 }
196
197 subform = (Form_pg_subscription) GETSTRUCT(tup);
198 subname = pstrdup(NameStr(subform->subname));
199
200 ReleaseSysCache(tup);
201
202 return subname;
203 }
204
205 /*
206 * Convert text array to list of strings.
207 *
208 * Note: the resulting list of strings is pallocated here.
209 */
210 static List *
textarray_to_stringlist(ArrayType * textarray)211 textarray_to_stringlist(ArrayType *textarray)
212 {
213 Datum *elems;
214 int nelems,
215 i;
216 List *res = NIL;
217
218 deconstruct_array(textarray,
219 TEXTOID, -1, false, TYPALIGN_INT,
220 &elems, NULL, &nelems);
221
222 if (nelems == 0)
223 return NIL;
224
225 for (i = 0; i < nelems; i++)
226 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
227
228 return res;
229 }
230
231 /*
232 * Add new state record for a subscription table.
233 */
234 void
AddSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)235 AddSubscriptionRelState(Oid subid, Oid relid, char state,
236 XLogRecPtr sublsn)
237 {
238 Relation rel;
239 HeapTuple tup;
240 bool nulls[Natts_pg_subscription_rel];
241 Datum values[Natts_pg_subscription_rel];
242
243 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
244
245 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
246
247 /* Try finding existing mapping. */
248 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
249 ObjectIdGetDatum(relid),
250 ObjectIdGetDatum(subid));
251 if (HeapTupleIsValid(tup))
252 elog(ERROR, "subscription table %u in subscription %u already exists",
253 relid, subid);
254
255 /* Form the tuple. */
256 memset(values, 0, sizeof(values));
257 memset(nulls, false, sizeof(nulls));
258 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
259 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
260 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
261 if (sublsn != InvalidXLogRecPtr)
262 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
263 else
264 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
265
266 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
267
268 /* Insert tuple into catalog. */
269 CatalogTupleInsert(rel, tup);
270
271 heap_freetuple(tup);
272
273 /* Cleanup. */
274 table_close(rel, NoLock);
275 }
276
277 /*
278 * Update the state of a subscription table.
279 */
280 void
UpdateSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)281 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
282 XLogRecPtr sublsn)
283 {
284 Relation rel;
285 HeapTuple tup;
286 bool nulls[Natts_pg_subscription_rel];
287 Datum values[Natts_pg_subscription_rel];
288 bool replaces[Natts_pg_subscription_rel];
289
290 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
291
292 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
293
294 /* Try finding existing mapping. */
295 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
296 ObjectIdGetDatum(relid),
297 ObjectIdGetDatum(subid));
298 if (!HeapTupleIsValid(tup))
299 elog(ERROR, "subscription table %u in subscription %u does not exist",
300 relid, subid);
301
302 /* Update the tuple. */
303 memset(values, 0, sizeof(values));
304 memset(nulls, false, sizeof(nulls));
305 memset(replaces, false, sizeof(replaces));
306
307 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
308 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
309
310 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
311 if (sublsn != InvalidXLogRecPtr)
312 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
313 else
314 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
315
316 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
317 replaces);
318
319 /* Update the catalog. */
320 CatalogTupleUpdate(rel, &tup->t_self, tup);
321
322 /* Cleanup. */
323 table_close(rel, NoLock);
324 }
325
326 /*
327 * Get state of subscription table.
328 *
329 * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
330 */
331 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn,bool missing_ok)332 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
333 bool missing_ok)
334 {
335 Relation rel;
336 HeapTuple tup;
337 char substate;
338 bool isnull;
339 Datum d;
340
341 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
342
343 /* Try finding the mapping. */
344 tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
345 ObjectIdGetDatum(relid),
346 ObjectIdGetDatum(subid));
347
348 if (!HeapTupleIsValid(tup))
349 {
350 if (missing_ok)
351 {
352 table_close(rel, AccessShareLock);
353 *sublsn = InvalidXLogRecPtr;
354 return SUBREL_STATE_UNKNOWN;
355 }
356
357 elog(ERROR, "subscription table %u in subscription %u does not exist",
358 relid, subid);
359 }
360
361 /* Get the state. */
362 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
363 Anum_pg_subscription_rel_srsubstate, &isnull);
364 Assert(!isnull);
365 substate = DatumGetChar(d);
366 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
367 Anum_pg_subscription_rel_srsublsn, &isnull);
368 if (isnull)
369 *sublsn = InvalidXLogRecPtr;
370 else
371 *sublsn = DatumGetLSN(d);
372
373 /* Cleanup */
374 ReleaseSysCache(tup);
375 table_close(rel, AccessShareLock);
376
377 return substate;
378 }
379
380 /*
381 * Drop subscription relation mapping. These can be for a particular
382 * subscription, or for a particular relation, or both.
383 */
384 void
RemoveSubscriptionRel(Oid subid,Oid relid)385 RemoveSubscriptionRel(Oid subid, Oid relid)
386 {
387 Relation rel;
388 TableScanDesc scan;
389 ScanKeyData skey[2];
390 HeapTuple tup;
391 int nkeys = 0;
392
393 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
394
395 if (OidIsValid(subid))
396 {
397 ScanKeyInit(&skey[nkeys++],
398 Anum_pg_subscription_rel_srsubid,
399 BTEqualStrategyNumber,
400 F_OIDEQ,
401 ObjectIdGetDatum(subid));
402 }
403
404 if (OidIsValid(relid))
405 {
406 ScanKeyInit(&skey[nkeys++],
407 Anum_pg_subscription_rel_srrelid,
408 BTEqualStrategyNumber,
409 F_OIDEQ,
410 ObjectIdGetDatum(relid));
411 }
412
413 /* Do the search and delete what we found. */
414 scan = table_beginscan_catalog(rel, nkeys, skey);
415 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
416 {
417 CatalogTupleDelete(rel, &tup->t_self);
418 }
419 table_endscan(scan);
420
421 table_close(rel, RowExclusiveLock);
422 }
423
424
425 /*
426 * Get all relations for subscription.
427 *
428 * Returned list is palloc'ed in current memory context.
429 */
430 List *
GetSubscriptionRelations(Oid subid)431 GetSubscriptionRelations(Oid subid)
432 {
433 List *res = NIL;
434 Relation rel;
435 HeapTuple tup;
436 int nkeys = 0;
437 ScanKeyData skey[2];
438 SysScanDesc scan;
439
440 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
441
442 ScanKeyInit(&skey[nkeys++],
443 Anum_pg_subscription_rel_srsubid,
444 BTEqualStrategyNumber, F_OIDEQ,
445 ObjectIdGetDatum(subid));
446
447 scan = systable_beginscan(rel, InvalidOid, false,
448 NULL, nkeys, skey);
449
450 while (HeapTupleIsValid(tup = systable_getnext(scan)))
451 {
452 Form_pg_subscription_rel subrel;
453 SubscriptionRelState *relstate;
454 Datum d;
455 bool isnull;
456
457 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
458
459 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
460 relstate->relid = subrel->srrelid;
461 relstate->state = subrel->srsubstate;
462 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
463 Anum_pg_subscription_rel_srsublsn, &isnull);
464 if (isnull)
465 relstate->lsn = InvalidXLogRecPtr;
466 else
467 relstate->lsn = DatumGetLSN(d);
468
469 res = lappend(res, relstate);
470 }
471
472 /* Cleanup */
473 systable_endscan(scan);
474 table_close(rel, AccessShareLock);
475
476 return res;
477 }
478
479 /*
480 * Get all relations for subscription that are not in a ready state.
481 *
482 * Returned list is palloc'ed in current memory context.
483 */
484 List *
GetSubscriptionNotReadyRelations(Oid subid)485 GetSubscriptionNotReadyRelations(Oid subid)
486 {
487 List *res = NIL;
488 Relation rel;
489 HeapTuple tup;
490 int nkeys = 0;
491 ScanKeyData skey[2];
492 SysScanDesc scan;
493
494 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
495
496 ScanKeyInit(&skey[nkeys++],
497 Anum_pg_subscription_rel_srsubid,
498 BTEqualStrategyNumber, F_OIDEQ,
499 ObjectIdGetDatum(subid));
500
501 ScanKeyInit(&skey[nkeys++],
502 Anum_pg_subscription_rel_srsubstate,
503 BTEqualStrategyNumber, F_CHARNE,
504 CharGetDatum(SUBREL_STATE_READY));
505
506 scan = systable_beginscan(rel, InvalidOid, false,
507 NULL, nkeys, skey);
508
509 while (HeapTupleIsValid(tup = systable_getnext(scan)))
510 {
511 Form_pg_subscription_rel subrel;
512 SubscriptionRelState *relstate;
513 Datum d;
514 bool isnull;
515
516 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
517
518 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
519 relstate->relid = subrel->srrelid;
520 relstate->state = subrel->srsubstate;
521 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
522 Anum_pg_subscription_rel_srsublsn, &isnull);
523 if (isnull)
524 relstate->lsn = InvalidXLogRecPtr;
525 else
526 relstate->lsn = DatumGetLSN(d);
527
528 res = lappend(res, relstate);
529 }
530
531 /* Cleanup */
532 systable_endscan(scan);
533 table_close(rel, AccessShareLock);
534
535 return res;
536 }
537