1 /*-------------------------------------------------------------------------
2  *
3  * pg_subscription.c
4  *		replication subscriptions
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		src/backend/catalog/pg_subscription.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "miscadmin.h"
18 
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/tableam.h"
23 #include "access/xact.h"
24 
25 #include "catalog/indexing.h"
26 #include "catalog/pg_type.h"
27 #include "catalog/pg_subscription.h"
28 #include "catalog/pg_subscription_rel.h"
29 
30 #include "nodes/makefuncs.h"
31 
32 #include "storage/lmgr.h"
33 
34 #include "utils/array.h"
35 #include "utils/builtins.h"
36 #include "utils/fmgroids.h"
37 #include "utils/pg_lsn.h"
38 #include "utils/rel.h"
39 #include "utils/syscache.h"
40 
41 
42 static List *textarray_to_stringlist(ArrayType *textarray);
43 
44 /*
45  * Fetch the subscription from the syscache.
46  */
47 Subscription *
GetSubscription(Oid subid,bool missing_ok)48 GetSubscription(Oid subid, bool missing_ok)
49 {
50 	HeapTuple	tup;
51 	Subscription *sub;
52 	Form_pg_subscription subform;
53 	Datum		datum;
54 	bool		isnull;
55 
56 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
57 
58 	if (!HeapTupleIsValid(tup))
59 	{
60 		if (missing_ok)
61 			return NULL;
62 
63 		elog(ERROR, "cache lookup failed for subscription %u", subid);
64 	}
65 
66 	subform = (Form_pg_subscription) GETSTRUCT(tup);
67 
68 	sub = (Subscription *) palloc(sizeof(Subscription));
69 	sub->oid = subid;
70 	sub->dbid = subform->subdbid;
71 	sub->name = pstrdup(NameStr(subform->subname));
72 	sub->owner = subform->subowner;
73 	sub->enabled = subform->subenabled;
74 
75 	/* Get conninfo */
76 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
77 							tup,
78 							Anum_pg_subscription_subconninfo,
79 							&isnull);
80 	Assert(!isnull);
81 	sub->conninfo = TextDatumGetCString(datum);
82 
83 	/* Get slotname */
84 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
85 							tup,
86 							Anum_pg_subscription_subslotname,
87 							&isnull);
88 	if (!isnull)
89 		sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
90 	else
91 		sub->slotname = NULL;
92 
93 	/* Get synccommit */
94 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
95 							tup,
96 							Anum_pg_subscription_subsynccommit,
97 							&isnull);
98 	Assert(!isnull);
99 	sub->synccommit = TextDatumGetCString(datum);
100 
101 	/* Get publications */
102 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
103 							tup,
104 							Anum_pg_subscription_subpublications,
105 							&isnull);
106 	Assert(!isnull);
107 	sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
108 
109 	ReleaseSysCache(tup);
110 
111 	return sub;
112 }
113 
114 /*
115  * Return number of subscriptions defined in given database.
116  * Used by dropdb() to check if database can indeed be dropped.
117  */
118 int
CountDBSubscriptions(Oid dbid)119 CountDBSubscriptions(Oid dbid)
120 {
121 	int			nsubs = 0;
122 	Relation	rel;
123 	ScanKeyData scankey;
124 	SysScanDesc scan;
125 	HeapTuple	tup;
126 
127 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
128 
129 	ScanKeyInit(&scankey,
130 				Anum_pg_subscription_subdbid,
131 				BTEqualStrategyNumber, F_OIDEQ,
132 				ObjectIdGetDatum(dbid));
133 
134 	scan = systable_beginscan(rel, InvalidOid, false,
135 							  NULL, 1, &scankey);
136 
137 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
138 		nsubs++;
139 
140 	systable_endscan(scan);
141 
142 	table_close(rel, NoLock);
143 
144 	return nsubs;
145 }
146 
147 /*
148  * Free memory allocated by subscription struct.
149  */
150 void
FreeSubscription(Subscription * sub)151 FreeSubscription(Subscription *sub)
152 {
153 	pfree(sub->name);
154 	pfree(sub->conninfo);
155 	if (sub->slotname)
156 		pfree(sub->slotname);
157 	list_free_deep(sub->publications);
158 	pfree(sub);
159 }
160 
161 /*
162  * get_subscription_oid - given a subscription name, look up the OID
163  *
164  * If missing_ok is false, throw an error if name not found.  If true, just
165  * return InvalidOid.
166  */
167 Oid
get_subscription_oid(const char * subname,bool missing_ok)168 get_subscription_oid(const char *subname, bool missing_ok)
169 {
170 	Oid			oid;
171 
172 	oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
173 						  MyDatabaseId, CStringGetDatum(subname));
174 	if (!OidIsValid(oid) && !missing_ok)
175 		ereport(ERROR,
176 				(errcode(ERRCODE_UNDEFINED_OBJECT),
177 				 errmsg("subscription \"%s\" does not exist", subname)));
178 	return oid;
179 }
180 
181 /*
182  * get_subscription_name - given a subscription OID, look up the name
183  *
184  * If missing_ok is false, throw an error if name not found.  If true, just
185  * return NULL.
186  */
187 char *
get_subscription_name(Oid subid,bool missing_ok)188 get_subscription_name(Oid subid, bool missing_ok)
189 {
190 	HeapTuple	tup;
191 	char	   *subname;
192 	Form_pg_subscription subform;
193 
194 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
195 
196 	if (!HeapTupleIsValid(tup))
197 	{
198 		if (!missing_ok)
199 			elog(ERROR, "cache lookup failed for subscription %u", subid);
200 		return NULL;
201 	}
202 
203 	subform = (Form_pg_subscription) GETSTRUCT(tup);
204 	subname = pstrdup(NameStr(subform->subname));
205 
206 	ReleaseSysCache(tup);
207 
208 	return subname;
209 }
210 
211 /*
212  * Convert text array to list of strings.
213  *
214  * Note: the resulting list of strings is pallocated here.
215  */
216 static List *
textarray_to_stringlist(ArrayType * textarray)217 textarray_to_stringlist(ArrayType *textarray)
218 {
219 	Datum	   *elems;
220 	int			nelems,
221 				i;
222 	List	   *res = NIL;
223 
224 	deconstruct_array(textarray,
225 					  TEXTOID, -1, false, 'i',
226 					  &elems, NULL, &nelems);
227 
228 	if (nelems == 0)
229 		return NIL;
230 
231 	for (i = 0; i < nelems; i++)
232 		res = lappend(res, makeString(TextDatumGetCString(elems[i])));
233 
234 	return res;
235 }
236 
237 /*
238  * Add new state record for a subscription table.
239  */
240 void
AddSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)241 AddSubscriptionRelState(Oid subid, Oid relid, char state,
242 						XLogRecPtr sublsn)
243 {
244 	Relation	rel;
245 	HeapTuple	tup;
246 	bool		nulls[Natts_pg_subscription_rel];
247 	Datum		values[Natts_pg_subscription_rel];
248 
249 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
250 
251 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
252 
253 	/* Try finding existing mapping. */
254 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
255 							  ObjectIdGetDatum(relid),
256 							  ObjectIdGetDatum(subid));
257 	if (HeapTupleIsValid(tup))
258 		elog(ERROR, "subscription table %u in subscription %u already exists",
259 			 relid, subid);
260 
261 	/* Form the tuple. */
262 	memset(values, 0, sizeof(values));
263 	memset(nulls, false, sizeof(nulls));
264 	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
265 	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
266 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
267 	if (sublsn != InvalidXLogRecPtr)
268 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
269 	else
270 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
271 
272 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
273 
274 	/* Insert tuple into catalog. */
275 	CatalogTupleInsert(rel, tup);
276 
277 	heap_freetuple(tup);
278 
279 	/* Cleanup. */
280 	table_close(rel, NoLock);
281 }
282 
283 /*
284  * Update the state of a subscription table.
285  */
286 void
UpdateSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)287 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
288 						   XLogRecPtr sublsn)
289 {
290 	Relation	rel;
291 	HeapTuple	tup;
292 	bool		nulls[Natts_pg_subscription_rel];
293 	Datum		values[Natts_pg_subscription_rel];
294 	bool		replaces[Natts_pg_subscription_rel];
295 
296 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
297 
298 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
299 
300 	/* Try finding existing mapping. */
301 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
302 							  ObjectIdGetDatum(relid),
303 							  ObjectIdGetDatum(subid));
304 	if (!HeapTupleIsValid(tup))
305 		elog(ERROR, "subscription table %u in subscription %u does not exist",
306 			 relid, subid);
307 
308 	/* Update the tuple. */
309 	memset(values, 0, sizeof(values));
310 	memset(nulls, false, sizeof(nulls));
311 	memset(replaces, false, sizeof(replaces));
312 
313 	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
314 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
315 
316 	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
317 	if (sublsn != InvalidXLogRecPtr)
318 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
319 	else
320 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
321 
322 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
323 							replaces);
324 
325 	/* Update the catalog. */
326 	CatalogTupleUpdate(rel, &tup->t_self, tup);
327 
328 	/* Cleanup. */
329 	table_close(rel, NoLock);
330 }
331 
332 /*
333  * Get state of subscription table.
334  *
335  * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
336  */
337 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn,bool missing_ok)338 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
339 						bool missing_ok)
340 {
341 	Relation	rel;
342 	HeapTuple	tup;
343 	char		substate;
344 	bool		isnull;
345 	Datum		d;
346 
347 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
348 
349 	/* Try finding the mapping. */
350 	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
351 						  ObjectIdGetDatum(relid),
352 						  ObjectIdGetDatum(subid));
353 
354 	if (!HeapTupleIsValid(tup))
355 	{
356 		if (missing_ok)
357 		{
358 			table_close(rel, AccessShareLock);
359 			*sublsn = InvalidXLogRecPtr;
360 			return SUBREL_STATE_UNKNOWN;
361 		}
362 
363 		elog(ERROR, "subscription table %u in subscription %u does not exist",
364 			 relid, subid);
365 	}
366 
367 	/* Get the state. */
368 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
369 						Anum_pg_subscription_rel_srsubstate, &isnull);
370 	Assert(!isnull);
371 	substate = DatumGetChar(d);
372 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
373 						Anum_pg_subscription_rel_srsublsn, &isnull);
374 	if (isnull)
375 		*sublsn = InvalidXLogRecPtr;
376 	else
377 		*sublsn = DatumGetLSN(d);
378 
379 	/* Cleanup */
380 	ReleaseSysCache(tup);
381 	table_close(rel, AccessShareLock);
382 
383 	return substate;
384 }
385 
386 /*
387  * Drop subscription relation mapping. These can be for a particular
388  * subscription, or for a particular relation, or both.
389  */
390 void
RemoveSubscriptionRel(Oid subid,Oid relid)391 RemoveSubscriptionRel(Oid subid, Oid relid)
392 {
393 	Relation	rel;
394 	TableScanDesc scan;
395 	ScanKeyData skey[2];
396 	HeapTuple	tup;
397 	int			nkeys = 0;
398 
399 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
400 
401 	if (OidIsValid(subid))
402 	{
403 		ScanKeyInit(&skey[nkeys++],
404 					Anum_pg_subscription_rel_srsubid,
405 					BTEqualStrategyNumber,
406 					F_OIDEQ,
407 					ObjectIdGetDatum(subid));
408 	}
409 
410 	if (OidIsValid(relid))
411 	{
412 		ScanKeyInit(&skey[nkeys++],
413 					Anum_pg_subscription_rel_srrelid,
414 					BTEqualStrategyNumber,
415 					F_OIDEQ,
416 					ObjectIdGetDatum(relid));
417 	}
418 
419 	/* Do the search and delete what we found. */
420 	scan = table_beginscan_catalog(rel, nkeys, skey);
421 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
422 	{
423 		CatalogTupleDelete(rel, &tup->t_self);
424 	}
425 	table_endscan(scan);
426 
427 	table_close(rel, RowExclusiveLock);
428 }
429 
430 
431 /*
432  * Get all relations for subscription.
433  *
434  * Returned list is palloc'ed in current memory context.
435  */
436 List *
GetSubscriptionRelations(Oid subid)437 GetSubscriptionRelations(Oid subid)
438 {
439 	List	   *res = NIL;
440 	Relation	rel;
441 	HeapTuple	tup;
442 	int			nkeys = 0;
443 	ScanKeyData skey[2];
444 	SysScanDesc scan;
445 
446 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
447 
448 	ScanKeyInit(&skey[nkeys++],
449 				Anum_pg_subscription_rel_srsubid,
450 				BTEqualStrategyNumber, F_OIDEQ,
451 				ObjectIdGetDatum(subid));
452 
453 	scan = systable_beginscan(rel, InvalidOid, false,
454 							  NULL, nkeys, skey);
455 
456 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
457 	{
458 		Form_pg_subscription_rel subrel;
459 		SubscriptionRelState *relstate;
460 		Datum		d;
461 		bool		isnull;
462 
463 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
464 
465 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
466 		relstate->relid = subrel->srrelid;
467 		relstate->state = subrel->srsubstate;
468 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
469 							Anum_pg_subscription_rel_srsublsn, &isnull);
470 		if (isnull)
471 			relstate->lsn = InvalidXLogRecPtr;
472 		else
473 			relstate->lsn = DatumGetLSN(d);
474 
475 		res = lappend(res, relstate);
476 	}
477 
478 	/* Cleanup */
479 	systable_endscan(scan);
480 	table_close(rel, AccessShareLock);
481 
482 	return res;
483 }
484 
485 /*
486  * Get all relations for subscription that are not in a ready state.
487  *
488  * Returned list is palloc'ed in current memory context.
489  */
490 List *
GetSubscriptionNotReadyRelations(Oid subid)491 GetSubscriptionNotReadyRelations(Oid subid)
492 {
493 	List	   *res = NIL;
494 	Relation	rel;
495 	HeapTuple	tup;
496 	int			nkeys = 0;
497 	ScanKeyData skey[2];
498 	SysScanDesc scan;
499 
500 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
501 
502 	ScanKeyInit(&skey[nkeys++],
503 				Anum_pg_subscription_rel_srsubid,
504 				BTEqualStrategyNumber, F_OIDEQ,
505 				ObjectIdGetDatum(subid));
506 
507 	ScanKeyInit(&skey[nkeys++],
508 				Anum_pg_subscription_rel_srsubstate,
509 				BTEqualStrategyNumber, F_CHARNE,
510 				CharGetDatum(SUBREL_STATE_READY));
511 
512 	scan = systable_beginscan(rel, InvalidOid, false,
513 							  NULL, nkeys, skey);
514 
515 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
516 	{
517 		Form_pg_subscription_rel subrel;
518 		SubscriptionRelState *relstate;
519 		Datum		d;
520 		bool		isnull;
521 
522 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
523 
524 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
525 		relstate->relid = subrel->srrelid;
526 		relstate->state = subrel->srsubstate;
527 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
528 							Anum_pg_subscription_rel_srsublsn, &isnull);
529 		if (isnull)
530 			relstate->lsn = InvalidXLogRecPtr;
531 		else
532 			relstate->lsn = DatumGetLSN(d);
533 
534 		res = lappend(res, relstate);
535 	}
536 
537 	/* Cleanup */
538 	systable_endscan(scan);
539 	table_close(rel, AccessShareLock);
540 
541 	return res;
542 }
543