1 /*-------------------------------------------------------------------------
2 *
3 * pg_subscription.c
4 * replication subscriptions
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/catalog/pg_subscription.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "miscadmin.h"
18
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/xact.h"
23
24 #include "catalog/indexing.h"
25 #include "catalog/pg_type.h"
26 #include "catalog/pg_subscription.h"
27 #include "catalog/pg_subscription_rel.h"
28
29 #include "nodes/makefuncs.h"
30
31 #include "storage/lmgr.h"
32
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/fmgroids.h"
36 #include "utils/pg_lsn.h"
37 #include "utils/rel.h"
38 #include "utils/syscache.h"
39
40
41 static List *textarray_to_stringlist(ArrayType *textarray);
42
43 /*
44 * Fetch the subscription from the syscache.
45 */
46 Subscription *
GetSubscription(Oid subid,bool missing_ok)47 GetSubscription(Oid subid, bool missing_ok)
48 {
49 HeapTuple tup;
50 Subscription *sub;
51 Form_pg_subscription subform;
52 Datum datum;
53 bool isnull;
54
55 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
56
57 if (!HeapTupleIsValid(tup))
58 {
59 if (missing_ok)
60 return NULL;
61
62 elog(ERROR, "cache lookup failed for subscription %u", subid);
63 }
64
65 subform = (Form_pg_subscription) GETSTRUCT(tup);
66
67 sub = (Subscription *) palloc(sizeof(Subscription));
68 sub->oid = subid;
69 sub->dbid = subform->subdbid;
70 sub->name = pstrdup(NameStr(subform->subname));
71 sub->owner = subform->subowner;
72 sub->enabled = subform->subenabled;
73
74 /* Get conninfo */
75 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
76 tup,
77 Anum_pg_subscription_subconninfo,
78 &isnull);
79 Assert(!isnull);
80 sub->conninfo = TextDatumGetCString(datum);
81
82 /* Get slotname */
83 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
84 tup,
85 Anum_pg_subscription_subslotname,
86 &isnull);
87 if (!isnull)
88 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
89 else
90 sub->slotname = NULL;
91
92 /* Get synccommit */
93 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
94 tup,
95 Anum_pg_subscription_subsynccommit,
96 &isnull);
97 Assert(!isnull);
98 sub->synccommit = TextDatumGetCString(datum);
99
100 /* Get publications */
101 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
102 tup,
103 Anum_pg_subscription_subpublications,
104 &isnull);
105 Assert(!isnull);
106 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
107
108 ReleaseSysCache(tup);
109
110 return sub;
111 }
112
113 /*
114 * Return number of subscriptions defined in given database.
115 * Used by dropdb() to check if database can indeed be dropped.
116 */
117 int
CountDBSubscriptions(Oid dbid)118 CountDBSubscriptions(Oid dbid)
119 {
120 int nsubs = 0;
121 Relation rel;
122 ScanKeyData scankey;
123 SysScanDesc scan;
124 HeapTuple tup;
125
126 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
127
128 ScanKeyInit(&scankey,
129 Anum_pg_subscription_subdbid,
130 BTEqualStrategyNumber, F_OIDEQ,
131 ObjectIdGetDatum(dbid));
132
133 scan = systable_beginscan(rel, InvalidOid, false,
134 NULL, 1, &scankey);
135
136 while (HeapTupleIsValid(tup = systable_getnext(scan)))
137 nsubs++;
138
139 systable_endscan(scan);
140
141 heap_close(rel, NoLock);
142
143 return nsubs;
144 }
145
146 /*
147 * Free memory allocated by subscription struct.
148 */
149 void
FreeSubscription(Subscription * sub)150 FreeSubscription(Subscription *sub)
151 {
152 pfree(sub->name);
153 pfree(sub->conninfo);
154 if (sub->slotname)
155 pfree(sub->slotname);
156 list_free_deep(sub->publications);
157 pfree(sub);
158 }
159
160 /*
161 * get_subscription_oid - given a subscription name, look up the OID
162 *
163 * If missing_ok is false, throw an error if name not found. If true, just
164 * return InvalidOid.
165 */
166 Oid
get_subscription_oid(const char * subname,bool missing_ok)167 get_subscription_oid(const char *subname, bool missing_ok)
168 {
169 Oid oid;
170
171 oid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
172 CStringGetDatum(subname));
173 if (!OidIsValid(oid) && !missing_ok)
174 ereport(ERROR,
175 (errcode(ERRCODE_UNDEFINED_OBJECT),
176 errmsg("subscription \"%s\" does not exist", subname)));
177 return oid;
178 }
179
180 /*
181 * get_subscription_name - given a subscription OID, look up the name
182 */
183 char *
get_subscription_name(Oid subid)184 get_subscription_name(Oid subid)
185 {
186 HeapTuple tup;
187 char *subname;
188 Form_pg_subscription subform;
189
190 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
191
192 if (!HeapTupleIsValid(tup))
193 elog(ERROR, "cache lookup failed for subscription %u", subid);
194
195 subform = (Form_pg_subscription) GETSTRUCT(tup);
196 subname = pstrdup(NameStr(subform->subname));
197
198 ReleaseSysCache(tup);
199
200 return subname;
201 }
202
203 /*
204 * Convert text array to list of strings.
205 *
206 * Note: the resulting list of strings is pallocated here.
207 */
208 static List *
textarray_to_stringlist(ArrayType * textarray)209 textarray_to_stringlist(ArrayType *textarray)
210 {
211 Datum *elems;
212 int nelems,
213 i;
214 List *res = NIL;
215
216 deconstruct_array(textarray,
217 TEXTOID, -1, false, 'i',
218 &elems, NULL, &nelems);
219
220 if (nelems == 0)
221 return NIL;
222
223 for (i = 0; i < nelems; i++)
224 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
225
226 return res;
227 }
228
229 /*
230 * Set the state of a subscription table.
231 *
232 * If update_only is true and the record for given table doesn't exist, do
233 * nothing. This can be used to avoid inserting a new record that was deleted
234 * by someone else. Generally, subscription DDL commands should use false,
235 * workers should use true.
236 *
237 * The insert-or-update logic in this function is not concurrency safe so it
238 * might raise an error in rare circumstances. But if we took a stronger lock
239 * such as ShareRowExclusiveLock, we would risk more deadlocks.
240 */
241 Oid
SetSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn,bool update_only)242 SetSubscriptionRelState(Oid subid, Oid relid, char state,
243 XLogRecPtr sublsn, bool update_only)
244 {
245 Relation rel;
246 HeapTuple tup;
247 Oid subrelid = InvalidOid;
248 bool nulls[Natts_pg_subscription_rel];
249 Datum values[Natts_pg_subscription_rel];
250
251 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
252
253 rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
254
255 /* Try finding existing mapping. */
256 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
257 ObjectIdGetDatum(relid),
258 ObjectIdGetDatum(subid));
259
260 /*
261 * If the record for given table does not exist yet create new record,
262 * otherwise update the existing one.
263 */
264 if (!HeapTupleIsValid(tup) && !update_only)
265 {
266 /* Form the tuple. */
267 memset(values, 0, sizeof(values));
268 memset(nulls, false, sizeof(nulls));
269 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
270 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
271 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
272 if (sublsn != InvalidXLogRecPtr)
273 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
274 else
275 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
276
277 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
278
279 /* Insert tuple into catalog. */
280 subrelid = CatalogTupleInsert(rel, tup);
281
282 heap_freetuple(tup);
283 }
284 else if (HeapTupleIsValid(tup))
285 {
286 bool replaces[Natts_pg_subscription_rel];
287
288 /* Update the tuple. */
289 memset(values, 0, sizeof(values));
290 memset(nulls, false, sizeof(nulls));
291 memset(replaces, false, sizeof(replaces));
292
293 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
294 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
295
296 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
297 if (sublsn != InvalidXLogRecPtr)
298 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
299 else
300 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
301
302 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
303 replaces);
304
305 /* Update the catalog. */
306 CatalogTupleUpdate(rel, &tup->t_self, tup);
307
308 subrelid = HeapTupleGetOid(tup);
309 }
310
311 /* Cleanup. */
312 heap_close(rel, NoLock);
313
314 return subrelid;
315 }
316
317 /*
318 * Get state of subscription table.
319 *
320 * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
321 */
322 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn,bool missing_ok)323 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
324 bool missing_ok)
325 {
326 Relation rel;
327 HeapTuple tup;
328 char substate;
329 bool isnull;
330 Datum d;
331
332 rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
333
334 /* Try finding the mapping. */
335 tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
336 ObjectIdGetDatum(relid),
337 ObjectIdGetDatum(subid));
338
339 if (!HeapTupleIsValid(tup))
340 {
341 if (missing_ok)
342 {
343 heap_close(rel, AccessShareLock);
344 *sublsn = InvalidXLogRecPtr;
345 return SUBREL_STATE_UNKNOWN;
346 }
347
348 elog(ERROR, "subscription table %u in subscription %u does not exist",
349 relid, subid);
350 }
351
352 /* Get the state. */
353 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
354 Anum_pg_subscription_rel_srsubstate, &isnull);
355 Assert(!isnull);
356 substate = DatumGetChar(d);
357 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
358 Anum_pg_subscription_rel_srsublsn, &isnull);
359 if (isnull)
360 *sublsn = InvalidXLogRecPtr;
361 else
362 *sublsn = DatumGetLSN(d);
363
364 /* Cleanup */
365 ReleaseSysCache(tup);
366 heap_close(rel, AccessShareLock);
367
368 return substate;
369 }
370
371 /*
372 * Drop subscription relation mapping. These can be for a particular
373 * subscription, or for a particular relation, or both.
374 */
375 void
RemoveSubscriptionRel(Oid subid,Oid relid)376 RemoveSubscriptionRel(Oid subid, Oid relid)
377 {
378 Relation rel;
379 HeapScanDesc scan;
380 ScanKeyData skey[2];
381 HeapTuple tup;
382 int nkeys = 0;
383
384 rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
385
386 if (OidIsValid(subid))
387 {
388 ScanKeyInit(&skey[nkeys++],
389 Anum_pg_subscription_rel_srsubid,
390 BTEqualStrategyNumber,
391 F_OIDEQ,
392 ObjectIdGetDatum(subid));
393 }
394
395 if (OidIsValid(relid))
396 {
397 ScanKeyInit(&skey[nkeys++],
398 Anum_pg_subscription_rel_srrelid,
399 BTEqualStrategyNumber,
400 F_OIDEQ,
401 ObjectIdGetDatum(relid));
402 }
403
404 /* Do the search and delete what we found. */
405 scan = heap_beginscan_catalog(rel, nkeys, skey);
406 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
407 {
408 CatalogTupleDelete(rel, &tup->t_self);
409 }
410 heap_endscan(scan);
411
412 heap_close(rel, RowExclusiveLock);
413 }
414
415
416 /*
417 * Get all relations for subscription.
418 *
419 * Returned list is palloc'ed in current memory context.
420 */
421 List *
GetSubscriptionRelations(Oid subid)422 GetSubscriptionRelations(Oid subid)
423 {
424 List *res = NIL;
425 Relation rel;
426 HeapTuple tup;
427 int nkeys = 0;
428 ScanKeyData skey[2];
429 SysScanDesc scan;
430
431 rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
432
433 ScanKeyInit(&skey[nkeys++],
434 Anum_pg_subscription_rel_srsubid,
435 BTEqualStrategyNumber, F_OIDEQ,
436 ObjectIdGetDatum(subid));
437
438 scan = systable_beginscan(rel, InvalidOid, false,
439 NULL, nkeys, skey);
440
441 while (HeapTupleIsValid(tup = systable_getnext(scan)))
442 {
443 Form_pg_subscription_rel subrel;
444 SubscriptionRelState *relstate;
445 Datum d;
446 bool isnull;
447
448 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
449
450 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
451 relstate->relid = subrel->srrelid;
452 relstate->state = subrel->srsubstate;
453 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
454 Anum_pg_subscription_rel_srsublsn, &isnull);
455 if (isnull)
456 relstate->lsn = InvalidXLogRecPtr;
457 else
458 relstate->lsn = DatumGetLSN(d);
459
460 res = lappend(res, relstate);
461 }
462
463 /* Cleanup */
464 systable_endscan(scan);
465 heap_close(rel, AccessShareLock);
466
467 return res;
468 }
469
470 /*
471 * Get all relations for subscription that are not in a ready state.
472 *
473 * Returned list is palloc'ed in current memory context.
474 */
475 List *
GetSubscriptionNotReadyRelations(Oid subid)476 GetSubscriptionNotReadyRelations(Oid subid)
477 {
478 List *res = NIL;
479 Relation rel;
480 HeapTuple tup;
481 int nkeys = 0;
482 ScanKeyData skey[2];
483 SysScanDesc scan;
484
485 rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
486
487 ScanKeyInit(&skey[nkeys++],
488 Anum_pg_subscription_rel_srsubid,
489 BTEqualStrategyNumber, F_OIDEQ,
490 ObjectIdGetDatum(subid));
491
492 ScanKeyInit(&skey[nkeys++],
493 Anum_pg_subscription_rel_srsubstate,
494 BTEqualStrategyNumber, F_CHARNE,
495 CharGetDatum(SUBREL_STATE_READY));
496
497 scan = systable_beginscan(rel, InvalidOid, false,
498 NULL, nkeys, skey);
499
500 while (HeapTupleIsValid(tup = systable_getnext(scan)))
501 {
502 Form_pg_subscription_rel subrel;
503 SubscriptionRelState *relstate;
504 Datum d;
505 bool isnull;
506
507 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
508
509 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
510 relstate->relid = subrel->srrelid;
511 relstate->state = subrel->srsubstate;
512 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
513 Anum_pg_subscription_rel_srsublsn, &isnull);
514 if (isnull)
515 relstate->lsn = InvalidXLogRecPtr;
516 else
517 relstate->lsn = DatumGetLSN(d);
518
519 res = lappend(res, relstate);
520 }
521
522 /* Cleanup */
523 systable_endscan(scan);
524 heap_close(rel, AccessShareLock);
525
526 return res;
527 }
528