1 /*-------------------------------------------------------------------------
2  *
3  * pg_publication.c
4  *		publication C API manipulation
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		pg_publication.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "funcapi.h"
18 #include "miscadmin.h"
19 
20 #include "access/genam.h"
21 #include "access/hash.h"
22 #include "access/heapam.h"
23 #include "access/htup_details.h"
24 #include "access/xact.h"
25 
26 #include "catalog/catalog.h"
27 #include "catalog/dependency.h"
28 #include "catalog/index.h"
29 #include "catalog/indexing.h"
30 #include "catalog/namespace.h"
31 #include "catalog/objectaccess.h"
32 #include "catalog/objectaddress.h"
33 #include "catalog/pg_type.h"
34 #include "catalog/pg_publication.h"
35 #include "catalog/pg_publication_rel.h"
36 
37 #include "utils/array.h"
38 #include "utils/builtins.h"
39 #include "utils/catcache.h"
40 #include "utils/fmgroids.h"
41 #include "utils/inval.h"
42 #include "utils/lsyscache.h"
43 #include "utils/rel.h"
44 #include "utils/syscache.h"
45 
46 /*
47  * Check if relation can be in given publication and throws appropriate
48  * error if not.
49  */
50 static void
check_publication_add_relation(Relation targetrel)51 check_publication_add_relation(Relation targetrel)
52 {
53 	/* Give more specific error for partitioned tables */
54 	if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
55 		ereport(ERROR,
56 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
57 				 errmsg("\"%s\" is a partitioned table",
58 						RelationGetRelationName(targetrel)),
59 				 errdetail("Adding partitioned tables to publications is not supported."),
60 				 errhint("You can add the table partitions individually.")));
61 
62 	/* Must be table */
63 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
64 		ereport(ERROR,
65 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 				 errmsg("\"%s\" is not a table",
67 						RelationGetRelationName(targetrel)),
68 				 errdetail("Only tables can be added to publications.")));
69 
70 	/* Can't be system table */
71 	if (IsCatalogRelation(targetrel))
72 		ereport(ERROR,
73 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 				 errmsg("\"%s\" is a system table",
75 						RelationGetRelationName(targetrel)),
76 				 errdetail("System tables cannot be added to publications.")));
77 
78 	/* UNLOGGED and TEMP relations cannot be part of publication. */
79 	if (!RelationNeedsWAL(targetrel))
80 		ereport(ERROR,
81 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82 				 errmsg("table \"%s\" cannot be replicated",
83 						RelationGetRelationName(targetrel)),
84 				 errdetail("Temporary and unlogged relations cannot be replicated.")));
85 }
86 
87 /*
88  * Returns if relation represented by oid and Form_pg_class entry
89  * is publishable.
90  *
91  * Does same checks as the above, but does not need relation to be opened
92  * and also does not throw errors.
93  *
94  * Note this also excludes all tables with relid < FirstNormalObjectId,
95  * ie all tables created during initdb.  This mainly affects the preinstalled
96  * information_schema.  (IsCatalogClass() only checks for these inside
97  * pg_catalog and toast schemas.)
98  */
99 static bool
is_publishable_class(Oid relid,Form_pg_class reltuple)100 is_publishable_class(Oid relid, Form_pg_class reltuple)
101 {
102 	return reltuple->relkind == RELKIND_RELATION &&
103 		!IsCatalogClass(relid, reltuple) &&
104 		reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
105 		relid >= FirstNormalObjectId;
106 }
107 
108 /*
109  * Another variant of this, taking a Relation.
110  */
111 bool
is_publishable_relation(Relation rel)112 is_publishable_relation(Relation rel)
113 {
114 	return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
115 }
116 
117 
118 /*
119  * SQL-callable variant of the above
120  *
121  * This returns null when the relation does not exist.  This is intended to be
122  * used for example in psql to avoid gratuitous errors when there are
123  * concurrent catalog changes.
124  */
125 Datum
pg_relation_is_publishable(PG_FUNCTION_ARGS)126 pg_relation_is_publishable(PG_FUNCTION_ARGS)
127 {
128 	Oid			relid = PG_GETARG_OID(0);
129 	HeapTuple	tuple;
130 	bool		result;
131 
132 	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
133 	if (!tuple)
134 		PG_RETURN_NULL();
135 	result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
136 	ReleaseSysCache(tuple);
137 	PG_RETURN_BOOL(result);
138 }
139 
140 
141 /*
142  * Insert new publication / relation mapping.
143  */
144 ObjectAddress
publication_add_relation(Oid pubid,Relation targetrel,bool if_not_exists)145 publication_add_relation(Oid pubid, Relation targetrel,
146 						 bool if_not_exists)
147 {
148 	Relation	rel;
149 	HeapTuple	tup;
150 	Datum		values[Natts_pg_publication_rel];
151 	bool		nulls[Natts_pg_publication_rel];
152 	Oid			relid = RelationGetRelid(targetrel);
153 	Oid			prrelid;
154 	Publication *pub = GetPublication(pubid);
155 	ObjectAddress myself,
156 				referenced;
157 
158 	rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
159 
160 	/*
161 	 * Check for duplicates. Note that this does not really prevent
162 	 * duplicates, it's here just to provide nicer error message in common
163 	 * case. The real protection is the unique key on the catalog.
164 	 */
165 	if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
166 							  ObjectIdGetDatum(pubid)))
167 	{
168 		heap_close(rel, RowExclusiveLock);
169 
170 		if (if_not_exists)
171 			return InvalidObjectAddress;
172 
173 		ereport(ERROR,
174 				(errcode(ERRCODE_DUPLICATE_OBJECT),
175 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
176 						RelationGetRelationName(targetrel), pub->name)));
177 	}
178 
179 	check_publication_add_relation(targetrel);
180 
181 	/* Form a tuple. */
182 	memset(values, 0, sizeof(values));
183 	memset(nulls, false, sizeof(nulls));
184 
185 	values[Anum_pg_publication_rel_prpubid - 1] =
186 		ObjectIdGetDatum(pubid);
187 	values[Anum_pg_publication_rel_prrelid - 1] =
188 		ObjectIdGetDatum(relid);
189 
190 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
191 
192 	/* Insert tuple into catalog. */
193 	prrelid = CatalogTupleInsert(rel, tup);
194 	heap_freetuple(tup);
195 
196 	ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
197 
198 	/* Add dependency on the publication */
199 	ObjectAddressSet(referenced, PublicationRelationId, pubid);
200 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
201 
202 	/* Add dependency on the relation */
203 	ObjectAddressSet(referenced, RelationRelationId, relid);
204 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
205 
206 	/* Close the table. */
207 	heap_close(rel, RowExclusiveLock);
208 
209 	/* Invalidate relcache so that publication info is rebuilt. */
210 	CacheInvalidateRelcache(targetrel);
211 
212 	return myself;
213 }
214 
215 
216 /*
217  * Gets list of publication oids for a relation oid.
218  */
219 List *
GetRelationPublications(Oid relid)220 GetRelationPublications(Oid relid)
221 {
222 	List	   *result = NIL;
223 	CatCList   *pubrellist;
224 	int			i;
225 
226 	/* Find all publications associated with the relation. */
227 	pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
228 									 ObjectIdGetDatum(relid));
229 	for (i = 0; i < pubrellist->n_members; i++)
230 	{
231 		HeapTuple	tup = &pubrellist->members[i]->tuple;
232 		Oid			pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
233 
234 		result = lappend_oid(result, pubid);
235 	}
236 
237 	ReleaseSysCacheList(pubrellist);
238 
239 	return result;
240 }
241 
242 /*
243  * Gets list of relation oids for a publication.
244  *
245  * This should only be used for normal publications, the FOR ALL TABLES
246  * should use GetAllTablesPublicationRelations().
247  */
248 List *
GetPublicationRelations(Oid pubid)249 GetPublicationRelations(Oid pubid)
250 {
251 	List	   *result;
252 	Relation	pubrelsrel;
253 	ScanKeyData scankey;
254 	SysScanDesc scan;
255 	HeapTuple	tup;
256 
257 	/* Find all publications associated with the relation. */
258 	pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
259 
260 	ScanKeyInit(&scankey,
261 				Anum_pg_publication_rel_prpubid,
262 				BTEqualStrategyNumber, F_OIDEQ,
263 				ObjectIdGetDatum(pubid));
264 
265 	scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
266 							  true, NULL, 1, &scankey);
267 
268 	result = NIL;
269 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
270 	{
271 		Form_pg_publication_rel pubrel;
272 
273 		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
274 
275 		result = lappend_oid(result, pubrel->prrelid);
276 	}
277 
278 	systable_endscan(scan);
279 	heap_close(pubrelsrel, AccessShareLock);
280 
281 	return result;
282 }
283 
284 /*
285  * Gets list of publication oids for publications marked as FOR ALL TABLES.
286  */
287 List *
GetAllTablesPublications(void)288 GetAllTablesPublications(void)
289 {
290 	List	   *result;
291 	Relation	rel;
292 	ScanKeyData scankey;
293 	SysScanDesc scan;
294 	HeapTuple	tup;
295 
296 	/* Find all publications that are marked as for all tables. */
297 	rel = heap_open(PublicationRelationId, AccessShareLock);
298 
299 	ScanKeyInit(&scankey,
300 				Anum_pg_publication_puballtables,
301 				BTEqualStrategyNumber, F_BOOLEQ,
302 				BoolGetDatum(true));
303 
304 	scan = systable_beginscan(rel, InvalidOid, false,
305 							  NULL, 1, &scankey);
306 
307 	result = NIL;
308 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
309 		result = lappend_oid(result, HeapTupleGetOid(tup));
310 
311 	systable_endscan(scan);
312 	heap_close(rel, AccessShareLock);
313 
314 	return result;
315 }
316 
317 /*
318  * Gets list of all relation published by FOR ALL TABLES publication(s).
319  */
320 List *
GetAllTablesPublicationRelations(void)321 GetAllTablesPublicationRelations(void)
322 {
323 	Relation	classRel;
324 	ScanKeyData key[1];
325 	HeapScanDesc scan;
326 	HeapTuple	tuple;
327 	List	   *result = NIL;
328 
329 	classRel = heap_open(RelationRelationId, AccessShareLock);
330 
331 	ScanKeyInit(&key[0],
332 				Anum_pg_class_relkind,
333 				BTEqualStrategyNumber, F_CHAREQ,
334 				CharGetDatum(RELKIND_RELATION));
335 
336 	scan = heap_beginscan_catalog(classRel, 1, key);
337 
338 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
339 	{
340 		Oid			relid = HeapTupleGetOid(tuple);
341 		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
342 
343 		if (is_publishable_class(relid, relForm))
344 			result = lappend_oid(result, relid);
345 	}
346 
347 	heap_endscan(scan);
348 	heap_close(classRel, AccessShareLock);
349 
350 	return result;
351 }
352 
353 /*
354  * Get publication using oid
355  *
356  * The Publication struct and its data are palloc'ed here.
357  */
358 Publication *
GetPublication(Oid pubid)359 GetPublication(Oid pubid)
360 {
361 	HeapTuple	tup;
362 	Publication *pub;
363 	Form_pg_publication pubform;
364 
365 	tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
366 
367 	if (!HeapTupleIsValid(tup))
368 		elog(ERROR, "cache lookup failed for publication %u", pubid);
369 
370 	pubform = (Form_pg_publication) GETSTRUCT(tup);
371 
372 	pub = (Publication *) palloc(sizeof(Publication));
373 	pub->oid = pubid;
374 	pub->name = pstrdup(NameStr(pubform->pubname));
375 	pub->alltables = pubform->puballtables;
376 	pub->pubactions.pubinsert = pubform->pubinsert;
377 	pub->pubactions.pubupdate = pubform->pubupdate;
378 	pub->pubactions.pubdelete = pubform->pubdelete;
379 
380 	ReleaseSysCache(tup);
381 
382 	return pub;
383 }
384 
385 
386 /*
387  * Get Publication using name.
388  */
389 Publication *
GetPublicationByName(const char * pubname,bool missing_ok)390 GetPublicationByName(const char *pubname, bool missing_ok)
391 {
392 	Oid			oid;
393 
394 	oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
395 	if (!OidIsValid(oid))
396 	{
397 		if (missing_ok)
398 			return NULL;
399 
400 		ereport(ERROR,
401 				(errcode(ERRCODE_UNDEFINED_OBJECT),
402 				 errmsg("publication \"%s\" does not exist", pubname)));
403 	}
404 
405 	return GetPublication(oid);
406 }
407 
408 /*
409  * get_publication_oid - given a publication name, look up the OID
410  *
411  * If missing_ok is false, throw an error if name not found.  If true, just
412  * return InvalidOid.
413  */
414 Oid
get_publication_oid(const char * pubname,bool missing_ok)415 get_publication_oid(const char *pubname, bool missing_ok)
416 {
417 	Oid			oid;
418 
419 	oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
420 	if (!OidIsValid(oid) && !missing_ok)
421 		ereport(ERROR,
422 				(errcode(ERRCODE_UNDEFINED_OBJECT),
423 				 errmsg("publication \"%s\" does not exist", pubname)));
424 	return oid;
425 }
426 
427 /*
428  * get_publication_name - given a publication Oid, look up the name
429  */
430 char *
get_publication_name(Oid pubid)431 get_publication_name(Oid pubid)
432 {
433 	HeapTuple	tup;
434 	char	   *pubname;
435 	Form_pg_publication pubform;
436 
437 	tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
438 
439 	if (!HeapTupleIsValid(tup))
440 		elog(ERROR, "cache lookup failed for publication %u", pubid);
441 
442 	pubform = (Form_pg_publication) GETSTRUCT(tup);
443 	pubname = pstrdup(NameStr(pubform->pubname));
444 
445 	ReleaseSysCache(tup);
446 
447 	return pubname;
448 }
449 
450 /*
451  * Returns Oids of tables in a publication.
452  */
453 Datum
pg_get_publication_tables(PG_FUNCTION_ARGS)454 pg_get_publication_tables(PG_FUNCTION_ARGS)
455 {
456 	FuncCallContext *funcctx;
457 	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
458 	Publication *publication;
459 	List	   *tables;
460 	ListCell  **lcp;
461 
462 	/* stuff done only on the first call of the function */
463 	if (SRF_IS_FIRSTCALL())
464 	{
465 		MemoryContext oldcontext;
466 
467 		/* create a function context for cross-call persistence */
468 		funcctx = SRF_FIRSTCALL_INIT();
469 
470 		/* switch to memory context appropriate for multiple function calls */
471 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
472 
473 		publication = GetPublicationByName(pubname, false);
474 		if (publication->alltables)
475 			tables = GetAllTablesPublicationRelations();
476 		else
477 			tables = GetPublicationRelations(publication->oid);
478 		lcp = (ListCell **) palloc(sizeof(ListCell *));
479 		*lcp = list_head(tables);
480 		funcctx->user_fctx = (void *) lcp;
481 
482 		MemoryContextSwitchTo(oldcontext);
483 	}
484 
485 	/* stuff done on every call of the function */
486 	funcctx = SRF_PERCALL_SETUP();
487 	lcp = (ListCell **) funcctx->user_fctx;
488 
489 	while (*lcp != NULL)
490 	{
491 		Oid			relid = lfirst_oid(*lcp);
492 
493 		*lcp = lnext(*lcp);
494 		SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
495 	}
496 
497 	SRF_RETURN_DONE(funcctx);
498 }
499