1 /*-------------------------------------------------------------------------
2  *
3  * pg_subscription.c
4  *		replication subscriptions
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		src/backend/catalog/pg_subscription.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/indexing.h"
23 #include "catalog/pg_subscription.h"
24 #include "catalog/pg_subscription_rel.h"
25 #include "catalog/pg_type.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "storage/lmgr.h"
29 #include "utils/array.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/pg_lsn.h"
33 #include "utils/rel.h"
34 #include "utils/syscache.h"
35 
36 static List *textarray_to_stringlist(ArrayType *textarray);
37 
38 /*
39  * Fetch the subscription from the syscache.
40  */
41 Subscription *
GetSubscription(Oid subid,bool missing_ok)42 GetSubscription(Oid subid, bool missing_ok)
43 {
44 	HeapTuple	tup;
45 	Subscription *sub;
46 	Form_pg_subscription subform;
47 	Datum		datum;
48 	bool		isnull;
49 
50 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
51 
52 	if (!HeapTupleIsValid(tup))
53 	{
54 		if (missing_ok)
55 			return NULL;
56 
57 		elog(ERROR, "cache lookup failed for subscription %u", subid);
58 	}
59 
60 	subform = (Form_pg_subscription) GETSTRUCT(tup);
61 
62 	sub = (Subscription *) palloc(sizeof(Subscription));
63 	sub->oid = subid;
64 	sub->dbid = subform->subdbid;
65 	sub->name = pstrdup(NameStr(subform->subname));
66 	sub->owner = subform->subowner;
67 	sub->enabled = subform->subenabled;
68 
69 	/* Get conninfo */
70 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
71 							tup,
72 							Anum_pg_subscription_subconninfo,
73 							&isnull);
74 	Assert(!isnull);
75 	sub->conninfo = TextDatumGetCString(datum);
76 
77 	/* Get slotname */
78 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
79 							tup,
80 							Anum_pg_subscription_subslotname,
81 							&isnull);
82 	if (!isnull)
83 		sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
84 	else
85 		sub->slotname = NULL;
86 
87 	/* Get synccommit */
88 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
89 							tup,
90 							Anum_pg_subscription_subsynccommit,
91 							&isnull);
92 	Assert(!isnull);
93 	sub->synccommit = TextDatumGetCString(datum);
94 
95 	/* Get publications */
96 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
97 							tup,
98 							Anum_pg_subscription_subpublications,
99 							&isnull);
100 	Assert(!isnull);
101 	sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
102 
103 	ReleaseSysCache(tup);
104 
105 	return sub;
106 }
107 
108 /*
109  * Return number of subscriptions defined in given database.
110  * Used by dropdb() to check if database can indeed be dropped.
111  */
112 int
CountDBSubscriptions(Oid dbid)113 CountDBSubscriptions(Oid dbid)
114 {
115 	int			nsubs = 0;
116 	Relation	rel;
117 	ScanKeyData scankey;
118 	SysScanDesc scan;
119 	HeapTuple	tup;
120 
121 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
122 
123 	ScanKeyInit(&scankey,
124 				Anum_pg_subscription_subdbid,
125 				BTEqualStrategyNumber, F_OIDEQ,
126 				ObjectIdGetDatum(dbid));
127 
128 	scan = systable_beginscan(rel, InvalidOid, false,
129 							  NULL, 1, &scankey);
130 
131 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
132 		nsubs++;
133 
134 	systable_endscan(scan);
135 
136 	table_close(rel, NoLock);
137 
138 	return nsubs;
139 }
140 
141 /*
142  * Free memory allocated by subscription struct.
143  */
144 void
FreeSubscription(Subscription * sub)145 FreeSubscription(Subscription *sub)
146 {
147 	pfree(sub->name);
148 	pfree(sub->conninfo);
149 	if (sub->slotname)
150 		pfree(sub->slotname);
151 	list_free_deep(sub->publications);
152 	pfree(sub);
153 }
154 
155 /*
156  * get_subscription_oid - given a subscription name, look up the OID
157  *
158  * If missing_ok is false, throw an error if name not found.  If true, just
159  * return InvalidOid.
160  */
161 Oid
get_subscription_oid(const char * subname,bool missing_ok)162 get_subscription_oid(const char *subname, bool missing_ok)
163 {
164 	Oid			oid;
165 
166 	oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
167 						  MyDatabaseId, CStringGetDatum(subname));
168 	if (!OidIsValid(oid) && !missing_ok)
169 		ereport(ERROR,
170 				(errcode(ERRCODE_UNDEFINED_OBJECT),
171 				 errmsg("subscription \"%s\" does not exist", subname)));
172 	return oid;
173 }
174 
175 /*
176  * get_subscription_name - given a subscription OID, look up the name
177  *
178  * If missing_ok is false, throw an error if name not found.  If true, just
179  * return NULL.
180  */
181 char *
get_subscription_name(Oid subid,bool missing_ok)182 get_subscription_name(Oid subid, bool missing_ok)
183 {
184 	HeapTuple	tup;
185 	char	   *subname;
186 	Form_pg_subscription subform;
187 
188 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
189 
190 	if (!HeapTupleIsValid(tup))
191 	{
192 		if (!missing_ok)
193 			elog(ERROR, "cache lookup failed for subscription %u", subid);
194 		return NULL;
195 	}
196 
197 	subform = (Form_pg_subscription) GETSTRUCT(tup);
198 	subname = pstrdup(NameStr(subform->subname));
199 
200 	ReleaseSysCache(tup);
201 
202 	return subname;
203 }
204 
205 /*
206  * Convert text array to list of strings.
207  *
208  * Note: the resulting list of strings is pallocated here.
209  */
210 static List *
textarray_to_stringlist(ArrayType * textarray)211 textarray_to_stringlist(ArrayType *textarray)
212 {
213 	Datum	   *elems;
214 	int			nelems,
215 				i;
216 	List	   *res = NIL;
217 
218 	deconstruct_array(textarray,
219 					  TEXTOID, -1, false, TYPALIGN_INT,
220 					  &elems, NULL, &nelems);
221 
222 	if (nelems == 0)
223 		return NIL;
224 
225 	for (i = 0; i < nelems; i++)
226 		res = lappend(res, makeString(TextDatumGetCString(elems[i])));
227 
228 	return res;
229 }
230 
231 /*
232  * Add new state record for a subscription table.
233  */
234 void
AddSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)235 AddSubscriptionRelState(Oid subid, Oid relid, char state,
236 						XLogRecPtr sublsn)
237 {
238 	Relation	rel;
239 	HeapTuple	tup;
240 	bool		nulls[Natts_pg_subscription_rel];
241 	Datum		values[Natts_pg_subscription_rel];
242 
243 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
244 
245 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
246 
247 	/* Try finding existing mapping. */
248 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
249 							  ObjectIdGetDatum(relid),
250 							  ObjectIdGetDatum(subid));
251 	if (HeapTupleIsValid(tup))
252 		elog(ERROR, "subscription table %u in subscription %u already exists",
253 			 relid, subid);
254 
255 	/* Form the tuple. */
256 	memset(values, 0, sizeof(values));
257 	memset(nulls, false, sizeof(nulls));
258 	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
259 	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
260 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
261 	if (sublsn != InvalidXLogRecPtr)
262 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
263 	else
264 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
265 
266 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
267 
268 	/* Insert tuple into catalog. */
269 	CatalogTupleInsert(rel, tup);
270 
271 	heap_freetuple(tup);
272 
273 	/* Cleanup. */
274 	table_close(rel, NoLock);
275 }
276 
277 /*
278  * Update the state of a subscription table.
279  */
280 void
UpdateSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)281 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
282 						   XLogRecPtr sublsn)
283 {
284 	Relation	rel;
285 	HeapTuple	tup;
286 	bool		nulls[Natts_pg_subscription_rel];
287 	Datum		values[Natts_pg_subscription_rel];
288 	bool		replaces[Natts_pg_subscription_rel];
289 
290 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
291 
292 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
293 
294 	/* Try finding existing mapping. */
295 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
296 							  ObjectIdGetDatum(relid),
297 							  ObjectIdGetDatum(subid));
298 	if (!HeapTupleIsValid(tup))
299 		elog(ERROR, "subscription table %u in subscription %u does not exist",
300 			 relid, subid);
301 
302 	/* Update the tuple. */
303 	memset(values, 0, sizeof(values));
304 	memset(nulls, false, sizeof(nulls));
305 	memset(replaces, false, sizeof(replaces));
306 
307 	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
308 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
309 
310 	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
311 	if (sublsn != InvalidXLogRecPtr)
312 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
313 	else
314 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
315 
316 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
317 							replaces);
318 
319 	/* Update the catalog. */
320 	CatalogTupleUpdate(rel, &tup->t_self, tup);
321 
322 	/* Cleanup. */
323 	table_close(rel, NoLock);
324 }
325 
326 /*
327  * Get state of subscription table.
328  *
329  * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
330  */
331 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn,bool missing_ok)332 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
333 						bool missing_ok)
334 {
335 	Relation	rel;
336 	HeapTuple	tup;
337 	char		substate;
338 	bool		isnull;
339 	Datum		d;
340 
341 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
342 
343 	/* Try finding the mapping. */
344 	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
345 						  ObjectIdGetDatum(relid),
346 						  ObjectIdGetDatum(subid));
347 
348 	if (!HeapTupleIsValid(tup))
349 	{
350 		if (missing_ok)
351 		{
352 			table_close(rel, AccessShareLock);
353 			*sublsn = InvalidXLogRecPtr;
354 			return SUBREL_STATE_UNKNOWN;
355 		}
356 
357 		elog(ERROR, "subscription table %u in subscription %u does not exist",
358 			 relid, subid);
359 	}
360 
361 	/* Get the state. */
362 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
363 						Anum_pg_subscription_rel_srsubstate, &isnull);
364 	Assert(!isnull);
365 	substate = DatumGetChar(d);
366 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
367 						Anum_pg_subscription_rel_srsublsn, &isnull);
368 	if (isnull)
369 		*sublsn = InvalidXLogRecPtr;
370 	else
371 		*sublsn = DatumGetLSN(d);
372 
373 	/* Cleanup */
374 	ReleaseSysCache(tup);
375 	table_close(rel, AccessShareLock);
376 
377 	return substate;
378 }
379 
380 /*
381  * Drop subscription relation mapping. These can be for a particular
382  * subscription, or for a particular relation, or both.
383  */
384 void
RemoveSubscriptionRel(Oid subid,Oid relid)385 RemoveSubscriptionRel(Oid subid, Oid relid)
386 {
387 	Relation	rel;
388 	TableScanDesc scan;
389 	ScanKeyData skey[2];
390 	HeapTuple	tup;
391 	int			nkeys = 0;
392 
393 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
394 
395 	if (OidIsValid(subid))
396 	{
397 		ScanKeyInit(&skey[nkeys++],
398 					Anum_pg_subscription_rel_srsubid,
399 					BTEqualStrategyNumber,
400 					F_OIDEQ,
401 					ObjectIdGetDatum(subid));
402 	}
403 
404 	if (OidIsValid(relid))
405 	{
406 		ScanKeyInit(&skey[nkeys++],
407 					Anum_pg_subscription_rel_srrelid,
408 					BTEqualStrategyNumber,
409 					F_OIDEQ,
410 					ObjectIdGetDatum(relid));
411 	}
412 
413 	/* Do the search and delete what we found. */
414 	scan = table_beginscan_catalog(rel, nkeys, skey);
415 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
416 	{
417 		CatalogTupleDelete(rel, &tup->t_self);
418 	}
419 	table_endscan(scan);
420 
421 	table_close(rel, RowExclusiveLock);
422 }
423 
424 
425 /*
426  * Get all relations for subscription.
427  *
428  * Returned list is palloc'ed in current memory context.
429  */
430 List *
GetSubscriptionRelations(Oid subid)431 GetSubscriptionRelations(Oid subid)
432 {
433 	List	   *res = NIL;
434 	Relation	rel;
435 	HeapTuple	tup;
436 	int			nkeys = 0;
437 	ScanKeyData skey[2];
438 	SysScanDesc scan;
439 
440 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
441 
442 	ScanKeyInit(&skey[nkeys++],
443 				Anum_pg_subscription_rel_srsubid,
444 				BTEqualStrategyNumber, F_OIDEQ,
445 				ObjectIdGetDatum(subid));
446 
447 	scan = systable_beginscan(rel, InvalidOid, false,
448 							  NULL, nkeys, skey);
449 
450 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
451 	{
452 		Form_pg_subscription_rel subrel;
453 		SubscriptionRelState *relstate;
454 		Datum		d;
455 		bool		isnull;
456 
457 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
458 
459 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
460 		relstate->relid = subrel->srrelid;
461 		relstate->state = subrel->srsubstate;
462 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
463 							Anum_pg_subscription_rel_srsublsn, &isnull);
464 		if (isnull)
465 			relstate->lsn = InvalidXLogRecPtr;
466 		else
467 			relstate->lsn = DatumGetLSN(d);
468 
469 		res = lappend(res, relstate);
470 	}
471 
472 	/* Cleanup */
473 	systable_endscan(scan);
474 	table_close(rel, AccessShareLock);
475 
476 	return res;
477 }
478 
479 /*
480  * Get all relations for subscription that are not in a ready state.
481  *
482  * Returned list is palloc'ed in current memory context.
483  */
484 List *
GetSubscriptionNotReadyRelations(Oid subid)485 GetSubscriptionNotReadyRelations(Oid subid)
486 {
487 	List	   *res = NIL;
488 	Relation	rel;
489 	HeapTuple	tup;
490 	int			nkeys = 0;
491 	ScanKeyData skey[2];
492 	SysScanDesc scan;
493 
494 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
495 
496 	ScanKeyInit(&skey[nkeys++],
497 				Anum_pg_subscription_rel_srsubid,
498 				BTEqualStrategyNumber, F_OIDEQ,
499 				ObjectIdGetDatum(subid));
500 
501 	ScanKeyInit(&skey[nkeys++],
502 				Anum_pg_subscription_rel_srsubstate,
503 				BTEqualStrategyNumber, F_CHARNE,
504 				CharGetDatum(SUBREL_STATE_READY));
505 
506 	scan = systable_beginscan(rel, InvalidOid, false,
507 							  NULL, nkeys, skey);
508 
509 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
510 	{
511 		Form_pg_subscription_rel subrel;
512 		SubscriptionRelState *relstate;
513 		Datum		d;
514 		bool		isnull;
515 
516 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
517 
518 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
519 		relstate->relid = subrel->srrelid;
520 		relstate->state = subrel->srsubstate;
521 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
522 							Anum_pg_subscription_rel_srsublsn, &isnull);
523 		if (isnull)
524 			relstate->lsn = InvalidXLogRecPtr;
525 		else
526 			relstate->lsn = DatumGetLSN(d);
527 
528 		res = lappend(res, relstate);
529 	}
530 
531 	/* Cleanup */
532 	systable_endscan(scan);
533 	table_close(rel, AccessShareLock);
534 
535 	return res;
536 }
537