1 /*-------------------------------------------------------------------------
2  *
3  * publicationcmds.c
4  *		publication manipulation
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		publicationcmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/table.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
26 #include "catalog/partition.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_publication.h"
29 #include "catalog/pg_publication_rel.h"
30 #include "catalog/pg_type.h"
31 #include "commands/dbcommands.h"
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
34 #include "commands/publicationcmds.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "utils/acl.h"
38 #include "utils/array.h"
39 #include "utils/builtins.h"
40 #include "utils/catcache.h"
41 #include "utils/fmgroids.h"
42 #include "utils/inval.h"
43 #include "utils/lsyscache.h"
44 #include "utils/rel.h"
45 #include "utils/syscache.h"
46 #include "utils/varlena.h"
47 
48 static List *OpenTableList(List *tables);
49 static void CloseTableList(List *rels);
50 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
51 								 AlterPublicationStmt *stmt);
52 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
53 
54 static void
parse_publication_options(List * options,bool * publish_given,PublicationActions * pubactions,bool * publish_via_partition_root_given,bool * publish_via_partition_root)55 parse_publication_options(List *options,
56 						  bool *publish_given,
57 						  PublicationActions *pubactions,
58 						  bool *publish_via_partition_root_given,
59 						  bool *publish_via_partition_root)
60 {
61 	ListCell   *lc;
62 
63 	*publish_given = false;
64 	*publish_via_partition_root_given = false;
65 
66 	/* defaults */
67 	pubactions->pubinsert = true;
68 	pubactions->pubupdate = true;
69 	pubactions->pubdelete = true;
70 	pubactions->pubtruncate = true;
71 	*publish_via_partition_root = false;
72 
73 	/* Parse options */
74 	foreach(lc, options)
75 	{
76 		DefElem    *defel = (DefElem *) lfirst(lc);
77 
78 		if (strcmp(defel->defname, "publish") == 0)
79 		{
80 			char	   *publish;
81 			List	   *publish_list;
82 			ListCell   *lc;
83 
84 			if (*publish_given)
85 				ereport(ERROR,
86 						(errcode(ERRCODE_SYNTAX_ERROR),
87 						 errmsg("conflicting or redundant options")));
88 
89 			/*
90 			 * If publish option was given only the explicitly listed actions
91 			 * should be published.
92 			 */
93 			pubactions->pubinsert = false;
94 			pubactions->pubupdate = false;
95 			pubactions->pubdelete = false;
96 			pubactions->pubtruncate = false;
97 
98 			*publish_given = true;
99 			publish = defGetString(defel);
100 
101 			if (!SplitIdentifierString(publish, ',', &publish_list))
102 				ereport(ERROR,
103 						(errcode(ERRCODE_SYNTAX_ERROR),
104 						 errmsg("invalid list syntax for \"publish\" option")));
105 
106 			/* Process the option list. */
107 			foreach(lc, publish_list)
108 			{
109 				char	   *publish_opt = (char *) lfirst(lc);
110 
111 				if (strcmp(publish_opt, "insert") == 0)
112 					pubactions->pubinsert = true;
113 				else if (strcmp(publish_opt, "update") == 0)
114 					pubactions->pubupdate = true;
115 				else if (strcmp(publish_opt, "delete") == 0)
116 					pubactions->pubdelete = true;
117 				else if (strcmp(publish_opt, "truncate") == 0)
118 					pubactions->pubtruncate = true;
119 				else
120 					ereport(ERROR,
121 							(errcode(ERRCODE_SYNTAX_ERROR),
122 							 errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
123 			}
124 		}
125 		else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
126 		{
127 			if (*publish_via_partition_root_given)
128 				ereport(ERROR,
129 						(errcode(ERRCODE_SYNTAX_ERROR),
130 						 errmsg("conflicting or redundant options")));
131 			*publish_via_partition_root_given = true;
132 			*publish_via_partition_root = defGetBoolean(defel);
133 		}
134 		else
135 			ereport(ERROR,
136 					(errcode(ERRCODE_SYNTAX_ERROR),
137 					 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
138 	}
139 }
140 
141 /*
142  * Create new publication.
143  */
144 ObjectAddress
CreatePublication(CreatePublicationStmt * stmt)145 CreatePublication(CreatePublicationStmt *stmt)
146 {
147 	Relation	rel;
148 	ObjectAddress myself;
149 	Oid			puboid;
150 	bool		nulls[Natts_pg_publication];
151 	Datum		values[Natts_pg_publication];
152 	HeapTuple	tup;
153 	bool		publish_given;
154 	PublicationActions pubactions;
155 	bool		publish_via_partition_root_given;
156 	bool		publish_via_partition_root;
157 	AclResult	aclresult;
158 
159 	/* must have CREATE privilege on database */
160 	aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
161 	if (aclresult != ACLCHECK_OK)
162 		aclcheck_error(aclresult, OBJECT_DATABASE,
163 					   get_database_name(MyDatabaseId));
164 
165 	/* FOR ALL TABLES requires superuser */
166 	if (stmt->for_all_tables && !superuser())
167 		ereport(ERROR,
168 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
169 				 errmsg("must be superuser to create FOR ALL TABLES publication")));
170 
171 	rel = table_open(PublicationRelationId, RowExclusiveLock);
172 
173 	/* Check if name is used */
174 	puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
175 							 CStringGetDatum(stmt->pubname));
176 	if (OidIsValid(puboid))
177 	{
178 		ereport(ERROR,
179 				(errcode(ERRCODE_DUPLICATE_OBJECT),
180 				 errmsg("publication \"%s\" already exists",
181 						stmt->pubname)));
182 	}
183 
184 	/* Form a tuple. */
185 	memset(values, 0, sizeof(values));
186 	memset(nulls, false, sizeof(nulls));
187 
188 	values[Anum_pg_publication_pubname - 1] =
189 		DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
190 	values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
191 
192 	parse_publication_options(stmt->options,
193 							  &publish_given, &pubactions,
194 							  &publish_via_partition_root_given,
195 							  &publish_via_partition_root);
196 
197 	puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
198 								Anum_pg_publication_oid);
199 	values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
200 	values[Anum_pg_publication_puballtables - 1] =
201 		BoolGetDatum(stmt->for_all_tables);
202 	values[Anum_pg_publication_pubinsert - 1] =
203 		BoolGetDatum(pubactions.pubinsert);
204 	values[Anum_pg_publication_pubupdate - 1] =
205 		BoolGetDatum(pubactions.pubupdate);
206 	values[Anum_pg_publication_pubdelete - 1] =
207 		BoolGetDatum(pubactions.pubdelete);
208 	values[Anum_pg_publication_pubtruncate - 1] =
209 		BoolGetDatum(pubactions.pubtruncate);
210 	values[Anum_pg_publication_pubviaroot - 1] =
211 		BoolGetDatum(publish_via_partition_root);
212 
213 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
214 
215 	/* Insert tuple into catalog. */
216 	CatalogTupleInsert(rel, tup);
217 	heap_freetuple(tup);
218 
219 	recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
220 
221 	ObjectAddressSet(myself, PublicationRelationId, puboid);
222 
223 	/* Make the changes visible. */
224 	CommandCounterIncrement();
225 
226 	if (stmt->tables)
227 	{
228 		List	   *rels;
229 
230 		Assert(list_length(stmt->tables) > 0);
231 
232 		rels = OpenTableList(stmt->tables);
233 		PublicationAddTables(puboid, rels, true, NULL);
234 		CloseTableList(rels);
235 	}
236 	else if (stmt->for_all_tables)
237 	{
238 		/* Invalidate relcache so that publication info is rebuilt. */
239 		CacheInvalidateRelcacheAll();
240 	}
241 
242 	table_close(rel, RowExclusiveLock);
243 
244 	InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
245 
246 	if (wal_level != WAL_LEVEL_LOGICAL)
247 	{
248 		ereport(WARNING,
249 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
250 				 errmsg("wal_level is insufficient to publish logical changes"),
251 				 errhint("Set wal_level to logical before creating subscriptions.")));
252 	}
253 
254 	return myself;
255 }
256 
257 /*
258  * Change options of a publication.
259  */
260 static void
AlterPublicationOptions(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)261 AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
262 						HeapTuple tup)
263 {
264 	bool		nulls[Natts_pg_publication];
265 	bool		replaces[Natts_pg_publication];
266 	Datum		values[Natts_pg_publication];
267 	bool		publish_given;
268 	PublicationActions pubactions;
269 	bool		publish_via_partition_root_given;
270 	bool		publish_via_partition_root;
271 	ObjectAddress obj;
272 	Form_pg_publication pubform;
273 
274 	parse_publication_options(stmt->options,
275 							  &publish_given, &pubactions,
276 							  &publish_via_partition_root_given,
277 							  &publish_via_partition_root);
278 
279 	/* Everything ok, form a new tuple. */
280 	memset(values, 0, sizeof(values));
281 	memset(nulls, false, sizeof(nulls));
282 	memset(replaces, false, sizeof(replaces));
283 
284 	if (publish_given)
285 	{
286 		values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
287 		replaces[Anum_pg_publication_pubinsert - 1] = true;
288 
289 		values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
290 		replaces[Anum_pg_publication_pubupdate - 1] = true;
291 
292 		values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
293 		replaces[Anum_pg_publication_pubdelete - 1] = true;
294 
295 		values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
296 		replaces[Anum_pg_publication_pubtruncate - 1] = true;
297 	}
298 
299 	if (publish_via_partition_root_given)
300 	{
301 		values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
302 		replaces[Anum_pg_publication_pubviaroot - 1] = true;
303 	}
304 
305 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
306 							replaces);
307 
308 	/* Update the catalog. */
309 	CatalogTupleUpdate(rel, &tup->t_self, tup);
310 
311 	CommandCounterIncrement();
312 
313 	pubform = (Form_pg_publication) GETSTRUCT(tup);
314 
315 	/* Invalidate the relcache. */
316 	if (pubform->puballtables)
317 	{
318 		CacheInvalidateRelcacheAll();
319 	}
320 	else
321 	{
322 		/*
323 		 * For any partitioned tables contained in the publication, we must
324 		 * invalidate all partitions contained in the respective partition
325 		 * trees, not just those explicitly mentioned in the publication.
326 		 */
327 		List	   *relids = GetPublicationRelations(pubform->oid,
328 													 PUBLICATION_PART_ALL);
329 
330 		InvalidatePublicationRels(relids);
331 	}
332 
333 	ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
334 	EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
335 									 (Node *) stmt);
336 
337 	InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
338 }
339 
340 /*
341  * Invalidate the relations.
342  */
343 void
InvalidatePublicationRels(List * relids)344 InvalidatePublicationRels(List *relids)
345 {
346 	/*
347 	 * We don't want to send too many individual messages, at some point it's
348 	 * cheaper to just reset whole relcache.
349 	 */
350 	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
351 	{
352 		ListCell   *lc;
353 
354 		foreach(lc, relids)
355 			CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
356 	}
357 	else
358 		CacheInvalidateRelcacheAll();
359 }
360 
361 /*
362  * Add or remove table to/from publication.
363  */
364 static void
AlterPublicationTables(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)365 AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
366 					   HeapTuple tup)
367 {
368 	List	   *rels = NIL;
369 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
370 	Oid			pubid = pubform->oid;
371 
372 	/* Check that user is allowed to manipulate the publication tables. */
373 	if (pubform->puballtables)
374 		ereport(ERROR,
375 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376 				 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
377 						NameStr(pubform->pubname)),
378 				 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
379 
380 	Assert(list_length(stmt->tables) > 0);
381 
382 	rels = OpenTableList(stmt->tables);
383 
384 	if (stmt->tableAction == DEFELEM_ADD)
385 		PublicationAddTables(pubid, rels, false, stmt);
386 	else if (stmt->tableAction == DEFELEM_DROP)
387 		PublicationDropTables(pubid, rels, false);
388 	else						/* DEFELEM_SET */
389 	{
390 		List	   *oldrelids = GetPublicationRelations(pubid,
391 														PUBLICATION_PART_ROOT);
392 		List	   *delrels = NIL;
393 		ListCell   *oldlc;
394 
395 		/* Calculate which relations to drop. */
396 		foreach(oldlc, oldrelids)
397 		{
398 			Oid			oldrelid = lfirst_oid(oldlc);
399 			ListCell   *newlc;
400 			bool		found = false;
401 
402 			foreach(newlc, rels)
403 			{
404 				Relation	newrel = (Relation) lfirst(newlc);
405 
406 				if (RelationGetRelid(newrel) == oldrelid)
407 				{
408 					found = true;
409 					break;
410 				}
411 			}
412 
413 			if (!found)
414 			{
415 				Relation	oldrel = table_open(oldrelid,
416 												ShareUpdateExclusiveLock);
417 
418 				delrels = lappend(delrels, oldrel);
419 			}
420 		}
421 
422 		/* And drop them. */
423 		PublicationDropTables(pubid, delrels, true);
424 
425 		/*
426 		 * Don't bother calculating the difference for adding, we'll catch and
427 		 * skip existing ones when doing catalog update.
428 		 */
429 		PublicationAddTables(pubid, rels, true, stmt);
430 
431 		CloseTableList(delrels);
432 	}
433 
434 	CloseTableList(rels);
435 }
436 
437 /*
438  * Alter the existing publication.
439  *
440  * This is dispatcher function for AlterPublicationOptions and
441  * AlterPublicationTables.
442  */
443 void
AlterPublication(AlterPublicationStmt * stmt)444 AlterPublication(AlterPublicationStmt *stmt)
445 {
446 	Relation	rel;
447 	HeapTuple	tup;
448 	Form_pg_publication pubform;
449 
450 	rel = table_open(PublicationRelationId, RowExclusiveLock);
451 
452 	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
453 							  CStringGetDatum(stmt->pubname));
454 
455 	if (!HeapTupleIsValid(tup))
456 		ereport(ERROR,
457 				(errcode(ERRCODE_UNDEFINED_OBJECT),
458 				 errmsg("publication \"%s\" does not exist",
459 						stmt->pubname)));
460 
461 	pubform = (Form_pg_publication) GETSTRUCT(tup);
462 
463 	/* must be owner */
464 	if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
465 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
466 					   stmt->pubname);
467 
468 	if (stmt->options)
469 		AlterPublicationOptions(stmt, rel, tup);
470 	else
471 		AlterPublicationTables(stmt, rel, tup);
472 
473 	/* Cleanup. */
474 	heap_freetuple(tup);
475 	table_close(rel, RowExclusiveLock);
476 }
477 
478 /*
479  * Remove the publication by mapping OID.
480  */
481 void
RemovePublicationById(Oid pubid)482 RemovePublicationById(Oid pubid)
483 {
484 	Relation	rel;
485 	HeapTuple	tup;
486 	Form_pg_publication pubform;
487 
488 	rel = table_open(PublicationRelationId, RowExclusiveLock);
489 
490 	tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
491 
492 	if (!HeapTupleIsValid(tup))
493 		elog(ERROR, "cache lookup failed for publication %u", pubid);
494 
495 	pubform = (Form_pg_publication) GETSTRUCT(tup);
496 
497 	/* Invalidate relcache so that publication info is rebuilt. */
498 	if (pubform->puballtables)
499 		CacheInvalidateRelcacheAll();
500 
501 	CatalogTupleDelete(rel, &tup->t_self);
502 
503 	ReleaseSysCache(tup);
504 
505 	table_close(rel, RowExclusiveLock);
506 }
507 
508 /*
509  * Remove relation from publication by mapping OID.
510  */
511 void
RemovePublicationRelById(Oid proid)512 RemovePublicationRelById(Oid proid)
513 {
514 	Relation	rel;
515 	HeapTuple	tup;
516 	Form_pg_publication_rel pubrel;
517 	List	   *relids = NIL;
518 
519 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
520 
521 	tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
522 
523 	if (!HeapTupleIsValid(tup))
524 		elog(ERROR, "cache lookup failed for publication table %u",
525 			 proid);
526 
527 	pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
528 
529 	/*
530 	 * Invalidate relcache so that publication info is rebuilt.
531 	 *
532 	 * For the partitioned tables, we must invalidate all partitions contained
533 	 * in the respective partition hierarchies, not just the one explicitly
534 	 * mentioned in the publication. This is required because we implicitly
535 	 * publish the child tables when the parent table is published.
536 	 */
537 	relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
538 											pubrel->prrelid);
539 
540 	InvalidatePublicationRels(relids);
541 
542 	CatalogTupleDelete(rel, &tup->t_self);
543 
544 	ReleaseSysCache(tup);
545 
546 	table_close(rel, RowExclusiveLock);
547 }
548 
549 /*
550  * Open relations specified by a RangeVar list.
551  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
552  * add them to a publication.
553  */
554 static List *
OpenTableList(List * tables)555 OpenTableList(List *tables)
556 {
557 	List	   *relids = NIL;
558 	List	   *rels = NIL;
559 	ListCell   *lc;
560 
561 	/*
562 	 * Open, share-lock, and check all the explicitly-specified relations
563 	 */
564 	foreach(lc, tables)
565 	{
566 		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
567 		bool		recurse = rv->inh;
568 		Relation	rel;
569 		Oid			myrelid;
570 
571 		/* Allow query cancel in case this takes a long time */
572 		CHECK_FOR_INTERRUPTS();
573 
574 		rel = table_openrv(rv, ShareUpdateExclusiveLock);
575 		myrelid = RelationGetRelid(rel);
576 
577 		/*
578 		 * Filter out duplicates if user specifies "foo, foo".
579 		 *
580 		 * Note that this algorithm is known to not be very efficient (O(N^2))
581 		 * but given that it only works on list of tables given to us by user
582 		 * it's deemed acceptable.
583 		 */
584 		if (list_member_oid(relids, myrelid))
585 		{
586 			table_close(rel, ShareUpdateExclusiveLock);
587 			continue;
588 		}
589 
590 		rels = lappend(rels, rel);
591 		relids = lappend_oid(relids, myrelid);
592 
593 		/*
594 		 * Add children of this rel, if requested, so that they too are added
595 		 * to the publication.  A partitioned table can't have any inheritance
596 		 * children other than its partitions, which need not be explicitly
597 		 * added to the publication.
598 		 */
599 		if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
600 		{
601 			List	   *children;
602 			ListCell   *child;
603 
604 			children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
605 										   NULL);
606 
607 			foreach(child, children)
608 			{
609 				Oid			childrelid = lfirst_oid(child);
610 
611 				/* Allow query cancel in case this takes a long time */
612 				CHECK_FOR_INTERRUPTS();
613 
614 				/*
615 				 * Skip duplicates if user specified both parent and child
616 				 * tables.
617 				 */
618 				if (list_member_oid(relids, childrelid))
619 					continue;
620 
621 				/* find_all_inheritors already got lock */
622 				rel = table_open(childrelid, NoLock);
623 				rels = lappend(rels, rel);
624 				relids = lappend_oid(relids, childrelid);
625 			}
626 		}
627 	}
628 
629 	list_free(relids);
630 
631 	return rels;
632 }
633 
634 /*
635  * Close all relations in the list.
636  */
637 static void
CloseTableList(List * rels)638 CloseTableList(List *rels)
639 {
640 	ListCell   *lc;
641 
642 	foreach(lc, rels)
643 	{
644 		Relation	rel = (Relation) lfirst(lc);
645 
646 		table_close(rel, NoLock);
647 	}
648 }
649 
650 /*
651  * Add listed tables to the publication.
652  */
653 static void
PublicationAddTables(Oid pubid,List * rels,bool if_not_exists,AlterPublicationStmt * stmt)654 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
655 					 AlterPublicationStmt *stmt)
656 {
657 	ListCell   *lc;
658 
659 	Assert(!stmt || !stmt->for_all_tables);
660 
661 	foreach(lc, rels)
662 	{
663 		Relation	rel = (Relation) lfirst(lc);
664 		ObjectAddress obj;
665 
666 		/* Must be owner of the table or superuser. */
667 		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
668 			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
669 						   RelationGetRelationName(rel));
670 
671 		obj = publication_add_relation(pubid, rel, if_not_exists);
672 		if (stmt)
673 		{
674 			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
675 											 (Node *) stmt);
676 
677 			InvokeObjectPostCreateHook(PublicationRelRelationId,
678 									   obj.objectId, 0);
679 		}
680 	}
681 }
682 
683 /*
684  * Remove listed tables from the publication.
685  */
686 static void
PublicationDropTables(Oid pubid,List * rels,bool missing_ok)687 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
688 {
689 	ObjectAddress obj;
690 	ListCell   *lc;
691 	Oid			prid;
692 
693 	foreach(lc, rels)
694 	{
695 		Relation	rel = (Relation) lfirst(lc);
696 		Oid			relid = RelationGetRelid(rel);
697 
698 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
699 							   ObjectIdGetDatum(relid),
700 							   ObjectIdGetDatum(pubid));
701 		if (!OidIsValid(prid))
702 		{
703 			if (missing_ok)
704 				continue;
705 
706 			ereport(ERROR,
707 					(errcode(ERRCODE_UNDEFINED_OBJECT),
708 					 errmsg("relation \"%s\" is not part of the publication",
709 							RelationGetRelationName(rel))));
710 		}
711 
712 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
713 		performDeletion(&obj, DROP_CASCADE, 0);
714 	}
715 }
716 
717 /*
718  * Internal workhorse for changing a publication owner
719  */
720 static void
AlterPublicationOwner_internal(Relation rel,HeapTuple tup,Oid newOwnerId)721 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
722 {
723 	Form_pg_publication form;
724 
725 	form = (Form_pg_publication) GETSTRUCT(tup);
726 
727 	if (form->pubowner == newOwnerId)
728 		return;
729 
730 	if (!superuser())
731 	{
732 		AclResult	aclresult;
733 
734 		/* Must be owner */
735 		if (!pg_publication_ownercheck(form->oid, GetUserId()))
736 			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
737 						   NameStr(form->pubname));
738 
739 		/* Must be able to become new owner */
740 		check_is_member_of_role(GetUserId(), newOwnerId);
741 
742 		/* New owner must have CREATE privilege on database */
743 		aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
744 		if (aclresult != ACLCHECK_OK)
745 			aclcheck_error(aclresult, OBJECT_DATABASE,
746 						   get_database_name(MyDatabaseId));
747 
748 		if (form->puballtables && !superuser_arg(newOwnerId))
749 			ereport(ERROR,
750 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
751 					 errmsg("permission denied to change owner of publication \"%s\"",
752 							NameStr(form->pubname)),
753 					 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
754 	}
755 
756 	form->pubowner = newOwnerId;
757 	CatalogTupleUpdate(rel, &tup->t_self, tup);
758 
759 	/* Update owner dependency reference */
760 	changeDependencyOnOwner(PublicationRelationId,
761 							form->oid,
762 							newOwnerId);
763 
764 	InvokeObjectPostAlterHook(PublicationRelationId,
765 							  form->oid, 0);
766 }
767 
768 /*
769  * Change publication owner -- by name
770  */
771 ObjectAddress
AlterPublicationOwner(const char * name,Oid newOwnerId)772 AlterPublicationOwner(const char *name, Oid newOwnerId)
773 {
774 	Oid			subid;
775 	HeapTuple	tup;
776 	Relation	rel;
777 	ObjectAddress address;
778 	Form_pg_publication pubform;
779 
780 	rel = table_open(PublicationRelationId, RowExclusiveLock);
781 
782 	tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
783 
784 	if (!HeapTupleIsValid(tup))
785 		ereport(ERROR,
786 				(errcode(ERRCODE_UNDEFINED_OBJECT),
787 				 errmsg("publication \"%s\" does not exist", name)));
788 
789 	pubform = (Form_pg_publication) GETSTRUCT(tup);
790 	subid = pubform->oid;
791 
792 	AlterPublicationOwner_internal(rel, tup, newOwnerId);
793 
794 	ObjectAddressSet(address, PublicationRelationId, subid);
795 
796 	heap_freetuple(tup);
797 
798 	table_close(rel, RowExclusiveLock);
799 
800 	return address;
801 }
802 
803 /*
804  * Change publication owner -- by OID
805  */
806 void
AlterPublicationOwner_oid(Oid subid,Oid newOwnerId)807 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
808 {
809 	HeapTuple	tup;
810 	Relation	rel;
811 
812 	rel = table_open(PublicationRelationId, RowExclusiveLock);
813 
814 	tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
815 
816 	if (!HeapTupleIsValid(tup))
817 		ereport(ERROR,
818 				(errcode(ERRCODE_UNDEFINED_OBJECT),
819 				 errmsg("publication with OID %u does not exist", subid)));
820 
821 	AlterPublicationOwner_internal(rel, tup, newOwnerId);
822 
823 	heap_freetuple(tup);
824 
825 	table_close(rel, RowExclusiveLock);
826 }
827