1 /*-------------------------------------------------------------------------
2  *
3  * pg_subscription.c
4  *		replication subscriptions
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		src/backend/catalog/pg_subscription.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "miscadmin.h"
18 
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/xact.h"
23 
24 #include "catalog/indexing.h"
25 #include "catalog/pg_type.h"
26 #include "catalog/pg_subscription.h"
27 #include "catalog/pg_subscription_rel.h"
28 
29 #include "nodes/makefuncs.h"
30 
31 #include "storage/lmgr.h"
32 
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/fmgroids.h"
36 #include "utils/pg_lsn.h"
37 #include "utils/rel.h"
38 #include "utils/syscache.h"
39 
40 
/* Forward declaration: file-local helper that converts a text array to a list of strings. */
41 static List *textarray_to_stringlist(ArrayType *textarray);
42 
43 /*
44  * Fetch the subscription from the syscache.
45  */
46 Subscription *
GetSubscription(Oid subid,bool missing_ok)47 GetSubscription(Oid subid, bool missing_ok)
48 {
49 	HeapTuple	tup;
50 	Subscription *sub;
51 	Form_pg_subscription subform;
52 	Datum		datum;
53 	bool		isnull;
54 
55 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
56 
57 	if (!HeapTupleIsValid(tup))
58 	{
59 		if (missing_ok)
60 			return NULL;
61 
62 		elog(ERROR, "cache lookup failed for subscription %u", subid);
63 	}
64 
65 	subform = (Form_pg_subscription) GETSTRUCT(tup);
66 
67 	sub = (Subscription *) palloc(sizeof(Subscription));
68 	sub->oid = subid;
69 	sub->dbid = subform->subdbid;
70 	sub->name = pstrdup(NameStr(subform->subname));
71 	sub->owner = subform->subowner;
72 	sub->enabled = subform->subenabled;
73 
74 	/* Get conninfo */
75 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
76 							tup,
77 							Anum_pg_subscription_subconninfo,
78 							&isnull);
79 	Assert(!isnull);
80 	sub->conninfo = TextDatumGetCString(datum);
81 
82 	/* Get slotname */
83 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
84 							tup,
85 							Anum_pg_subscription_subslotname,
86 							&isnull);
87 	if (!isnull)
88 		sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
89 	else
90 		sub->slotname = NULL;
91 
92 	/* Get synccommit */
93 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
94 							tup,
95 							Anum_pg_subscription_subsynccommit,
96 							&isnull);
97 	Assert(!isnull);
98 	sub->synccommit = TextDatumGetCString(datum);
99 
100 	/* Get publications */
101 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
102 							tup,
103 							Anum_pg_subscription_subpublications,
104 							&isnull);
105 	Assert(!isnull);
106 	sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
107 
108 	ReleaseSysCache(tup);
109 
110 	return sub;
111 }
112 
113 /*
114  * Return number of subscriptions defined in given database.
115  * Used by dropdb() to check if database can indeed be dropped.
116  */
117 int
CountDBSubscriptions(Oid dbid)118 CountDBSubscriptions(Oid dbid)
119 {
120 	int			nsubs = 0;
121 	Relation	rel;
122 	ScanKeyData scankey;
123 	SysScanDesc scan;
124 	HeapTuple	tup;
125 
126 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
127 
128 	ScanKeyInit(&scankey,
129 				Anum_pg_subscription_subdbid,
130 				BTEqualStrategyNumber, F_OIDEQ,
131 				ObjectIdGetDatum(dbid));
132 
133 	scan = systable_beginscan(rel, InvalidOid, false,
134 							  NULL, 1, &scankey);
135 
136 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
137 		nsubs++;
138 
139 	systable_endscan(scan);
140 
141 	heap_close(rel, NoLock);
142 
143 	return nsubs;
144 }
145 
146 /*
147  * Free memory allocated by subscription struct.
148  */
149 void
FreeSubscription(Subscription * sub)150 FreeSubscription(Subscription *sub)
151 {
152 	pfree(sub->name);
153 	pfree(sub->conninfo);
154 	if (sub->slotname)
155 		pfree(sub->slotname);
156 	list_free_deep(sub->publications);
157 	pfree(sub);
158 }
159 
160 /*
161  * get_subscription_oid - given a subscription name, look up the OID
162  *
163  * If missing_ok is false, throw an error if name not found.  If true, just
164  * return InvalidOid.
165  */
166 Oid
get_subscription_oid(const char * subname,bool missing_ok)167 get_subscription_oid(const char *subname, bool missing_ok)
168 {
169 	Oid			oid;
170 
171 	oid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
172 						  CStringGetDatum(subname));
173 	if (!OidIsValid(oid) && !missing_ok)
174 		ereport(ERROR,
175 				(errcode(ERRCODE_UNDEFINED_OBJECT),
176 				 errmsg("subscription \"%s\" does not exist", subname)));
177 	return oid;
178 }
179 
180 /*
181  * get_subscription_name - given a subscription OID, look up the name
182  */
183 char *
get_subscription_name(Oid subid)184 get_subscription_name(Oid subid)
185 {
186 	HeapTuple	tup;
187 	char	   *subname;
188 	Form_pg_subscription subform;
189 
190 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
191 
192 	if (!HeapTupleIsValid(tup))
193 		elog(ERROR, "cache lookup failed for subscription %u", subid);
194 
195 	subform = (Form_pg_subscription) GETSTRUCT(tup);
196 	subname = pstrdup(NameStr(subform->subname));
197 
198 	ReleaseSysCache(tup);
199 
200 	return subname;
201 }
202 
203 /*
204  * Convert text array to list of strings.
205  *
206  * Note: the resulting list of strings is pallocated here.
207  */
208 static List *
textarray_to_stringlist(ArrayType * textarray)209 textarray_to_stringlist(ArrayType *textarray)
210 {
211 	Datum	   *elems;
212 	int			nelems,
213 				i;
214 	List	   *res = NIL;
215 
216 	deconstruct_array(textarray,
217 					  TEXTOID, -1, false, 'i',
218 					  &elems, NULL, &nelems);
219 
220 	if (nelems == 0)
221 		return NIL;
222 
223 	for (i = 0; i < nelems; i++)
224 		res = lappend(res, makeString(TextDatumGetCString(elems[i])));
225 
226 	return res;
227 }
228 
229 /*
230  * Add new state record for a subscription table.
231  */
232 Oid
AddSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)233 AddSubscriptionRelState(Oid subid, Oid relid, char state,
234 						XLogRecPtr sublsn)
235 {
236 	Relation	rel;
237 	HeapTuple	tup;
238 	Oid			subrelid;
239 	bool		nulls[Natts_pg_subscription_rel];
240 	Datum		values[Natts_pg_subscription_rel];
241 
242 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
243 
244 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
245 
246 	/* Try finding existing mapping. */
247 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
248 							  ObjectIdGetDatum(relid),
249 							  ObjectIdGetDatum(subid));
250 	if (HeapTupleIsValid(tup))
251 		elog(ERROR, "subscription table %u in subscription %u already exists",
252 			 relid, subid);
253 
254 	/* Form the tuple. */
255 	memset(values, 0, sizeof(values));
256 	memset(nulls, false, sizeof(nulls));
257 	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
258 	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
259 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
260 	if (sublsn != InvalidXLogRecPtr)
261 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
262 	else
263 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
264 
265 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
266 
267 	/* Insert tuple into catalog. */
268 	subrelid = CatalogTupleInsert(rel, tup);
269 
270 	heap_freetuple(tup);
271 
272 	/* Cleanup. */
273 	heap_close(rel, NoLock);
274 
275 	return subrelid;
276 }
277 
278 /*
279  * Update the state of a subscription table.
280  */
281 Oid
UpdateSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn)282 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
283 						   XLogRecPtr sublsn)
284 {
285 	Relation	rel;
286 	HeapTuple	tup;
287 	Oid			subrelid;
288 	bool		nulls[Natts_pg_subscription_rel];
289 	Datum		values[Natts_pg_subscription_rel];
290 	bool		replaces[Natts_pg_subscription_rel];
291 
292 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
293 
294 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
295 
296 	/* Try finding existing mapping. */
297 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
298 							  ObjectIdGetDatum(relid),
299 							  ObjectIdGetDatum(subid));
300 	if (!HeapTupleIsValid(tup))
301 		elog(ERROR, "subscription table %u in subscription %u does not exist",
302 			 relid, subid);
303 
304 	/* Update the tuple. */
305 	memset(values, 0, sizeof(values));
306 	memset(nulls, false, sizeof(nulls));
307 	memset(replaces, false, sizeof(replaces));
308 
309 	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
310 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
311 
312 	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
313 	if (sublsn != InvalidXLogRecPtr)
314 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
315 	else
316 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
317 
318 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
319 							replaces);
320 
321 	/* Update the catalog. */
322 	CatalogTupleUpdate(rel, &tup->t_self, tup);
323 
324 	subrelid = HeapTupleGetOid(tup);
325 
326 	/* Cleanup. */
327 	heap_close(rel, NoLock);
328 
329 	return subrelid;
330 }
331 
332 /*
333  * Get state of subscription table.
334  *
335  * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
336  */
337 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn,bool missing_ok)338 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
339 						bool missing_ok)
340 {
341 	Relation	rel;
342 	HeapTuple	tup;
343 	char		substate;
344 	bool		isnull;
345 	Datum		d;
346 
347 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
348 
349 	/* Try finding the mapping. */
350 	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
351 						  ObjectIdGetDatum(relid),
352 						  ObjectIdGetDatum(subid));
353 
354 	if (!HeapTupleIsValid(tup))
355 	{
356 		if (missing_ok)
357 		{
358 			heap_close(rel, AccessShareLock);
359 			*sublsn = InvalidXLogRecPtr;
360 			return SUBREL_STATE_UNKNOWN;
361 		}
362 
363 		elog(ERROR, "subscription table %u in subscription %u does not exist",
364 			 relid, subid);
365 	}
366 
367 	/* Get the state. */
368 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
369 						Anum_pg_subscription_rel_srsubstate, &isnull);
370 	Assert(!isnull);
371 	substate = DatumGetChar(d);
372 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
373 						Anum_pg_subscription_rel_srsublsn, &isnull);
374 	if (isnull)
375 		*sublsn = InvalidXLogRecPtr;
376 	else
377 		*sublsn = DatumGetLSN(d);
378 
379 	/* Cleanup */
380 	ReleaseSysCache(tup);
381 	heap_close(rel, AccessShareLock);
382 
383 	return substate;
384 }
385 
386 /*
387  * Drop subscription relation mapping. These can be for a particular
388  * subscription, or for a particular relation, or both.
389  */
390 void
RemoveSubscriptionRel(Oid subid,Oid relid)391 RemoveSubscriptionRel(Oid subid, Oid relid)
392 {
393 	Relation	rel;
394 	HeapScanDesc scan;
395 	ScanKeyData skey[2];
396 	HeapTuple	tup;
397 	int			nkeys = 0;
398 
399 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
400 
401 	if (OidIsValid(subid))
402 	{
403 		ScanKeyInit(&skey[nkeys++],
404 					Anum_pg_subscription_rel_srsubid,
405 					BTEqualStrategyNumber,
406 					F_OIDEQ,
407 					ObjectIdGetDatum(subid));
408 	}
409 
410 	if (OidIsValid(relid))
411 	{
412 		ScanKeyInit(&skey[nkeys++],
413 					Anum_pg_subscription_rel_srrelid,
414 					BTEqualStrategyNumber,
415 					F_OIDEQ,
416 					ObjectIdGetDatum(relid));
417 	}
418 
419 	/* Do the search and delete what we found. */
420 	scan = heap_beginscan_catalog(rel, nkeys, skey);
421 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
422 	{
423 		CatalogTupleDelete(rel, &tup->t_self);
424 	}
425 	heap_endscan(scan);
426 
427 	heap_close(rel, RowExclusiveLock);
428 }
429 
430 
431 /*
432  * Get all relations for subscription.
433  *
434  * Returned list is palloc'ed in current memory context.
435  */
436 List *
GetSubscriptionRelations(Oid subid)437 GetSubscriptionRelations(Oid subid)
438 {
439 	List	   *res = NIL;
440 	Relation	rel;
441 	HeapTuple	tup;
442 	int			nkeys = 0;
443 	ScanKeyData skey[2];
444 	SysScanDesc scan;
445 
446 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
447 
448 	ScanKeyInit(&skey[nkeys++],
449 				Anum_pg_subscription_rel_srsubid,
450 				BTEqualStrategyNumber, F_OIDEQ,
451 				ObjectIdGetDatum(subid));
452 
453 	scan = systable_beginscan(rel, InvalidOid, false,
454 							  NULL, nkeys, skey);
455 
456 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
457 	{
458 		Form_pg_subscription_rel subrel;
459 		SubscriptionRelState *relstate;
460 		Datum		d;
461 		bool		isnull;
462 
463 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
464 
465 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
466 		relstate->relid = subrel->srrelid;
467 		relstate->state = subrel->srsubstate;
468 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
469 							Anum_pg_subscription_rel_srsublsn, &isnull);
470 		if (isnull)
471 			relstate->lsn = InvalidXLogRecPtr;
472 		else
473 			relstate->lsn = DatumGetLSN(d);
474 
475 		res = lappend(res, relstate);
476 	}
477 
478 	/* Cleanup */
479 	systable_endscan(scan);
480 	heap_close(rel, AccessShareLock);
481 
482 	return res;
483 }
484 
485 /*
486  * Get all relations for subscription that are not in a ready state.
487  *
488  * Returned list is palloc'ed in current memory context.
489  */
490 List *
GetSubscriptionNotReadyRelations(Oid subid)491 GetSubscriptionNotReadyRelations(Oid subid)
492 {
493 	List	   *res = NIL;
494 	Relation	rel;
495 	HeapTuple	tup;
496 	int			nkeys = 0;
497 	ScanKeyData skey[2];
498 	SysScanDesc scan;
499 
500 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
501 
502 	ScanKeyInit(&skey[nkeys++],
503 				Anum_pg_subscription_rel_srsubid,
504 				BTEqualStrategyNumber, F_OIDEQ,
505 				ObjectIdGetDatum(subid));
506 
507 	ScanKeyInit(&skey[nkeys++],
508 				Anum_pg_subscription_rel_srsubstate,
509 				BTEqualStrategyNumber, F_CHARNE,
510 				CharGetDatum(SUBREL_STATE_READY));
511 
512 	scan = systable_beginscan(rel, InvalidOid, false,
513 							  NULL, nkeys, skey);
514 
515 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
516 	{
517 		Form_pg_subscription_rel subrel;
518 		SubscriptionRelState *relstate;
519 		Datum		d;
520 		bool		isnull;
521 
522 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
523 
524 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
525 		relstate->relid = subrel->srrelid;
526 		relstate->state = subrel->srsubstate;
527 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
528 							Anum_pg_subscription_rel_srsublsn, &isnull);
529 		if (isnull)
530 			relstate->lsn = InvalidXLogRecPtr;
531 		else
532 			relstate->lsn = DatumGetLSN(d);
533 
534 		res = lappend(res, relstate);
535 	}
536 
537 	/* Cleanup */
538 	systable_endscan(scan);
539 	heap_close(rel, AccessShareLock);
540 
541 	return res;
542 }
543