1 /*-------------------------------------------------------------------------
2  *
3  * pg_subscription.c
4  *		replication subscriptions
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		src/backend/catalog/pg_subscription.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "miscadmin.h"
18 
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/xact.h"
23 
24 #include "catalog/indexing.h"
25 #include "catalog/pg_type.h"
26 #include "catalog/pg_subscription.h"
27 #include "catalog/pg_subscription_rel.h"
28 
29 #include "nodes/makefuncs.h"
30 
31 #include "storage/lmgr.h"
32 
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/fmgroids.h"
36 #include "utils/pg_lsn.h"
37 #include "utils/rel.h"
38 #include "utils/syscache.h"
39 
40 
41 static List *textarray_to_stringlist(ArrayType *textarray);
42 
43 /*
44  * Fetch the subscription from the syscache.
45  */
46 Subscription *
GetSubscription(Oid subid,bool missing_ok)47 GetSubscription(Oid subid, bool missing_ok)
48 {
49 	HeapTuple	tup;
50 	Subscription *sub;
51 	Form_pg_subscription subform;
52 	Datum		datum;
53 	bool		isnull;
54 
55 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
56 
57 	if (!HeapTupleIsValid(tup))
58 	{
59 		if (missing_ok)
60 			return NULL;
61 
62 		elog(ERROR, "cache lookup failed for subscription %u", subid);
63 	}
64 
65 	subform = (Form_pg_subscription) GETSTRUCT(tup);
66 
67 	sub = (Subscription *) palloc(sizeof(Subscription));
68 	sub->oid = subid;
69 	sub->dbid = subform->subdbid;
70 	sub->name = pstrdup(NameStr(subform->subname));
71 	sub->owner = subform->subowner;
72 	sub->enabled = subform->subenabled;
73 
74 	/* Get conninfo */
75 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
76 							tup,
77 							Anum_pg_subscription_subconninfo,
78 							&isnull);
79 	Assert(!isnull);
80 	sub->conninfo = TextDatumGetCString(datum);
81 
82 	/* Get slotname */
83 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
84 							tup,
85 							Anum_pg_subscription_subslotname,
86 							&isnull);
87 	if (!isnull)
88 		sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
89 	else
90 		sub->slotname = NULL;
91 
92 	/* Get synccommit */
93 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
94 							tup,
95 							Anum_pg_subscription_subsynccommit,
96 							&isnull);
97 	Assert(!isnull);
98 	sub->synccommit = TextDatumGetCString(datum);
99 
100 	/* Get publications */
101 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
102 							tup,
103 							Anum_pg_subscription_subpublications,
104 							&isnull);
105 	Assert(!isnull);
106 	sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
107 
108 	ReleaseSysCache(tup);
109 
110 	return sub;
111 }
112 
113 /*
114  * Return number of subscriptions defined in given database.
115  * Used by dropdb() to check if database can indeed be dropped.
116  */
117 int
CountDBSubscriptions(Oid dbid)118 CountDBSubscriptions(Oid dbid)
119 {
120 	int			nsubs = 0;
121 	Relation	rel;
122 	ScanKeyData scankey;
123 	SysScanDesc scan;
124 	HeapTuple	tup;
125 
126 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
127 
128 	ScanKeyInit(&scankey,
129 				Anum_pg_subscription_subdbid,
130 				BTEqualStrategyNumber, F_OIDEQ,
131 				ObjectIdGetDatum(dbid));
132 
133 	scan = systable_beginscan(rel, InvalidOid, false,
134 							  NULL, 1, &scankey);
135 
136 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
137 		nsubs++;
138 
139 	systable_endscan(scan);
140 
141 	heap_close(rel, NoLock);
142 
143 	return nsubs;
144 }
145 
146 /*
147  * Free memory allocated by subscription struct.
148  */
149 void
FreeSubscription(Subscription * sub)150 FreeSubscription(Subscription *sub)
151 {
152 	pfree(sub->name);
153 	pfree(sub->conninfo);
154 	if (sub->slotname)
155 		pfree(sub->slotname);
156 	list_free_deep(sub->publications);
157 	pfree(sub);
158 }
159 
160 /*
161  * get_subscription_oid - given a subscription name, look up the OID
162  *
163  * If missing_ok is false, throw an error if name not found.  If true, just
164  * return InvalidOid.
165  */
166 Oid
get_subscription_oid(const char * subname,bool missing_ok)167 get_subscription_oid(const char *subname, bool missing_ok)
168 {
169 	Oid			oid;
170 
171 	oid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
172 						  CStringGetDatum(subname));
173 	if (!OidIsValid(oid) && !missing_ok)
174 		ereport(ERROR,
175 				(errcode(ERRCODE_UNDEFINED_OBJECT),
176 				 errmsg("subscription \"%s\" does not exist", subname)));
177 	return oid;
178 }
179 
180 /*
181  * get_subscription_name - given a subscription OID, look up the name
182  */
183 char *
get_subscription_name(Oid subid)184 get_subscription_name(Oid subid)
185 {
186 	HeapTuple	tup;
187 	char	   *subname;
188 	Form_pg_subscription subform;
189 
190 	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
191 
192 	if (!HeapTupleIsValid(tup))
193 		elog(ERROR, "cache lookup failed for subscription %u", subid);
194 
195 	subform = (Form_pg_subscription) GETSTRUCT(tup);
196 	subname = pstrdup(NameStr(subform->subname));
197 
198 	ReleaseSysCache(tup);
199 
200 	return subname;
201 }
202 
203 /*
204  * Convert text array to list of strings.
205  *
206  * Note: the resulting list of strings is pallocated here.
207  */
208 static List *
textarray_to_stringlist(ArrayType * textarray)209 textarray_to_stringlist(ArrayType *textarray)
210 {
211 	Datum	   *elems;
212 	int			nelems,
213 				i;
214 	List	   *res = NIL;
215 
216 	deconstruct_array(textarray,
217 					  TEXTOID, -1, false, 'i',
218 					  &elems, NULL, &nelems);
219 
220 	if (nelems == 0)
221 		return NIL;
222 
223 	for (i = 0; i < nelems; i++)
224 		res = lappend(res, makeString(TextDatumGetCString(elems[i])));
225 
226 	return res;
227 }
228 
229 /*
230  * Set the state of a subscription table.
231  *
232  * If update_only is true and the record for given table doesn't exist, do
233  * nothing.  This can be used to avoid inserting a new record that was deleted
234  * by someone else.  Generally, subscription DDL commands should use false,
235  * workers should use true.
236  *
237  * The insert-or-update logic in this function is not concurrency safe so it
238  * might raise an error in rare circumstances.  But if we took a stronger lock
239  * such as ShareRowExclusiveLock, we would risk more deadlocks.
240  */
241 Oid
SetSubscriptionRelState(Oid subid,Oid relid,char state,XLogRecPtr sublsn,bool update_only)242 SetSubscriptionRelState(Oid subid, Oid relid, char state,
243 						XLogRecPtr sublsn, bool update_only)
244 {
245 	Relation	rel;
246 	HeapTuple	tup;
247 	Oid			subrelid = InvalidOid;
248 	bool		nulls[Natts_pg_subscription_rel];
249 	Datum		values[Natts_pg_subscription_rel];
250 
251 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
252 
253 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
254 
255 	/* Try finding existing mapping. */
256 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
257 							  ObjectIdGetDatum(relid),
258 							  ObjectIdGetDatum(subid));
259 
260 	/*
261 	 * If the record for given table does not exist yet create new record,
262 	 * otherwise update the existing one.
263 	 */
264 	if (!HeapTupleIsValid(tup) && !update_only)
265 	{
266 		/* Form the tuple. */
267 		memset(values, 0, sizeof(values));
268 		memset(nulls, false, sizeof(nulls));
269 		values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
270 		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
271 		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
272 		if (sublsn != InvalidXLogRecPtr)
273 			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
274 		else
275 			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
276 
277 		tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
278 
279 		/* Insert tuple into catalog. */
280 		subrelid = CatalogTupleInsert(rel, tup);
281 
282 		heap_freetuple(tup);
283 	}
284 	else if (HeapTupleIsValid(tup))
285 	{
286 		bool		replaces[Natts_pg_subscription_rel];
287 
288 		/* Update the tuple. */
289 		memset(values, 0, sizeof(values));
290 		memset(nulls, false, sizeof(nulls));
291 		memset(replaces, false, sizeof(replaces));
292 
293 		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
294 		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
295 
296 		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
297 		if (sublsn != InvalidXLogRecPtr)
298 			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
299 		else
300 			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
301 
302 		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
303 								replaces);
304 
305 		/* Update the catalog. */
306 		CatalogTupleUpdate(rel, &tup->t_self, tup);
307 
308 		subrelid = HeapTupleGetOid(tup);
309 	}
310 
311 	/* Cleanup. */
312 	heap_close(rel, NoLock);
313 
314 	return subrelid;
315 }
316 
317 /*
318  * Get state of subscription table.
319  *
320  * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
321  */
322 char
GetSubscriptionRelState(Oid subid,Oid relid,XLogRecPtr * sublsn,bool missing_ok)323 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
324 						bool missing_ok)
325 {
326 	Relation	rel;
327 	HeapTuple	tup;
328 	char		substate;
329 	bool		isnull;
330 	Datum		d;
331 
332 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
333 
334 	/* Try finding the mapping. */
335 	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
336 						  ObjectIdGetDatum(relid),
337 						  ObjectIdGetDatum(subid));
338 
339 	if (!HeapTupleIsValid(tup))
340 	{
341 		if (missing_ok)
342 		{
343 			heap_close(rel, AccessShareLock);
344 			*sublsn = InvalidXLogRecPtr;
345 			return SUBREL_STATE_UNKNOWN;
346 		}
347 
348 		elog(ERROR, "subscription table %u in subscription %u does not exist",
349 			 relid, subid);
350 	}
351 
352 	/* Get the state. */
353 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
354 						Anum_pg_subscription_rel_srsubstate, &isnull);
355 	Assert(!isnull);
356 	substate = DatumGetChar(d);
357 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
358 						Anum_pg_subscription_rel_srsublsn, &isnull);
359 	if (isnull)
360 		*sublsn = InvalidXLogRecPtr;
361 	else
362 		*sublsn = DatumGetLSN(d);
363 
364 	/* Cleanup */
365 	ReleaseSysCache(tup);
366 	heap_close(rel, AccessShareLock);
367 
368 	return substate;
369 }
370 
371 /*
372  * Drop subscription relation mapping. These can be for a particular
373  * subscription, or for a particular relation, or both.
374  */
375 void
RemoveSubscriptionRel(Oid subid,Oid relid)376 RemoveSubscriptionRel(Oid subid, Oid relid)
377 {
378 	Relation	rel;
379 	HeapScanDesc scan;
380 	ScanKeyData skey[2];
381 	HeapTuple	tup;
382 	int			nkeys = 0;
383 
384 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
385 
386 	if (OidIsValid(subid))
387 	{
388 		ScanKeyInit(&skey[nkeys++],
389 					Anum_pg_subscription_rel_srsubid,
390 					BTEqualStrategyNumber,
391 					F_OIDEQ,
392 					ObjectIdGetDatum(subid));
393 	}
394 
395 	if (OidIsValid(relid))
396 	{
397 		ScanKeyInit(&skey[nkeys++],
398 					Anum_pg_subscription_rel_srrelid,
399 					BTEqualStrategyNumber,
400 					F_OIDEQ,
401 					ObjectIdGetDatum(relid));
402 	}
403 
404 	/* Do the search and delete what we found. */
405 	scan = heap_beginscan_catalog(rel, nkeys, skey);
406 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
407 	{
408 		CatalogTupleDelete(rel, &tup->t_self);
409 	}
410 	heap_endscan(scan);
411 
412 	heap_close(rel, RowExclusiveLock);
413 }
414 
415 
416 /*
417  * Get all relations for subscription.
418  *
419  * Returned list is palloc'ed in current memory context.
420  */
421 List *
GetSubscriptionRelations(Oid subid)422 GetSubscriptionRelations(Oid subid)
423 {
424 	List	   *res = NIL;
425 	Relation	rel;
426 	HeapTuple	tup;
427 	int			nkeys = 0;
428 	ScanKeyData skey[2];
429 	SysScanDesc scan;
430 
431 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
432 
433 	ScanKeyInit(&skey[nkeys++],
434 				Anum_pg_subscription_rel_srsubid,
435 				BTEqualStrategyNumber, F_OIDEQ,
436 				ObjectIdGetDatum(subid));
437 
438 	scan = systable_beginscan(rel, InvalidOid, false,
439 							  NULL, nkeys, skey);
440 
441 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
442 	{
443 		Form_pg_subscription_rel subrel;
444 		SubscriptionRelState *relstate;
445 		Datum		d;
446 		bool		isnull;
447 
448 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
449 
450 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
451 		relstate->relid = subrel->srrelid;
452 		relstate->state = subrel->srsubstate;
453 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
454 							Anum_pg_subscription_rel_srsublsn, &isnull);
455 		if (isnull)
456 			relstate->lsn = InvalidXLogRecPtr;
457 		else
458 			relstate->lsn = DatumGetLSN(d);
459 
460 		res = lappend(res, relstate);
461 	}
462 
463 	/* Cleanup */
464 	systable_endscan(scan);
465 	heap_close(rel, AccessShareLock);
466 
467 	return res;
468 }
469 
470 /*
471  * Get all relations for subscription that are not in a ready state.
472  *
473  * Returned list is palloc'ed in current memory context.
474  */
475 List *
GetSubscriptionNotReadyRelations(Oid subid)476 GetSubscriptionNotReadyRelations(Oid subid)
477 {
478 	List	   *res = NIL;
479 	Relation	rel;
480 	HeapTuple	tup;
481 	int			nkeys = 0;
482 	ScanKeyData skey[2];
483 	SysScanDesc scan;
484 
485 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
486 
487 	ScanKeyInit(&skey[nkeys++],
488 				Anum_pg_subscription_rel_srsubid,
489 				BTEqualStrategyNumber, F_OIDEQ,
490 				ObjectIdGetDatum(subid));
491 
492 	ScanKeyInit(&skey[nkeys++],
493 				Anum_pg_subscription_rel_srsubstate,
494 				BTEqualStrategyNumber, F_CHARNE,
495 				CharGetDatum(SUBREL_STATE_READY));
496 
497 	scan = systable_beginscan(rel, InvalidOid, false,
498 							  NULL, nkeys, skey);
499 
500 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
501 	{
502 		Form_pg_subscription_rel subrel;
503 		SubscriptionRelState *relstate;
504 		Datum		d;
505 		bool		isnull;
506 
507 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
508 
509 		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
510 		relstate->relid = subrel->srrelid;
511 		relstate->state = subrel->srsubstate;
512 		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
513 							Anum_pg_subscription_rel_srsublsn, &isnull);
514 		if (isnull)
515 			relstate->lsn = InvalidXLogRecPtr;
516 		else
517 			relstate->lsn = DatumGetLSN(d);
518 
519 		res = lappend(res, relstate);
520 	}
521 
522 	/* Cleanup */
523 	systable_endscan(scan);
524 	heap_close(rel, AccessShareLock);
525 
526 	return res;
527 }
528