1 /*-------------------------------------------------------------------------
2 *
3 * pg_subscription.c
4 * replication subscriptions
5 *
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/catalog/pg_subscription.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "miscadmin.h"
18
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/xact.h"
23
24 #include "catalog/indexing.h"
25 #include "catalog/pg_type.h"
26 #include "catalog/pg_subscription.h"
27 #include "catalog/pg_subscription_rel.h"
28
29 #include "nodes/makefuncs.h"
30
31 #include "storage/lmgr.h"
32
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/fmgroids.h"
36 #include "utils/pg_lsn.h"
37 #include "utils/rel.h"
38 #include "utils/syscache.h"
39
40
/* Forward declaration: file-local helper that turns a text[] into a List of String nodes. */
static List *textarray_to_stringlist(ArrayType *textarray);
42
43 /*
44 * Fetch the subscription from the syscache.
45 */
46 Subscription *
GetSubscription(Oid subid,bool missing_ok)47 GetSubscription(Oid subid, bool missing_ok)
48 {
49 HeapTuple tup;
50 Subscription *sub;
51 Form_pg_subscription subform;
52 Datum datum;
53 bool isnull;
54
55 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
56
57 if (!HeapTupleIsValid(tup))
58 {
59 if (missing_ok)
60 return NULL;
61
62 elog(ERROR, "cache lookup failed for subscription %u", subid);
63 }
64
65 subform = (Form_pg_subscription) GETSTRUCT(tup);
66
67 sub = (Subscription *) palloc(sizeof(Subscription));
68 sub->oid = subid;
69 sub->dbid = subform->subdbid;
70 sub->name = pstrdup(NameStr(subform->subname));
71 sub->owner = subform->subowner;
72 sub->enabled = subform->subenabled;
73
74 /* Get conninfo */
75 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
76 tup,
77 Anum_pg_subscription_subconninfo,
78 &isnull);
79 Assert(!isnull);
80 sub->conninfo = TextDatumGetCString(datum);
81
82 /* Get slotname */
83 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
84 tup,
85 Anum_pg_subscription_subslotname,
86 &isnull);
87 if (!isnull)
88 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
89 else
90 sub->slotname = NULL;
91
92 /* Get synccommit */
93 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
94 tup,
95 Anum_pg_subscription_subsynccommit,
96 &isnull);
97 Assert(!isnull);
98 sub->synccommit = TextDatumGetCString(datum);
99
100 /* Get publications */
101 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
102 tup,
103 Anum_pg_subscription_subpublications,
104 &isnull);
105 Assert(!isnull);
106 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
107
108 ReleaseSysCache(tup);
109
110 return sub;
111 }
112
113 /*
114 * Return number of subscriptions defined in given database.
115 * Used by dropdb() to check if database can indeed be dropped.
116 */
117 int
CountDBSubscriptions(Oid dbid)118 CountDBSubscriptions(Oid dbid)
119 {
120 int nsubs = 0;
121 Relation rel;
122 ScanKeyData scankey;
123 SysScanDesc scan;
124 HeapTuple tup;
125
126 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
127
128 ScanKeyInit(&scankey,
129 Anum_pg_subscription_subdbid,
130 BTEqualStrategyNumber, F_OIDEQ,
131 ObjectIdGetDatum(dbid));
132
133 scan = systable_beginscan(rel, InvalidOid, false,
134 NULL, 1, &scankey);
135
136 while (HeapTupleIsValid(tup = systable_getnext(scan)))
137 nsubs++;
138
139 systable_endscan(scan);
140
141 heap_close(rel, NoLock);
142
143 return nsubs;
144 }
145
146 /*
147 * Free memory allocated by subscription struct.
148 */
149 void
FreeSubscription(Subscription * sub)150 FreeSubscription(Subscription *sub)
151 {
152 pfree(sub->name);
153 pfree(sub->conninfo);
154 if (sub->slotname)
155 pfree(sub->slotname);
156 list_free_deep(sub->publications);
157 pfree(sub);
158 }
159
160 /*
161 * get_subscription_oid - given a subscription name, look up the OID
162 *
163 * If missing_ok is false, throw an error if name not found. If true, just
164 * return InvalidOid.
165 */
166 Oid
get_subscription_oid(const char * subname,bool missing_ok)167 get_subscription_oid(const char *subname, bool missing_ok)
168 {
169 Oid oid;
170
171 oid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
172 CStringGetDatum(subname));
173 if (!OidIsValid(oid) && !missing_ok)
174 ereport(ERROR,
175 (errcode(ERRCODE_UNDEFINED_OBJECT),
176 errmsg("subscription \"%s\" does not exist", subname)));
177 return oid;
178 }
179
180 /*
181 * get_subscription_name - given a subscription OID, look up the name
182 */
183 char *
get_subscription_name(Oid subid)184 get_subscription_name(Oid subid)
185 {
186 HeapTuple tup;
187 char *subname;
188 Form_pg_subscription subform;
189
190 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
191
192 if (!HeapTupleIsValid(tup))
193 elog(ERROR, "cache lookup failed for subscription %u", subid);
194
195 subform = (Form_pg_subscription) GETSTRUCT(tup);
196 subname = pstrdup(NameStr(subform->subname));
197
198 ReleaseSysCache(tup);
199
200 return subname;
201 }
202
203 /*
204 * Convert text array to list of strings.
205 *
206 * Note: the resulting list of strings is pallocated here.
207 */
208 static List *
textarray_to_stringlist(ArrayType * textarray)209 textarray_to_stringlist(ArrayType *textarray)
210 {
211 Datum *elems;
212 int nelems,
213 i;
214 List *res = NIL;
215
216 deconstruct_array(textarray,
217 TEXTOID, -1, false, 'i',
218 &elems, NULL, &nelems);
219
220 if (nelems == 0)
221 return NIL;
222
223 for (i = 0; i < nelems; i++)
224 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
225
226 return res;
227 }
228
229 /*
230 * Add new state record for a subscription table.
231 */
232 Oid
AddSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)233 AddSubscriptionRelState(Oid subid, Oid relid, char state,
234 XLogRecPtr sublsn)
235 {
236 Relation rel;
237 HeapTuple tup;
238 Oid subrelid;
239 bool nulls[Natts_pg_subscription_rel];
240 Datum values[Natts_pg_subscription_rel];
241
242 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
243
244 rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
245
246 /* Try finding existing mapping. */
247 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
248 ObjectIdGetDatum(relid),
249 ObjectIdGetDatum(subid));
250 if (HeapTupleIsValid(tup))
251 elog(ERROR, "subscription table %u in subscription %u already exists",
252 relid, subid);
253
254 /* Form the tuple. */
255 memset(values, 0, sizeof(values));
256 memset(nulls, false, sizeof(nulls));
257 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
258 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
259 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
260 if (sublsn != InvalidXLogRecPtr)
261 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
262 else
263 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
264
265 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
266
267 /* Insert tuple into catalog. */
268 subrelid = CatalogTupleInsert(rel, tup);
269
270 heap_freetuple(tup);
271
272 /* Cleanup. */
273 heap_close(rel, NoLock);
274
275 return subrelid;
276 }
277
278 /*
279 * Update the state of a subscription table.
280 */
281 Oid
UpdateSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)282 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
283 XLogRecPtr sublsn)
284 {
285 Relation rel;
286 HeapTuple tup;
287 Oid subrelid;
288 bool nulls[Natts_pg_subscription_rel];
289 Datum values[Natts_pg_subscription_rel];
290 bool replaces[Natts_pg_subscription_rel];
291
292 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
293
294 rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
295
296 /* Try finding existing mapping. */
297 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
298 ObjectIdGetDatum(relid),
299 ObjectIdGetDatum(subid));
300 if (!HeapTupleIsValid(tup))
301 elog(ERROR, "subscription table %u in subscription %u does not exist",
302 relid, subid);
303
304 /* Update the tuple. */
305 memset(values, 0, sizeof(values));
306 memset(nulls, false, sizeof(nulls));
307 memset(replaces, false, sizeof(replaces));
308
309 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
310 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
311
312 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
313 if (sublsn != InvalidXLogRecPtr)
314 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
315 else
316 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
317
318 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
319 replaces);
320
321 /* Update the catalog. */
322 CatalogTupleUpdate(rel, &tup->t_self, tup);
323
324 subrelid = HeapTupleGetOid(tup);
325
326 /* Cleanup. */
327 heap_close(rel, NoLock);
328
329 return subrelid;
330 }
331
332 /*
333 * Get state of subscription table.
334 *
335 * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
336 */
337 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn,bool missing_ok)338 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
339 bool missing_ok)
340 {
341 Relation rel;
342 HeapTuple tup;
343 char substate;
344 bool isnull;
345 Datum d;
346
347 rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
348
349 /* Try finding the mapping. */
350 tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
351 ObjectIdGetDatum(relid),
352 ObjectIdGetDatum(subid));
353
354 if (!HeapTupleIsValid(tup))
355 {
356 if (missing_ok)
357 {
358 heap_close(rel, AccessShareLock);
359 *sublsn = InvalidXLogRecPtr;
360 return SUBREL_STATE_UNKNOWN;
361 }
362
363 elog(ERROR, "subscription table %u in subscription %u does not exist",
364 relid, subid);
365 }
366
367 /* Get the state. */
368 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
369 Anum_pg_subscription_rel_srsubstate, &isnull);
370 Assert(!isnull);
371 substate = DatumGetChar(d);
372 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
373 Anum_pg_subscription_rel_srsublsn, &isnull);
374 if (isnull)
375 *sublsn = InvalidXLogRecPtr;
376 else
377 *sublsn = DatumGetLSN(d);
378
379 /* Cleanup */
380 ReleaseSysCache(tup);
381 heap_close(rel, AccessShareLock);
382
383 return substate;
384 }
385
386 /*
387 * Drop subscription relation mapping. These can be for a particular
388 * subscription, or for a particular relation, or both.
389 */
390 void
RemoveSubscriptionRel(Oid subid,Oid relid)391 RemoveSubscriptionRel(Oid subid, Oid relid)
392 {
393 Relation rel;
394 HeapScanDesc scan;
395 ScanKeyData skey[2];
396 HeapTuple tup;
397 int nkeys = 0;
398
399 rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
400
401 if (OidIsValid(subid))
402 {
403 ScanKeyInit(&skey[nkeys++],
404 Anum_pg_subscription_rel_srsubid,
405 BTEqualStrategyNumber,
406 F_OIDEQ,
407 ObjectIdGetDatum(subid));
408 }
409
410 if (OidIsValid(relid))
411 {
412 ScanKeyInit(&skey[nkeys++],
413 Anum_pg_subscription_rel_srrelid,
414 BTEqualStrategyNumber,
415 F_OIDEQ,
416 ObjectIdGetDatum(relid));
417 }
418
419 /* Do the search and delete what we found. */
420 scan = heap_beginscan_catalog(rel, nkeys, skey);
421 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
422 {
423 CatalogTupleDelete(rel, &tup->t_self);
424 }
425 heap_endscan(scan);
426
427 heap_close(rel, RowExclusiveLock);
428 }
429
430
431 /*
432 * Get all relations for subscription.
433 *
434 * Returned list is palloc'ed in current memory context.
435 */
436 List *
GetSubscriptionRelations(Oid subid)437 GetSubscriptionRelations(Oid subid)
438 {
439 List *res = NIL;
440 Relation rel;
441 HeapTuple tup;
442 int nkeys = 0;
443 ScanKeyData skey[2];
444 SysScanDesc scan;
445
446 rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
447
448 ScanKeyInit(&skey[nkeys++],
449 Anum_pg_subscription_rel_srsubid,
450 BTEqualStrategyNumber, F_OIDEQ,
451 ObjectIdGetDatum(subid));
452
453 scan = systable_beginscan(rel, InvalidOid, false,
454 NULL, nkeys, skey);
455
456 while (HeapTupleIsValid(tup = systable_getnext(scan)))
457 {
458 Form_pg_subscription_rel subrel;
459 SubscriptionRelState *relstate;
460 Datum d;
461 bool isnull;
462
463 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
464
465 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
466 relstate->relid = subrel->srrelid;
467 relstate->state = subrel->srsubstate;
468 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
469 Anum_pg_subscription_rel_srsublsn, &isnull);
470 if (isnull)
471 relstate->lsn = InvalidXLogRecPtr;
472 else
473 relstate->lsn = DatumGetLSN(d);
474
475 res = lappend(res, relstate);
476 }
477
478 /* Cleanup */
479 systable_endscan(scan);
480 heap_close(rel, AccessShareLock);
481
482 return res;
483 }
484
485 /*
486 * Get all relations for subscription that are not in a ready state.
487 *
488 * Returned list is palloc'ed in current memory context.
489 */
490 List *
GetSubscriptionNotReadyRelations(Oid subid)491 GetSubscriptionNotReadyRelations(Oid subid)
492 {
493 List *res = NIL;
494 Relation rel;
495 HeapTuple tup;
496 int nkeys = 0;
497 ScanKeyData skey[2];
498 SysScanDesc scan;
499
500 rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
501
502 ScanKeyInit(&skey[nkeys++],
503 Anum_pg_subscription_rel_srsubid,
504 BTEqualStrategyNumber, F_OIDEQ,
505 ObjectIdGetDatum(subid));
506
507 ScanKeyInit(&skey[nkeys++],
508 Anum_pg_subscription_rel_srsubstate,
509 BTEqualStrategyNumber, F_CHARNE,
510 CharGetDatum(SUBREL_STATE_READY));
511
512 scan = systable_beginscan(rel, InvalidOid, false,
513 NULL, nkeys, skey);
514
515 while (HeapTupleIsValid(tup = systable_getnext(scan)))
516 {
517 Form_pg_subscription_rel subrel;
518 SubscriptionRelState *relstate;
519 Datum d;
520 bool isnull;
521
522 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
523
524 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
525 relstate->relid = subrel->srrelid;
526 relstate->state = subrel->srsubstate;
527 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
528 Anum_pg_subscription_rel_srsublsn, &isnull);
529 if (isnull)
530 relstate->lsn = InvalidXLogRecPtr;
531 else
532 relstate->lsn = DatumGetLSN(d);
533
534 res = lappend(res, relstate);
535 }
536
537 /* Cleanup */
538 systable_endscan(scan);
539 heap_close(rel, AccessShareLock);
540
541 return res;
542 }
543