1 /*-------------------------------------------------------------------------
2  *
3  * pg_subscription.c
4  *		replication subscriptions
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		src/backend/catalog/pg_subscription.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/indexing.h"
23 #include "catalog/pg_subscription.h"
24 #include "catalog/pg_subscription_rel.h"
25 #include "catalog/pg_type.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "storage/lmgr.h"
29 #include "utils/array.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/lsyscache.h"
33 #include "utils/pg_lsn.h"
34 #include "utils/rel.h"
35 #include "utils/syscache.h"
36 
37 static List *textarray_to_stringlist(ArrayType *textarray);
38 
39 /*
40  * Fetch the subscription from the syscache.
41  */
42 Subscription *
GetSubscription(Oid subid,bool missing_ok)43 GetSubscription(Oid subid, bool missing_ok)
44 {
45 	HeapTuple	tup;
46 	Subscription *sub;
47 	Form_pg_subscription subform;
48 	Datum		datum;
49 	bool		isnull;
50 
51 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
52 
53 	if (!HeapTupleIsValid(tup))
54 	{
55 		if (missing_ok)
56 			return NULL;
57 
58 		elog(ERROR, "cache lookup failed for subscription %u", subid);
59 	}
60 
61 	subform = (Form_pg_subscription) GETSTRUCT(tup);
62 
63 	sub = (Subscription *) palloc(sizeof(Subscription));
64 	sub->oid = subid;
65 	sub->dbid = subform->subdbid;
66 	sub->name = pstrdup(NameStr(subform->subname));
67 	sub->owner = subform->subowner;
68 	sub->enabled = subform->subenabled;
69 	sub->binary = subform->subbinary;
70 	sub->stream = subform->substream;
71 
72 	/* Get conninfo */
73 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
74 							tup,
75 							Anum_pg_subscription_subconninfo,
76 							&isnull);
77 	Assert(!isnull);
78 	sub->conninfo = TextDatumGetCString(datum);
79 
80 	/* Get slotname */
81 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
82 							tup,
83 							Anum_pg_subscription_subslotname,
84 							&isnull);
85 	if (!isnull)
86 		sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
87 	else
88 		sub->slotname = NULL;
89 
90 	/* Get synccommit */
91 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
92 							tup,
93 							Anum_pg_subscription_subsynccommit,
94 							&isnull);
95 	Assert(!isnull);
96 	sub->synccommit = TextDatumGetCString(datum);
97 
98 	/* Get publications */
99 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
100 							tup,
101 							Anum_pg_subscription_subpublications,
102 							&isnull);
103 	Assert(!isnull);
104 	sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
105 
106 	ReleaseSysCache(tup);
107 
108 	return sub;
109 }
110 
111 /*
112  * Return number of subscriptions defined in given database.
113  * Used by dropdb() to check if database can indeed be dropped.
114  */
115 int
CountDBSubscriptions(Oid dbid)116 CountDBSubscriptions(Oid dbid)
117 {
118 	int			nsubs = 0;
119 	Relation	rel;
120 	ScanKeyData scankey;
121 	SysScanDesc scan;
122 	HeapTuple	tup;
123 
124 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
125 
126 	ScanKeyInit(&scankey,
127 				Anum_pg_subscription_subdbid,
128 				BTEqualStrategyNumber, F_OIDEQ,
129 				ObjectIdGetDatum(dbid));
130 
131 	scan = systable_beginscan(rel, InvalidOid, false,
132 							  NULL, 1, &scankey);
133 
134 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
135 		nsubs++;
136 
137 	systable_endscan(scan);
138 
139 	table_close(rel, NoLock);
140 
141 	return nsubs;
142 }
143 
144 /*
145  * Free memory allocated by subscription struct.
146  */
147 void
FreeSubscription(Subscription * sub)148 FreeSubscription(Subscription *sub)
149 {
150 	pfree(sub->name);
151 	pfree(sub->conninfo);
152 	if (sub->slotname)
153 		pfree(sub->slotname);
154 	list_free_deep(sub->publications);
155 	pfree(sub);
156 }
157 
158 /*
159  * get_subscription_oid - given a subscription name, look up the OID
160  *
161  * If missing_ok is false, throw an error if name not found.  If true, just
162  * return InvalidOid.
163  */
164 Oid
get_subscription_oid(const char * subname,bool missing_ok)165 get_subscription_oid(const char *subname, bool missing_ok)
166 {
167 	Oid			oid;
168 
169 	oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
170 						  MyDatabaseId, CStringGetDatum(subname));
171 	if (!OidIsValid(oid) && !missing_ok)
172 		ereport(ERROR,
173 				(errcode(ERRCODE_UNDEFINED_OBJECT),
174 				 errmsg("subscription \"%s\" does not exist", subname)));
175 	return oid;
176 }
177 
178 /*
179  * get_subscription_name - given a subscription OID, look up the name
180  *
181  * If missing_ok is false, throw an error if name not found.  If true, just
182  * return NULL.
183  */
184 char *
get_subscription_name(Oid subid,bool missing_ok)185 get_subscription_name(Oid subid, bool missing_ok)
186 {
187 	HeapTuple	tup;
188 	char	   *subname;
189 	Form_pg_subscription subform;
190 
191 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
192 
193 	if (!HeapTupleIsValid(tup))
194 	{
195 		if (!missing_ok)
196 			elog(ERROR, "cache lookup failed for subscription %u", subid);
197 		return NULL;
198 	}
199 
200 	subform = (Form_pg_subscription) GETSTRUCT(tup);
201 	subname = pstrdup(NameStr(subform->subname));
202 
203 	ReleaseSysCache(tup);
204 
205 	return subname;
206 }
207 
208 /*
209  * Convert text array to list of strings.
210  *
211  * Note: the resulting list of strings is pallocated here.
212  */
213 static List *
textarray_to_stringlist(ArrayType * textarray)214 textarray_to_stringlist(ArrayType *textarray)
215 {
216 	Datum	   *elems;
217 	int			nelems,
218 				i;
219 	List	   *res = NIL;
220 
221 	deconstruct_array(textarray,
222 					  TEXTOID, -1, false, TYPALIGN_INT,
223 					  &elems, NULL, &nelems);
224 
225 	if (nelems == 0)
226 		return NIL;
227 
228 	for (i = 0; i < nelems; i++)
229 		res = lappend(res, makeString(TextDatumGetCString(elems[i])));
230 
231 	return res;
232 }
233 
234 /*
235  * Add new state record for a subscription table.
236  */
237 void
AddSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)238 AddSubscriptionRelState(Oid subid, Oid relid, char state,
239 						XLogRecPtr sublsn)
240 {
241 	Relation	rel;
242 	HeapTuple	tup;
243 	bool		nulls[Natts_pg_subscription_rel];
244 	Datum		values[Natts_pg_subscription_rel];
245 
246 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
247 
248 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
249 
250 	/* Try finding existing mapping. */
251 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
252 							  ObjectIdGetDatum(relid),
253 							  ObjectIdGetDatum(subid));
254 	if (HeapTupleIsValid(tup))
255 		elog(ERROR, "subscription table %u in subscription %u already exists",
256 			 relid, subid);
257 
258 	/* Form the tuple. */
259 	memset(values, 0, sizeof(values));
260 	memset(nulls, false, sizeof(nulls));
261 	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
262 	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
263 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
264 	if (sublsn != InvalidXLogRecPtr)
265 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
266 	else
267 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
268 
269 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
270 
271 	/* Insert tuple into catalog. */
272 	CatalogTupleInsert(rel, tup);
273 
274 	heap_freetuple(tup);
275 
276 	/* Cleanup. */
277 	table_close(rel, NoLock);
278 }
279 
280 /*
281  * Update the state of a subscription table.
282  */
283 void
UpdateSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)284 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
285 						   XLogRecPtr sublsn)
286 {
287 	Relation	rel;
288 	HeapTuple	tup;
289 	bool		nulls[Natts_pg_subscription_rel];
290 	Datum		values[Natts_pg_subscription_rel];
291 	bool		replaces[Natts_pg_subscription_rel];
292 
293 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
294 
295 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
296 
297 	/* Try finding existing mapping. */
298 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
299 							  ObjectIdGetDatum(relid),
300 							  ObjectIdGetDatum(subid));
301 	if (!HeapTupleIsValid(tup))
302 		elog(ERROR, "subscription table %u in subscription %u does not exist",
303 			 relid, subid);
304 
305 	/* Update the tuple. */
306 	memset(values, 0, sizeof(values));
307 	memset(nulls, false, sizeof(nulls));
308 	memset(replaces, false, sizeof(replaces));
309 
310 	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
311 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
312 
313 	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
314 	if (sublsn != InvalidXLogRecPtr)
315 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
316 	else
317 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
318 
319 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
320 							replaces);
321 
322 	/* Update the catalog. */
323 	CatalogTupleUpdate(rel, &tup->t_self, tup);
324 
325 	/* Cleanup. */
326 	table_close(rel, NoLock);
327 }
328 
329 /*
330  * Get state of subscription table.
331  *
332  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
333  */
334 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn)335 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
336 {
337 	HeapTuple	tup;
338 	char		substate;
339 	bool		isnull;
340 	Datum		d;
341 	Relation	rel;
342 
343 	/*
344 	 * This is to avoid the race condition with AlterSubscription which tries
345 	 * to remove this relstate.
346 	 */
347 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
348 
349 	/* Try finding the mapping. */
350 	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
351 						  ObjectIdGetDatum(relid),
352 						  ObjectIdGetDatum(subid));
353 
354 	if (!HeapTupleIsValid(tup))
355 	{
356 		table_close(rel, AccessShareLock);
357 		*sublsn = InvalidXLogRecPtr;
358 		return SUBREL_STATE_UNKNOWN;
359 	}
360 
361 	/* Get the state. */
362 	substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
363 
364 	/* Get the LSN */
365 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
366 						Anum_pg_subscription_rel_srsublsn, &isnull);
367 	if (isnull)
368 		*sublsn = InvalidXLogRecPtr;
369 	else
370 		*sublsn = DatumGetLSN(d);
371 
372 	/* Cleanup */
373 	ReleaseSysCache(tup);
374 
375 	table_close(rel, AccessShareLock);
376 
377 	return substate;
378 }
379 
380 /*
381  * Drop subscription relation mapping. These can be for a particular
382  * subscription, or for a particular relation, or both.
383  */
384 void
RemoveSubscriptionRel(Oid subid,Oid relid)385 RemoveSubscriptionRel(Oid subid, Oid relid)
386 {
387 	Relation	rel;
388 	TableScanDesc scan;
389 	ScanKeyData skey[2];
390 	HeapTuple	tup;
391 	int			nkeys = 0;
392 
393 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
394 
395 	if (OidIsValid(subid))
396 	{
397 		ScanKeyInit(&skey[nkeys++],
398 					Anum_pg_subscription_rel_srsubid,
399 					BTEqualStrategyNumber,
400 					F_OIDEQ,
401 					ObjectIdGetDatum(subid));
402 	}
403 
404 	if (OidIsValid(relid))
405 	{
406 		ScanKeyInit(&skey[nkeys++],
407 					Anum_pg_subscription_rel_srrelid,
408 					BTEqualStrategyNumber,
409 					F_OIDEQ,
410 					ObjectIdGetDatum(relid));
411 	}
412 
413 	/* Do the search and delete what we found. */
414 	scan = table_beginscan_catalog(rel, nkeys, skey);
415 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
416 	{
417 		Form_pg_subscription_rel subrel;
418 
419 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
420 
421 		/*
422 		 * We don't allow to drop the relation mapping when the table
423 		 * synchronization is in progress unless the caller updates the
424 		 * corresponding subscription as well. This is to ensure that we don't
425 		 * leave tablesync slots or origins in the system when the
426 		 * corresponding table is dropped.
427 		 */
428 		if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
429 		{
430 			ereport(ERROR,
431 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
432 					 errmsg("could not drop relation mapping for subscription \"%s\"",
433 							get_subscription_name(subrel->srsubid, false)),
434 					 errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
435 							   get_rel_name(relid), subrel->srsubstate),
436 
437 			/*
438 			 * translator: first %s is a SQL ALTER command and second %s is a
439 			 * SQL DROP command
440 			 */
441 					 errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
442 							 "ALTER SUBSCRIPTION ... ENABLE",
443 							 "DROP SUBSCRIPTION ...")));
444 		}
445 
446 		CatalogTupleDelete(rel, &tup->t_self);
447 	}
448 	table_endscan(scan);
449 
450 	table_close(rel, RowExclusiveLock);
451 }
452 
453 
454 /*
455  * Get all relations for subscription.
456  *
457  * Returned list is palloc'ed in current memory context.
458  */
459 List *
GetSubscriptionRelations(Oid subid)460 GetSubscriptionRelations(Oid subid)
461 {
462 	List	   *res = NIL;
463 	Relation	rel;
464 	HeapTuple	tup;
465 	ScanKeyData skey[1];
466 	SysScanDesc scan;
467 
468 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
469 
470 	ScanKeyInit(&skey[0],
471 				Anum_pg_subscription_rel_srsubid,
472 				BTEqualStrategyNumber, F_OIDEQ,
473 				ObjectIdGetDatum(subid));
474 
475 	scan = systable_beginscan(rel, InvalidOid, false,
476 							  NULL, 1, skey);
477 
478 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
479 	{
480 		Form_pg_subscription_rel subrel;
481 		SubscriptionRelState *relstate;
482 		Datum		d;
483 		bool		isnull;
484 
485 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
486 
487 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
488 		relstate->relid = subrel->srrelid;
489 		relstate->state = subrel->srsubstate;
490 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
491 							Anum_pg_subscription_rel_srsublsn, &isnull);
492 		if (isnull)
493 			relstate->lsn = InvalidXLogRecPtr;
494 		else
495 			relstate->lsn = DatumGetLSN(d);
496 
497 		res = lappend(res, relstate);
498 	}
499 
500 	/* Cleanup */
501 	systable_endscan(scan);
502 	table_close(rel, AccessShareLock);
503 
504 	return res;
505 }
506 
507 /*
508  * Get all relations for subscription that are not in a ready state.
509  *
510  * Returned list is palloc'ed in current memory context.
511  */
512 List *
GetSubscriptionNotReadyRelations(Oid subid)513 GetSubscriptionNotReadyRelations(Oid subid)
514 {
515 	List	   *res = NIL;
516 	Relation	rel;
517 	HeapTuple	tup;
518 	int			nkeys = 0;
519 	ScanKeyData skey[2];
520 	SysScanDesc scan;
521 
522 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
523 
524 	ScanKeyInit(&skey[nkeys++],
525 				Anum_pg_subscription_rel_srsubid,
526 				BTEqualStrategyNumber, F_OIDEQ,
527 				ObjectIdGetDatum(subid));
528 
529 	ScanKeyInit(&skey[nkeys++],
530 				Anum_pg_subscription_rel_srsubstate,
531 				BTEqualStrategyNumber, F_CHARNE,
532 				CharGetDatum(SUBREL_STATE_READY));
533 
534 	scan = systable_beginscan(rel, InvalidOid, false,
535 							  NULL, nkeys, skey);
536 
537 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
538 	{
539 		Form_pg_subscription_rel subrel;
540 		SubscriptionRelState *relstate;
541 		Datum		d;
542 		bool		isnull;
543 
544 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
545 
546 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
547 		relstate->relid = subrel->srrelid;
548 		relstate->state = subrel->srsubstate;
549 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
550 							Anum_pg_subscription_rel_srsublsn, &isnull);
551 		if (isnull)
552 			relstate->lsn = InvalidXLogRecPtr;
553 		else
554 			relstate->lsn = DatumGetLSN(d);
555 
556 		res = lappend(res, relstate);
557 	}
558 
559 	/* Cleanup */
560 	systable_endscan(scan);
561 	table_close(rel, AccessShareLock);
562 
563 	return res;
564 }
565