1 /*-------------------------------------------------------------------------
2 *
3 * pg_subscription.c
4 * replication subscriptions
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/catalog/pg_subscription.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "miscadmin.h"
18
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/tableam.h"
23 #include "access/xact.h"
24
25 #include "catalog/indexing.h"
26 #include "catalog/pg_type.h"
27 #include "catalog/pg_subscription.h"
28 #include "catalog/pg_subscription_rel.h"
29
30 #include "nodes/makefuncs.h"
31
32 #include "storage/lmgr.h"
33
34 #include "utils/array.h"
35 #include "utils/builtins.h"
36 #include "utils/fmgroids.h"
37 #include "utils/pg_lsn.h"
38 #include "utils/rel.h"
39 #include "utils/syscache.h"
40
41
42 static List *textarray_to_stringlist(ArrayType *textarray);
43
44 /*
45 * Fetch the subscription from the syscache.
46 */
47 Subscription *
GetSubscription(Oid subid,bool missing_ok)48 GetSubscription(Oid subid, bool missing_ok)
49 {
50 HeapTuple tup;
51 Subscription *sub;
52 Form_pg_subscription subform;
53 Datum datum;
54 bool isnull;
55
56 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
57
58 if (!HeapTupleIsValid(tup))
59 {
60 if (missing_ok)
61 return NULL;
62
63 elog(ERROR, "cache lookup failed for subscription %u", subid);
64 }
65
66 subform = (Form_pg_subscription) GETSTRUCT(tup);
67
68 sub = (Subscription *) palloc(sizeof(Subscription));
69 sub->oid = subid;
70 sub->dbid = subform->subdbid;
71 sub->name = pstrdup(NameStr(subform->subname));
72 sub->owner = subform->subowner;
73 sub->enabled = subform->subenabled;
74
75 /* Get conninfo */
76 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
77 tup,
78 Anum_pg_subscription_subconninfo,
79 &isnull);
80 Assert(!isnull);
81 sub->conninfo = TextDatumGetCString(datum);
82
83 /* Get slotname */
84 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
85 tup,
86 Anum_pg_subscription_subslotname,
87 &isnull);
88 if (!isnull)
89 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
90 else
91 sub->slotname = NULL;
92
93 /* Get synccommit */
94 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
95 tup,
96 Anum_pg_subscription_subsynccommit,
97 &isnull);
98 Assert(!isnull);
99 sub->synccommit = TextDatumGetCString(datum);
100
101 /* Get publications */
102 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
103 tup,
104 Anum_pg_subscription_subpublications,
105 &isnull);
106 Assert(!isnull);
107 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
108
109 ReleaseSysCache(tup);
110
111 return sub;
112 }
113
114 /*
115 * Return number of subscriptions defined in given database.
116 * Used by dropdb() to check if database can indeed be dropped.
117 */
118 int
CountDBSubscriptions(Oid dbid)119 CountDBSubscriptions(Oid dbid)
120 {
121 int nsubs = 0;
122 Relation rel;
123 ScanKeyData scankey;
124 SysScanDesc scan;
125 HeapTuple tup;
126
127 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
128
129 ScanKeyInit(&scankey,
130 Anum_pg_subscription_subdbid,
131 BTEqualStrategyNumber, F_OIDEQ,
132 ObjectIdGetDatum(dbid));
133
134 scan = systable_beginscan(rel, InvalidOid, false,
135 NULL, 1, &scankey);
136
137 while (HeapTupleIsValid(tup = systable_getnext(scan)))
138 nsubs++;
139
140 systable_endscan(scan);
141
142 table_close(rel, NoLock);
143
144 return nsubs;
145 }
146
147 /*
148 * Free memory allocated by subscription struct.
149 */
150 void
FreeSubscription(Subscription * sub)151 FreeSubscription(Subscription *sub)
152 {
153 pfree(sub->name);
154 pfree(sub->conninfo);
155 if (sub->slotname)
156 pfree(sub->slotname);
157 list_free_deep(sub->publications);
158 pfree(sub);
159 }
160
161 /*
162 * get_subscription_oid - given a subscription name, look up the OID
163 *
164 * If missing_ok is false, throw an error if name not found. If true, just
165 * return InvalidOid.
166 */
167 Oid
get_subscription_oid(const char * subname,bool missing_ok)168 get_subscription_oid(const char *subname, bool missing_ok)
169 {
170 Oid oid;
171
172 oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
173 MyDatabaseId, CStringGetDatum(subname));
174 if (!OidIsValid(oid) && !missing_ok)
175 ereport(ERROR,
176 (errcode(ERRCODE_UNDEFINED_OBJECT),
177 errmsg("subscription \"%s\" does not exist", subname)));
178 return oid;
179 }
180
181 /*
182 * get_subscription_name - given a subscription OID, look up the name
183 *
184 * If missing_ok is false, throw an error if name not found. If true, just
185 * return NULL.
186 */
187 char *
get_subscription_name(Oid subid,bool missing_ok)188 get_subscription_name(Oid subid, bool missing_ok)
189 {
190 HeapTuple tup;
191 char *subname;
192 Form_pg_subscription subform;
193
194 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
195
196 if (!HeapTupleIsValid(tup))
197 {
198 if (!missing_ok)
199 elog(ERROR, "cache lookup failed for subscription %u", subid);
200 return NULL;
201 }
202
203 subform = (Form_pg_subscription) GETSTRUCT(tup);
204 subname = pstrdup(NameStr(subform->subname));
205
206 ReleaseSysCache(tup);
207
208 return subname;
209 }
210
211 /*
212 * Convert text array to list of strings.
213 *
214 * Note: the resulting list of strings is pallocated here.
215 */
216 static List *
textarray_to_stringlist(ArrayType * textarray)217 textarray_to_stringlist(ArrayType *textarray)
218 {
219 Datum *elems;
220 int nelems,
221 i;
222 List *res = NIL;
223
224 deconstruct_array(textarray,
225 TEXTOID, -1, false, 'i',
226 &elems, NULL, &nelems);
227
228 if (nelems == 0)
229 return NIL;
230
231 for (i = 0; i < nelems; i++)
232 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
233
234 return res;
235 }
236
237 /*
238 * Add new state record for a subscription table.
239 */
240 void
AddSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)241 AddSubscriptionRelState(Oid subid, Oid relid, char state,
242 XLogRecPtr sublsn)
243 {
244 Relation rel;
245 HeapTuple tup;
246 bool nulls[Natts_pg_subscription_rel];
247 Datum values[Natts_pg_subscription_rel];
248
249 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
250
251 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
252
253 /* Try finding existing mapping. */
254 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
255 ObjectIdGetDatum(relid),
256 ObjectIdGetDatum(subid));
257 if (HeapTupleIsValid(tup))
258 elog(ERROR, "subscription table %u in subscription %u already exists",
259 relid, subid);
260
261 /* Form the tuple. */
262 memset(values, 0, sizeof(values));
263 memset(nulls, false, sizeof(nulls));
264 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
265 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
266 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
267 if (sublsn != InvalidXLogRecPtr)
268 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
269 else
270 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
271
272 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
273
274 /* Insert tuple into catalog. */
275 CatalogTupleInsert(rel, tup);
276
277 heap_freetuple(tup);
278
279 /* Cleanup. */
280 table_close(rel, NoLock);
281 }
282
283 /*
284 * Update the state of a subscription table.
285 */
286 void
UpdateSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)287 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
288 XLogRecPtr sublsn)
289 {
290 Relation rel;
291 HeapTuple tup;
292 bool nulls[Natts_pg_subscription_rel];
293 Datum values[Natts_pg_subscription_rel];
294 bool replaces[Natts_pg_subscription_rel];
295
296 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
297
298 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
299
300 /* Try finding existing mapping. */
301 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
302 ObjectIdGetDatum(relid),
303 ObjectIdGetDatum(subid));
304 if (!HeapTupleIsValid(tup))
305 elog(ERROR, "subscription table %u in subscription %u does not exist",
306 relid, subid);
307
308 /* Update the tuple. */
309 memset(values, 0, sizeof(values));
310 memset(nulls, false, sizeof(nulls));
311 memset(replaces, false, sizeof(replaces));
312
313 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
314 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
315
316 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
317 if (sublsn != InvalidXLogRecPtr)
318 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
319 else
320 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
321
322 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
323 replaces);
324
325 /* Update the catalog. */
326 CatalogTupleUpdate(rel, &tup->t_self, tup);
327
328 /* Cleanup. */
329 table_close(rel, NoLock);
330 }
331
332 /*
333 * Get state of subscription table.
334 *
335 * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
336 */
337 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn,bool missing_ok)338 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
339 bool missing_ok)
340 {
341 Relation rel;
342 HeapTuple tup;
343 char substate;
344 bool isnull;
345 Datum d;
346
347 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
348
349 /* Try finding the mapping. */
350 tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
351 ObjectIdGetDatum(relid),
352 ObjectIdGetDatum(subid));
353
354 if (!HeapTupleIsValid(tup))
355 {
356 if (missing_ok)
357 {
358 table_close(rel, AccessShareLock);
359 *sublsn = InvalidXLogRecPtr;
360 return SUBREL_STATE_UNKNOWN;
361 }
362
363 elog(ERROR, "subscription table %u in subscription %u does not exist",
364 relid, subid);
365 }
366
367 /* Get the state. */
368 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
369 Anum_pg_subscription_rel_srsubstate, &isnull);
370 Assert(!isnull);
371 substate = DatumGetChar(d);
372 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
373 Anum_pg_subscription_rel_srsublsn, &isnull);
374 if (isnull)
375 *sublsn = InvalidXLogRecPtr;
376 else
377 *sublsn = DatumGetLSN(d);
378
379 /* Cleanup */
380 ReleaseSysCache(tup);
381 table_close(rel, AccessShareLock);
382
383 return substate;
384 }
385
386 /*
387 * Drop subscription relation mapping. These can be for a particular
388 * subscription, or for a particular relation, or both.
389 */
390 void
RemoveSubscriptionRel(Oid subid,Oid relid)391 RemoveSubscriptionRel(Oid subid, Oid relid)
392 {
393 Relation rel;
394 TableScanDesc scan;
395 ScanKeyData skey[2];
396 HeapTuple tup;
397 int nkeys = 0;
398
399 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
400
401 if (OidIsValid(subid))
402 {
403 ScanKeyInit(&skey[nkeys++],
404 Anum_pg_subscription_rel_srsubid,
405 BTEqualStrategyNumber,
406 F_OIDEQ,
407 ObjectIdGetDatum(subid));
408 }
409
410 if (OidIsValid(relid))
411 {
412 ScanKeyInit(&skey[nkeys++],
413 Anum_pg_subscription_rel_srrelid,
414 BTEqualStrategyNumber,
415 F_OIDEQ,
416 ObjectIdGetDatum(relid));
417 }
418
419 /* Do the search and delete what we found. */
420 scan = table_beginscan_catalog(rel, nkeys, skey);
421 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
422 {
423 CatalogTupleDelete(rel, &tup->t_self);
424 }
425 table_endscan(scan);
426
427 table_close(rel, RowExclusiveLock);
428 }
429
430
431 /*
432 * Get all relations for subscription.
433 *
434 * Returned list is palloc'ed in current memory context.
435 */
436 List *
GetSubscriptionRelations(Oid subid)437 GetSubscriptionRelations(Oid subid)
438 {
439 List *res = NIL;
440 Relation rel;
441 HeapTuple tup;
442 int nkeys = 0;
443 ScanKeyData skey[2];
444 SysScanDesc scan;
445
446 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
447
448 ScanKeyInit(&skey[nkeys++],
449 Anum_pg_subscription_rel_srsubid,
450 BTEqualStrategyNumber, F_OIDEQ,
451 ObjectIdGetDatum(subid));
452
453 scan = systable_beginscan(rel, InvalidOid, false,
454 NULL, nkeys, skey);
455
456 while (HeapTupleIsValid(tup = systable_getnext(scan)))
457 {
458 Form_pg_subscription_rel subrel;
459 SubscriptionRelState *relstate;
460 Datum d;
461 bool isnull;
462
463 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
464
465 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
466 relstate->relid = subrel->srrelid;
467 relstate->state = subrel->srsubstate;
468 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
469 Anum_pg_subscription_rel_srsublsn, &isnull);
470 if (isnull)
471 relstate->lsn = InvalidXLogRecPtr;
472 else
473 relstate->lsn = DatumGetLSN(d);
474
475 res = lappend(res, relstate);
476 }
477
478 /* Cleanup */
479 systable_endscan(scan);
480 table_close(rel, AccessShareLock);
481
482 return res;
483 }
484
485 /*
486 * Get all relations for subscription that are not in a ready state.
487 *
488 * Returned list is palloc'ed in current memory context.
489 */
490 List *
GetSubscriptionNotReadyRelations(Oid subid)491 GetSubscriptionNotReadyRelations(Oid subid)
492 {
493 List *res = NIL;
494 Relation rel;
495 HeapTuple tup;
496 int nkeys = 0;
497 ScanKeyData skey[2];
498 SysScanDesc scan;
499
500 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
501
502 ScanKeyInit(&skey[nkeys++],
503 Anum_pg_subscription_rel_srsubid,
504 BTEqualStrategyNumber, F_OIDEQ,
505 ObjectIdGetDatum(subid));
506
507 ScanKeyInit(&skey[nkeys++],
508 Anum_pg_subscription_rel_srsubstate,
509 BTEqualStrategyNumber, F_CHARNE,
510 CharGetDatum(SUBREL_STATE_READY));
511
512 scan = systable_beginscan(rel, InvalidOid, false,
513 NULL, nkeys, skey);
514
515 while (HeapTupleIsValid(tup = systable_getnext(scan)))
516 {
517 Form_pg_subscription_rel subrel;
518 SubscriptionRelState *relstate;
519 Datum d;
520 bool isnull;
521
522 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
523
524 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
525 relstate->relid = subrel->srrelid;
526 relstate->state = subrel->srsubstate;
527 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
528 Anum_pg_subscription_rel_srsublsn, &isnull);
529 if (isnull)
530 relstate->lsn = InvalidXLogRecPtr;
531 else
532 relstate->lsn = DatumGetLSN(d);
533
534 res = lappend(res, relstate);
535 }
536
537 /* Cleanup */
538 systable_endscan(scan);
539 table_close(rel, AccessShareLock);
540
541 return res;
542 }
543