1 /*-------------------------------------------------------------------------
2  *
3  * pg_publication.c
4  *		publication C API manipulation
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		pg_publication.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/index.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/partition.h"
28 #include "catalog/objectaccess.h"
29 #include "catalog/objectaddress.h"
30 #include "catalog/pg_inherits.h"
31 #include "catalog/pg_publication.h"
32 #include "catalog/pg_publication_rel.h"
33 #include "catalog/pg_type.h"
34 #include "commands/publicationcmds.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "utils/array.h"
38 #include "utils/builtins.h"
39 #include "utils/catcache.h"
40 #include "utils/fmgroids.h"
41 #include "utils/inval.h"
42 #include "utils/lsyscache.h"
43 #include "utils/rel.h"
44 #include "utils/syscache.h"
45 
46 /*
47  * Check if relation can be in given publication and throws appropriate
48  * error if not.
49  */
50 static void
check_publication_add_relation(Relation targetrel)51 check_publication_add_relation(Relation targetrel)
52 {
53 	/* Must be a regular or partitioned table */
54 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
55 		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
56 		ereport(ERROR,
57 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
58 				 errmsg("\"%s\" is not a table",
59 						RelationGetRelationName(targetrel)),
60 				 errdetail("Only tables can be added to publications.")));
61 
62 	/* Can't be system table */
63 	if (IsCatalogRelation(targetrel))
64 		ereport(ERROR,
65 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 				 errmsg("\"%s\" is a system table",
67 						RelationGetRelationName(targetrel)),
68 				 errdetail("System tables cannot be added to publications.")));
69 
70 	/* UNLOGGED and TEMP relations cannot be part of publication. */
71 	if (!RelationIsPermanent(targetrel))
72 		ereport(ERROR,
73 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 				 errmsg("table \"%s\" cannot be replicated",
75 						RelationGetRelationName(targetrel)),
76 				 errdetail("Temporary and unlogged relations cannot be replicated.")));
77 }
78 
79 /*
80  * Returns if relation represented by oid and Form_pg_class entry
81  * is publishable.
82  *
83  * Does same checks as the above, but does not need relation to be opened
84  * and also does not throw errors.
85  *
86  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
87  * ie all tables created during initdb.  This mainly affects the preinstalled
88  * information_schema.  IsCatalogRelationOid() only excludes tables with
89  * relid < FirstBootstrapObjectId, making that test rather redundant,
90  * but really we should get rid of the FirstNormalObjectId test not
91  * IsCatalogRelationOid.  We can't do so today because we don't want
92  * information_schema tables to be considered publishable; but this test
93  * is really inadequate for that, since the information_schema could be
94  * dropped and reloaded and then it'll be considered publishable.  The best
95  * long-term solution may be to add a "relispublishable" bool to pg_class,
96  * and depend on that instead of OID checks.
97  */
98 static bool
is_publishable_class(Oid relid,Form_pg_class reltuple)99 is_publishable_class(Oid relid, Form_pg_class reltuple)
100 {
101 	return (reltuple->relkind == RELKIND_RELATION ||
102 			reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
103 		!IsCatalogRelationOid(relid) &&
104 		reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
105 		relid >= FirstNormalObjectId;
106 }
107 
108 /*
109  * Another variant of this, taking a Relation.
110  */
111 bool
is_publishable_relation(Relation rel)112 is_publishable_relation(Relation rel)
113 {
114 	return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
115 }
116 
117 
118 /*
119  * SQL-callable variant of the above
120  *
121  * This returns null when the relation does not exist.  This is intended to be
122  * used for example in psql to avoid gratuitous errors when there are
123  * concurrent catalog changes.
124  */
125 Datum
pg_relation_is_publishable(PG_FUNCTION_ARGS)126 pg_relation_is_publishable(PG_FUNCTION_ARGS)
127 {
128 	Oid			relid = PG_GETARG_OID(0);
129 	HeapTuple	tuple;
130 	bool		result;
131 
132 	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
133 	if (!HeapTupleIsValid(tuple))
134 		PG_RETURN_NULL();
135 	result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
136 	ReleaseSysCache(tuple);
137 	PG_RETURN_BOOL(result);
138 }
139 
140 /*
141  * Gets the relations based on the publication partition option for a specified
142  * relation.
143  */
144 List *
GetPubPartitionOptionRelations(List * result,PublicationPartOpt pub_partopt,Oid relid)145 GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
146 							   Oid relid)
147 {
148 	if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
149 		pub_partopt != PUBLICATION_PART_ROOT)
150 	{
151 		List	   *all_parts = find_all_inheritors(relid, NoLock,
152 													NULL);
153 
154 		if (pub_partopt == PUBLICATION_PART_ALL)
155 			result = list_concat(result, all_parts);
156 		else if (pub_partopt == PUBLICATION_PART_LEAF)
157 		{
158 			ListCell   *lc;
159 
160 			foreach(lc, all_parts)
161 			{
162 				Oid			partOid = lfirst_oid(lc);
163 
164 				if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
165 					result = lappend_oid(result, partOid);
166 			}
167 		}
168 		else
169 			Assert(false);
170 	}
171 	else
172 		result = lappend_oid(result, relid);
173 
174 	return result;
175 }
176 
177 /*
178  * Insert new publication / relation mapping.
179  */
180 ObjectAddress
publication_add_relation(Oid pubid,Relation targetrel,bool if_not_exists)181 publication_add_relation(Oid pubid, Relation targetrel,
182 						 bool if_not_exists)
183 {
184 	Relation	rel;
185 	HeapTuple	tup;
186 	Datum		values[Natts_pg_publication_rel];
187 	bool		nulls[Natts_pg_publication_rel];
188 	Oid			relid = RelationGetRelid(targetrel);
189 	Oid			prrelid;
190 	Publication *pub = GetPublication(pubid);
191 	ObjectAddress myself,
192 				referenced;
193 	List	   *relids = NIL;
194 
195 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
196 
197 	/*
198 	 * Check for duplicates. Note that this does not really prevent
199 	 * duplicates, it's here just to provide nicer error message in common
200 	 * case. The real protection is the unique key on the catalog.
201 	 */
202 	if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
203 							  ObjectIdGetDatum(pubid)))
204 	{
205 		table_close(rel, RowExclusiveLock);
206 
207 		if (if_not_exists)
208 			return InvalidObjectAddress;
209 
210 		ereport(ERROR,
211 				(errcode(ERRCODE_DUPLICATE_OBJECT),
212 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
213 						RelationGetRelationName(targetrel), pub->name)));
214 	}
215 
216 	check_publication_add_relation(targetrel);
217 
218 	/* Form a tuple. */
219 	memset(values, 0, sizeof(values));
220 	memset(nulls, false, sizeof(nulls));
221 
222 	prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
223 								 Anum_pg_publication_rel_oid);
224 	values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
225 	values[Anum_pg_publication_rel_prpubid - 1] =
226 		ObjectIdGetDatum(pubid);
227 	values[Anum_pg_publication_rel_prrelid - 1] =
228 		ObjectIdGetDatum(relid);
229 
230 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
231 
232 	/* Insert tuple into catalog. */
233 	CatalogTupleInsert(rel, tup);
234 	heap_freetuple(tup);
235 
236 	ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
237 
238 	/* Add dependency on the publication */
239 	ObjectAddressSet(referenced, PublicationRelationId, pubid);
240 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
241 
242 	/* Add dependency on the relation */
243 	ObjectAddressSet(referenced, RelationRelationId, relid);
244 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
245 
246 	/* Close the table. */
247 	table_close(rel, RowExclusiveLock);
248 
249 	/*
250 	 * Invalidate relcache so that publication info is rebuilt.
251 	 *
252 	 * For the partitioned tables, we must invalidate all partitions contained
253 	 * in the respective partition hierarchies, not just the one explicitly
254 	 * mentioned in the publication. This is required because we implicitly
255 	 * publish the child tables when the parent table is published.
256 	 */
257 	relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
258 											relid);
259 
260 	InvalidatePublicationRels(relids);
261 
262 	return myself;
263 }
264 
265 /* Gets list of publication oids for a relation */
266 List *
GetRelationPublications(Oid relid)267 GetRelationPublications(Oid relid)
268 {
269 	List	   *result = NIL;
270 	CatCList   *pubrellist;
271 	int			i;
272 
273 	/* Find all publications associated with the relation. */
274 	pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
275 									 ObjectIdGetDatum(relid));
276 	for (i = 0; i < pubrellist->n_members; i++)
277 	{
278 		HeapTuple	tup = &pubrellist->members[i]->tuple;
279 		Oid			pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
280 
281 		result = lappend_oid(result, pubid);
282 	}
283 
284 	ReleaseSysCacheList(pubrellist);
285 
286 	return result;
287 }
288 
289 /*
290  * Gets list of relation oids for a publication.
291  *
292  * This should only be used FOR TABLE publications, the FOR ALL TABLES
293  * should use GetAllTablesPublicationRelations().
294  */
295 List *
GetPublicationRelations(Oid pubid,PublicationPartOpt pub_partopt)296 GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
297 {
298 	List	   *result;
299 	Relation	pubrelsrel;
300 	ScanKeyData scankey;
301 	SysScanDesc scan;
302 	HeapTuple	tup;
303 
304 	/* Find all publications associated with the relation. */
305 	pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
306 
307 	ScanKeyInit(&scankey,
308 				Anum_pg_publication_rel_prpubid,
309 				BTEqualStrategyNumber, F_OIDEQ,
310 				ObjectIdGetDatum(pubid));
311 
312 	scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
313 							  true, NULL, 1, &scankey);
314 
315 	result = NIL;
316 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
317 	{
318 		Form_pg_publication_rel pubrel;
319 
320 		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
321 		result = GetPubPartitionOptionRelations(result, pub_partopt,
322 												pubrel->prrelid);
323 	}
324 
325 	systable_endscan(scan);
326 	table_close(pubrelsrel, AccessShareLock);
327 
328 	return result;
329 }
330 
331 /*
332  * Gets list of publication oids for publications marked as FOR ALL TABLES.
333  */
334 List *
GetAllTablesPublications(void)335 GetAllTablesPublications(void)
336 {
337 	List	   *result;
338 	Relation	rel;
339 	ScanKeyData scankey;
340 	SysScanDesc scan;
341 	HeapTuple	tup;
342 
343 	/* Find all publications that are marked as for all tables. */
344 	rel = table_open(PublicationRelationId, AccessShareLock);
345 
346 	ScanKeyInit(&scankey,
347 				Anum_pg_publication_puballtables,
348 				BTEqualStrategyNumber, F_BOOLEQ,
349 				BoolGetDatum(true));
350 
351 	scan = systable_beginscan(rel, InvalidOid, false,
352 							  NULL, 1, &scankey);
353 
354 	result = NIL;
355 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
356 	{
357 		Oid			oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
358 
359 		result = lappend_oid(result, oid);
360 	}
361 
362 	systable_endscan(scan);
363 	table_close(rel, AccessShareLock);
364 
365 	return result;
366 }
367 
368 /*
369  * Gets list of all relation published by FOR ALL TABLES publication(s).
370  *
371  * If the publication publishes partition changes via their respective root
372  * partitioned tables, we must exclude partitions in favor of including the
373  * root partitioned tables.
374  */
375 List *
GetAllTablesPublicationRelations(bool pubviaroot)376 GetAllTablesPublicationRelations(bool pubviaroot)
377 {
378 	Relation	classRel;
379 	ScanKeyData key[1];
380 	TableScanDesc scan;
381 	HeapTuple	tuple;
382 	List	   *result = NIL;
383 
384 	classRel = table_open(RelationRelationId, AccessShareLock);
385 
386 	ScanKeyInit(&key[0],
387 				Anum_pg_class_relkind,
388 				BTEqualStrategyNumber, F_CHAREQ,
389 				CharGetDatum(RELKIND_RELATION));
390 
391 	scan = table_beginscan_catalog(classRel, 1, key);
392 
393 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
394 	{
395 		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
396 		Oid			relid = relForm->oid;
397 
398 		if (is_publishable_class(relid, relForm) &&
399 			!(relForm->relispartition && pubviaroot))
400 			result = lappend_oid(result, relid);
401 	}
402 
403 	table_endscan(scan);
404 
405 	if (pubviaroot)
406 	{
407 		ScanKeyInit(&key[0],
408 					Anum_pg_class_relkind,
409 					BTEqualStrategyNumber, F_CHAREQ,
410 					CharGetDatum(RELKIND_PARTITIONED_TABLE));
411 
412 		scan = table_beginscan_catalog(classRel, 1, key);
413 
414 		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
415 		{
416 			Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
417 			Oid			relid = relForm->oid;
418 
419 			if (is_publishable_class(relid, relForm) &&
420 				!relForm->relispartition)
421 				result = lappend_oid(result, relid);
422 		}
423 
424 		table_endscan(scan);
425 	}
426 
427 	table_close(classRel, AccessShareLock);
428 	return result;
429 }
430 
431 /*
432  * Get publication using oid
433  *
434  * The Publication struct and its data are palloc'ed here.
435  */
436 Publication *
GetPublication(Oid pubid)437 GetPublication(Oid pubid)
438 {
439 	HeapTuple	tup;
440 	Publication *pub;
441 	Form_pg_publication pubform;
442 
443 	tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
444 	if (!HeapTupleIsValid(tup))
445 		elog(ERROR, "cache lookup failed for publication %u", pubid);
446 
447 	pubform = (Form_pg_publication) GETSTRUCT(tup);
448 
449 	pub = (Publication *) palloc(sizeof(Publication));
450 	pub->oid = pubid;
451 	pub->name = pstrdup(NameStr(pubform->pubname));
452 	pub->alltables = pubform->puballtables;
453 	pub->pubactions.pubinsert = pubform->pubinsert;
454 	pub->pubactions.pubupdate = pubform->pubupdate;
455 	pub->pubactions.pubdelete = pubform->pubdelete;
456 	pub->pubactions.pubtruncate = pubform->pubtruncate;
457 	pub->pubviaroot = pubform->pubviaroot;
458 
459 	ReleaseSysCache(tup);
460 
461 	return pub;
462 }
463 
464 
465 /*
466  * Get Publication using name.
467  */
468 Publication *
GetPublicationByName(const char * pubname,bool missing_ok)469 GetPublicationByName(const char *pubname, bool missing_ok)
470 {
471 	Oid			oid;
472 
473 	oid = get_publication_oid(pubname, missing_ok);
474 
475 	return OidIsValid(oid) ? GetPublication(oid) : NULL;
476 }
477 
478 /*
479  * get_publication_oid - given a publication name, look up the OID
480  *
481  * If missing_ok is false, throw an error if name not found.  If true, just
482  * return InvalidOid.
483  */
484 Oid
get_publication_oid(const char * pubname,bool missing_ok)485 get_publication_oid(const char *pubname, bool missing_ok)
486 {
487 	Oid			oid;
488 
489 	oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
490 						  CStringGetDatum(pubname));
491 	if (!OidIsValid(oid) && !missing_ok)
492 		ereport(ERROR,
493 				(errcode(ERRCODE_UNDEFINED_OBJECT),
494 				 errmsg("publication \"%s\" does not exist", pubname)));
495 	return oid;
496 }
497 
498 /*
499  * get_publication_name - given a publication Oid, look up the name
500  *
501  * If missing_ok is false, throw an error if name not found.  If true, just
502  * return NULL.
503  */
504 char *
get_publication_name(Oid pubid,bool missing_ok)505 get_publication_name(Oid pubid, bool missing_ok)
506 {
507 	HeapTuple	tup;
508 	char	   *pubname;
509 	Form_pg_publication pubform;
510 
511 	tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
512 
513 	if (!HeapTupleIsValid(tup))
514 	{
515 		if (!missing_ok)
516 			elog(ERROR, "cache lookup failed for publication %u", pubid);
517 		return NULL;
518 	}
519 
520 	pubform = (Form_pg_publication) GETSTRUCT(tup);
521 	pubname = pstrdup(NameStr(pubform->pubname));
522 
523 	ReleaseSysCache(tup);
524 
525 	return pubname;
526 }
527 
528 /*
529  * Returns Oids of tables in a publication.
530  */
531 Datum
pg_get_publication_tables(PG_FUNCTION_ARGS)532 pg_get_publication_tables(PG_FUNCTION_ARGS)
533 {
534 	FuncCallContext *funcctx;
535 	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
536 	Publication *publication;
537 	List	   *tables;
538 
539 	/* stuff done only on the first call of the function */
540 	if (SRF_IS_FIRSTCALL())
541 	{
542 		MemoryContext oldcontext;
543 
544 		/* create a function context for cross-call persistence */
545 		funcctx = SRF_FIRSTCALL_INIT();
546 
547 		/* switch to memory context appropriate for multiple function calls */
548 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
549 
550 		publication = GetPublicationByName(pubname, false);
551 
552 		/*
553 		 * Publications support partitioned tables, although all changes are
554 		 * replicated using leaf partition identity and schema, so we only
555 		 * need those.
556 		 */
557 		if (publication->alltables)
558 			tables = GetAllTablesPublicationRelations(publication->pubviaroot);
559 		else
560 			tables = GetPublicationRelations(publication->oid,
561 											 publication->pubviaroot ?
562 											 PUBLICATION_PART_ROOT :
563 											 PUBLICATION_PART_LEAF);
564 		funcctx->user_fctx = (void *) tables;
565 
566 		MemoryContextSwitchTo(oldcontext);
567 	}
568 
569 	/* stuff done on every call of the function */
570 	funcctx = SRF_PERCALL_SETUP();
571 	tables = (List *) funcctx->user_fctx;
572 
573 	if (funcctx->call_cntr < list_length(tables))
574 	{
575 		Oid			relid = list_nth_oid(tables, funcctx->call_cntr);
576 
577 		SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
578 	}
579 
580 	SRF_RETURN_DONE(funcctx);
581 }
582