1 /*-------------------------------------------------------------------------
2  *
3  * publicationcmds.c
4  *		publication manipulation
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		publicationcmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/table.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
26 #include "catalog/partition.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_publication.h"
29 #include "catalog/pg_publication_rel.h"
30 #include "catalog/pg_type.h"
31 #include "commands/dbcommands.h"
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
34 #include "commands/publicationcmds.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "utils/acl.h"
38 #include "utils/array.h"
39 #include "utils/builtins.h"
40 #include "utils/catcache.h"
41 #include "utils/fmgroids.h"
42 #include "utils/inval.h"
43 #include "utils/lsyscache.h"
44 #include "utils/rel.h"
45 #include "utils/syscache.h"
46 #include "utils/varlena.h"
47 
48 static List *OpenTableList(List *tables);
49 static void CloseTableList(List *rels);
50 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
51 								 AlterPublicationStmt *stmt);
52 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
53 
54 static void
parse_publication_options(List * options,bool * publish_given,PublicationActions * pubactions,bool * publish_via_partition_root_given,bool * publish_via_partition_root)55 parse_publication_options(List *options,
56 						  bool *publish_given,
57 						  PublicationActions *pubactions,
58 						  bool *publish_via_partition_root_given,
59 						  bool *publish_via_partition_root)
60 {
61 	ListCell   *lc;
62 
63 	*publish_given = false;
64 	*publish_via_partition_root_given = false;
65 
66 	/* defaults */
67 	pubactions->pubinsert = true;
68 	pubactions->pubupdate = true;
69 	pubactions->pubdelete = true;
70 	pubactions->pubtruncate = true;
71 	*publish_via_partition_root = false;
72 
73 	/* Parse options */
74 	foreach(lc, options)
75 	{
76 		DefElem    *defel = (DefElem *) lfirst(lc);
77 
78 		if (strcmp(defel->defname, "publish") == 0)
79 		{
80 			char	   *publish;
81 			List	   *publish_list;
82 			ListCell   *lc;
83 
84 			if (*publish_given)
85 				ereport(ERROR,
86 						(errcode(ERRCODE_SYNTAX_ERROR),
87 						 errmsg("conflicting or redundant options")));
88 
89 			/*
90 			 * If publish option was given only the explicitly listed actions
91 			 * should be published.
92 			 */
93 			pubactions->pubinsert = false;
94 			pubactions->pubupdate = false;
95 			pubactions->pubdelete = false;
96 			pubactions->pubtruncate = false;
97 
98 			*publish_given = true;
99 			publish = defGetString(defel);
100 
101 			if (!SplitIdentifierString(publish, ',', &publish_list))
102 				ereport(ERROR,
103 						(errcode(ERRCODE_SYNTAX_ERROR),
104 						 errmsg("invalid list syntax for \"publish\" option")));
105 
106 			/* Process the option list. */
107 			foreach(lc, publish_list)
108 			{
109 				char	   *publish_opt = (char *) lfirst(lc);
110 
111 				if (strcmp(publish_opt, "insert") == 0)
112 					pubactions->pubinsert = true;
113 				else if (strcmp(publish_opt, "update") == 0)
114 					pubactions->pubupdate = true;
115 				else if (strcmp(publish_opt, "delete") == 0)
116 					pubactions->pubdelete = true;
117 				else if (strcmp(publish_opt, "truncate") == 0)
118 					pubactions->pubtruncate = true;
119 				else
120 					ereport(ERROR,
121 							(errcode(ERRCODE_SYNTAX_ERROR),
122 							 errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
123 			}
124 		}
125 		else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
126 		{
127 			if (*publish_via_partition_root_given)
128 				ereport(ERROR,
129 						(errcode(ERRCODE_SYNTAX_ERROR),
130 						 errmsg("conflicting or redundant options")));
131 			*publish_via_partition_root_given = true;
132 			*publish_via_partition_root = defGetBoolean(defel);
133 		}
134 		else
135 			ereport(ERROR,
136 					(errcode(ERRCODE_SYNTAX_ERROR),
137 					 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
138 	}
139 }
140 
141 /*
142  * Create new publication.
143  */
144 ObjectAddress
CreatePublication(CreatePublicationStmt * stmt)145 CreatePublication(CreatePublicationStmt *stmt)
146 {
147 	Relation	rel;
148 	ObjectAddress myself;
149 	Oid			puboid;
150 	bool		nulls[Natts_pg_publication];
151 	Datum		values[Natts_pg_publication];
152 	HeapTuple	tup;
153 	bool		publish_given;
154 	PublicationActions pubactions;
155 	bool		publish_via_partition_root_given;
156 	bool		publish_via_partition_root;
157 	AclResult	aclresult;
158 
159 	/* must have CREATE privilege on database */
160 	aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
161 	if (aclresult != ACLCHECK_OK)
162 		aclcheck_error(aclresult, OBJECT_DATABASE,
163 					   get_database_name(MyDatabaseId));
164 
165 	/* FOR ALL TABLES requires superuser */
166 	if (stmt->for_all_tables && !superuser())
167 		ereport(ERROR,
168 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
169 				 errmsg("must be superuser to create FOR ALL TABLES publication")));
170 
171 	rel = table_open(PublicationRelationId, RowExclusiveLock);
172 
173 	/* Check if name is used */
174 	puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
175 							 CStringGetDatum(stmt->pubname));
176 	if (OidIsValid(puboid))
177 	{
178 		ereport(ERROR,
179 				(errcode(ERRCODE_DUPLICATE_OBJECT),
180 				 errmsg("publication \"%s\" already exists",
181 						stmt->pubname)));
182 	}
183 
184 	/* Form a tuple. */
185 	memset(values, 0, sizeof(values));
186 	memset(nulls, false, sizeof(nulls));
187 
188 	values[Anum_pg_publication_pubname - 1] =
189 		DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
190 	values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
191 
192 	parse_publication_options(stmt->options,
193 							  &publish_given, &pubactions,
194 							  &publish_via_partition_root_given,
195 							  &publish_via_partition_root);
196 
197 	puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
198 								Anum_pg_publication_oid);
199 	values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
200 	values[Anum_pg_publication_puballtables - 1] =
201 		BoolGetDatum(stmt->for_all_tables);
202 	values[Anum_pg_publication_pubinsert - 1] =
203 		BoolGetDatum(pubactions.pubinsert);
204 	values[Anum_pg_publication_pubupdate - 1] =
205 		BoolGetDatum(pubactions.pubupdate);
206 	values[Anum_pg_publication_pubdelete - 1] =
207 		BoolGetDatum(pubactions.pubdelete);
208 	values[Anum_pg_publication_pubtruncate - 1] =
209 		BoolGetDatum(pubactions.pubtruncate);
210 	values[Anum_pg_publication_pubviaroot - 1] =
211 		BoolGetDatum(publish_via_partition_root);
212 
213 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
214 
215 	/* Insert tuple into catalog. */
216 	CatalogTupleInsert(rel, tup);
217 	heap_freetuple(tup);
218 
219 	recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
220 
221 	ObjectAddressSet(myself, PublicationRelationId, puboid);
222 
223 	/* Make the changes visible. */
224 	CommandCounterIncrement();
225 
226 	if (stmt->tables)
227 	{
228 		List	   *rels;
229 
230 		Assert(list_length(stmt->tables) > 0);
231 
232 		rels = OpenTableList(stmt->tables);
233 		PublicationAddTables(puboid, rels, true, NULL);
234 		CloseTableList(rels);
235 	}
236 	else if (stmt->for_all_tables)
237 	{
238 		/* Invalidate relcache so that publication info is rebuilt. */
239 		CacheInvalidateRelcacheAll();
240 	}
241 
242 	table_close(rel, RowExclusiveLock);
243 
244 	InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
245 
246 	if (wal_level != WAL_LEVEL_LOGICAL)
247 	{
248 		ereport(WARNING,
249 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
250 				 errmsg("wal_level is insufficient to publish logical changes"),
251 				 errhint("Set wal_level to logical before creating subscriptions.")));
252 	}
253 
254 	return myself;
255 }
256 
257 /*
258  * Change options of a publication.
259  */
260 static void
AlterPublicationOptions(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)261 AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
262 						HeapTuple tup)
263 {
264 	bool		nulls[Natts_pg_publication];
265 	bool		replaces[Natts_pg_publication];
266 	Datum		values[Natts_pg_publication];
267 	bool		publish_given;
268 	PublicationActions pubactions;
269 	bool		publish_via_partition_root_given;
270 	bool		publish_via_partition_root;
271 	ObjectAddress obj;
272 	Form_pg_publication pubform;
273 
274 	parse_publication_options(stmt->options,
275 							  &publish_given, &pubactions,
276 							  &publish_via_partition_root_given,
277 							  &publish_via_partition_root);
278 
279 	/* Everything ok, form a new tuple. */
280 	memset(values, 0, sizeof(values));
281 	memset(nulls, false, sizeof(nulls));
282 	memset(replaces, false, sizeof(replaces));
283 
284 	if (publish_given)
285 	{
286 		values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
287 		replaces[Anum_pg_publication_pubinsert - 1] = true;
288 
289 		values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
290 		replaces[Anum_pg_publication_pubupdate - 1] = true;
291 
292 		values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
293 		replaces[Anum_pg_publication_pubdelete - 1] = true;
294 
295 		values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
296 		replaces[Anum_pg_publication_pubtruncate - 1] = true;
297 	}
298 
299 	if (publish_via_partition_root_given)
300 	{
301 		values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
302 		replaces[Anum_pg_publication_pubviaroot - 1] = true;
303 	}
304 
305 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
306 							replaces);
307 
308 	/* Update the catalog. */
309 	CatalogTupleUpdate(rel, &tup->t_self, tup);
310 
311 	CommandCounterIncrement();
312 
313 	pubform = (Form_pg_publication) GETSTRUCT(tup);
314 
315 	/* Invalidate the relcache. */
316 	if (pubform->puballtables)
317 	{
318 		CacheInvalidateRelcacheAll();
319 	}
320 	else
321 	{
322 		/*
323 		 * For any partitioned tables contained in the publication, we must
324 		 * invalidate all partitions contained in the respective partition
325 		 * trees, not just those explicitly mentioned in the publication.
326 		 */
327 		List	   *relids = GetPublicationRelations(pubform->oid,
328 													 PUBLICATION_PART_ALL);
329 
330 		InvalidatePublicationRels(relids);
331 	}
332 
333 	ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
334 	EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
335 									 (Node *) stmt);
336 
337 	InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
338 }
339 
340 /*
341  * Invalidate the relations.
342  */
343 void
InvalidatePublicationRels(List * relids)344 InvalidatePublicationRels(List *relids)
345 {
346 	/*
347 	 * We don't want to send too many individual messages, at some point it's
348 	 * cheaper to just reset whole relcache.
349 	 */
350 	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
351 	{
352 		ListCell   *lc;
353 
354 		foreach(lc, relids)
355 			CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
356 	}
357 	else
358 		CacheInvalidateRelcacheAll();
359 }
360 
361 /*
362  * Add or remove table to/from publication.
363  */
364 static void
AlterPublicationTables(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)365 AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
366 					   HeapTuple tup)
367 {
368 	List	   *rels = NIL;
369 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
370 	Oid			pubid = pubform->oid;
371 
372 	/* Check that user is allowed to manipulate the publication tables. */
373 	if (pubform->puballtables)
374 		ereport(ERROR,
375 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376 				 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
377 						NameStr(pubform->pubname)),
378 				 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
379 
380 	Assert(list_length(stmt->tables) > 0);
381 
382 	rels = OpenTableList(stmt->tables);
383 
384 	if (stmt->tableAction == DEFELEM_ADD)
385 		PublicationAddTables(pubid, rels, false, stmt);
386 	else if (stmt->tableAction == DEFELEM_DROP)
387 		PublicationDropTables(pubid, rels, false);
388 	else						/* DEFELEM_SET */
389 	{
390 		List	   *oldrelids = GetPublicationRelations(pubid,
391 														PUBLICATION_PART_ROOT);
392 		List	   *delrels = NIL;
393 		ListCell   *oldlc;
394 
395 		/* Calculate which relations to drop. */
396 		foreach(oldlc, oldrelids)
397 		{
398 			Oid			oldrelid = lfirst_oid(oldlc);
399 			ListCell   *newlc;
400 			bool		found = false;
401 
402 			foreach(newlc, rels)
403 			{
404 				Relation	newrel = (Relation) lfirst(newlc);
405 
406 				if (RelationGetRelid(newrel) == oldrelid)
407 				{
408 					found = true;
409 					break;
410 				}
411 			}
412 
413 			if (!found)
414 			{
415 				Relation	oldrel = table_open(oldrelid,
416 												ShareUpdateExclusiveLock);
417 
418 				delrels = lappend(delrels, oldrel);
419 			}
420 		}
421 
422 		/* And drop them. */
423 		PublicationDropTables(pubid, delrels, true);
424 
425 		/*
426 		 * Don't bother calculating the difference for adding, we'll catch and
427 		 * skip existing ones when doing catalog update.
428 		 */
429 		PublicationAddTables(pubid, rels, true, stmt);
430 
431 		CloseTableList(delrels);
432 	}
433 
434 	CloseTableList(rels);
435 }
436 
437 /*
438  * Alter the existing publication.
439  *
440  * This is dispatcher function for AlterPublicationOptions and
441  * AlterPublicationTables.
442  */
443 void
AlterPublication(AlterPublicationStmt * stmt)444 AlterPublication(AlterPublicationStmt *stmt)
445 {
446 	Relation	rel;
447 	HeapTuple	tup;
448 	Form_pg_publication pubform;
449 
450 	rel = table_open(PublicationRelationId, RowExclusiveLock);
451 
452 	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
453 							  CStringGetDatum(stmt->pubname));
454 
455 	if (!HeapTupleIsValid(tup))
456 		ereport(ERROR,
457 				(errcode(ERRCODE_UNDEFINED_OBJECT),
458 				 errmsg("publication \"%s\" does not exist",
459 						stmt->pubname)));
460 
461 	pubform = (Form_pg_publication) GETSTRUCT(tup);
462 
463 	/* must be owner */
464 	if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
465 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
466 					   stmt->pubname);
467 
468 	if (stmt->options)
469 		AlterPublicationOptions(stmt, rel, tup);
470 	else
471 		AlterPublicationTables(stmt, rel, tup);
472 
473 	/* Cleanup. */
474 	heap_freetuple(tup);
475 	table_close(rel, RowExclusiveLock);
476 }
477 
478 /*
479  * Remove relation from publication by mapping OID.
480  */
481 void
RemovePublicationRelById(Oid proid)482 RemovePublicationRelById(Oid proid)
483 {
484 	Relation	rel;
485 	HeapTuple	tup;
486 	Form_pg_publication_rel pubrel;
487 	List	   *relids = NIL;
488 
489 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
490 
491 	tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
492 
493 	if (!HeapTupleIsValid(tup))
494 		elog(ERROR, "cache lookup failed for publication table %u",
495 			 proid);
496 
497 	pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
498 
499 	/*
500 	 * Invalidate relcache so that publication info is rebuilt.
501 	 *
502 	 * For the partitioned tables, we must invalidate all partitions contained
503 	 * in the respective partition hierarchies, not just the one explicitly
504 	 * mentioned in the publication. This is required because we implicitly
505 	 * publish the child tables when the parent table is published.
506 	 */
507 	relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
508 											pubrel->prrelid);
509 
510 	InvalidatePublicationRels(relids);
511 
512 	CatalogTupleDelete(rel, &tup->t_self);
513 
514 	ReleaseSysCache(tup);
515 
516 	table_close(rel, RowExclusiveLock);
517 }
518 
519 /*
520  * Remove the publication by mapping OID.
521  */
522 void
RemovePublicationById(Oid pubid)523 RemovePublicationById(Oid pubid)
524 {
525 	Relation	rel;
526 	HeapTuple	tup;
527 	Form_pg_publication pubform;
528 
529 	rel = table_open(PublicationRelationId, RowExclusiveLock);
530 
531 	tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
532 	if (!HeapTupleIsValid(tup))
533 		elog(ERROR, "cache lookup failed for publication %u", pubid);
534 
535 	pubform = (Form_pg_publication) GETSTRUCT(tup);
536 
537 	/* Invalidate relcache so that publication info is rebuilt. */
538 	if (pubform->puballtables)
539 		CacheInvalidateRelcacheAll();
540 
541 	CatalogTupleDelete(rel, &tup->t_self);
542 
543 	ReleaseSysCache(tup);
544 
545 	table_close(rel, RowExclusiveLock);
546 }
547 
548 /*
549  * Open relations specified by a RangeVar list.
550  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
551  * add them to a publication.
552  */
553 static List *
OpenTableList(List * tables)554 OpenTableList(List *tables)
555 {
556 	List	   *relids = NIL;
557 	List	   *rels = NIL;
558 	ListCell   *lc;
559 
560 	/*
561 	 * Open, share-lock, and check all the explicitly-specified relations
562 	 */
563 	foreach(lc, tables)
564 	{
565 		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
566 		bool		recurse = rv->inh;
567 		Relation	rel;
568 		Oid			myrelid;
569 
570 		/* Allow query cancel in case this takes a long time */
571 		CHECK_FOR_INTERRUPTS();
572 
573 		rel = table_openrv(rv, ShareUpdateExclusiveLock);
574 		myrelid = RelationGetRelid(rel);
575 
576 		/*
577 		 * Filter out duplicates if user specifies "foo, foo".
578 		 *
579 		 * Note that this algorithm is known to not be very efficient (O(N^2))
580 		 * but given that it only works on list of tables given to us by user
581 		 * it's deemed acceptable.
582 		 */
583 		if (list_member_oid(relids, myrelid))
584 		{
585 			table_close(rel, ShareUpdateExclusiveLock);
586 			continue;
587 		}
588 
589 		rels = lappend(rels, rel);
590 		relids = lappend_oid(relids, myrelid);
591 
592 		/*
593 		 * Add children of this rel, if requested, so that they too are added
594 		 * to the publication.  A partitioned table can't have any inheritance
595 		 * children other than its partitions, which need not be explicitly
596 		 * added to the publication.
597 		 */
598 		if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
599 		{
600 			List	   *children;
601 			ListCell   *child;
602 
603 			children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
604 										   NULL);
605 
606 			foreach(child, children)
607 			{
608 				Oid			childrelid = lfirst_oid(child);
609 
610 				/* Allow query cancel in case this takes a long time */
611 				CHECK_FOR_INTERRUPTS();
612 
613 				/*
614 				 * Skip duplicates if user specified both parent and child
615 				 * tables.
616 				 */
617 				if (list_member_oid(relids, childrelid))
618 					continue;
619 
620 				/* find_all_inheritors already got lock */
621 				rel = table_open(childrelid, NoLock);
622 				rels = lappend(rels, rel);
623 				relids = lappend_oid(relids, childrelid);
624 			}
625 		}
626 	}
627 
628 	list_free(relids);
629 
630 	return rels;
631 }
632 
633 /*
634  * Close all relations in the list.
635  */
636 static void
CloseTableList(List * rels)637 CloseTableList(List *rels)
638 {
639 	ListCell   *lc;
640 
641 	foreach(lc, rels)
642 	{
643 		Relation	rel = (Relation) lfirst(lc);
644 
645 		table_close(rel, NoLock);
646 	}
647 }
648 
649 /*
650  * Add listed tables to the publication.
651  */
652 static void
PublicationAddTables(Oid pubid,List * rels,bool if_not_exists,AlterPublicationStmt * stmt)653 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
654 					 AlterPublicationStmt *stmt)
655 {
656 	ListCell   *lc;
657 
658 	Assert(!stmt || !stmt->for_all_tables);
659 
660 	foreach(lc, rels)
661 	{
662 		Relation	rel = (Relation) lfirst(lc);
663 		ObjectAddress obj;
664 
665 		/* Must be owner of the table or superuser. */
666 		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
667 			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
668 						   RelationGetRelationName(rel));
669 
670 		obj = publication_add_relation(pubid, rel, if_not_exists);
671 		if (stmt)
672 		{
673 			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
674 											 (Node *) stmt);
675 
676 			InvokeObjectPostCreateHook(PublicationRelRelationId,
677 									   obj.objectId, 0);
678 		}
679 	}
680 }
681 
682 /*
683  * Remove listed tables from the publication.
684  */
685 static void
PublicationDropTables(Oid pubid,List * rels,bool missing_ok)686 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
687 {
688 	ObjectAddress obj;
689 	ListCell   *lc;
690 	Oid			prid;
691 
692 	foreach(lc, rels)
693 	{
694 		Relation	rel = (Relation) lfirst(lc);
695 		Oid			relid = RelationGetRelid(rel);
696 
697 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
698 							   ObjectIdGetDatum(relid),
699 							   ObjectIdGetDatum(pubid));
700 		if (!OidIsValid(prid))
701 		{
702 			if (missing_ok)
703 				continue;
704 
705 			ereport(ERROR,
706 					(errcode(ERRCODE_UNDEFINED_OBJECT),
707 					 errmsg("relation \"%s\" is not part of the publication",
708 							RelationGetRelationName(rel))));
709 		}
710 
711 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
712 		performDeletion(&obj, DROP_CASCADE, 0);
713 	}
714 }
715 
716 /*
717  * Internal workhorse for changing a publication owner
718  */
719 static void
AlterPublicationOwner_internal(Relation rel,HeapTuple tup,Oid newOwnerId)720 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
721 {
722 	Form_pg_publication form;
723 
724 	form = (Form_pg_publication) GETSTRUCT(tup);
725 
726 	if (form->pubowner == newOwnerId)
727 		return;
728 
729 	if (!superuser())
730 	{
731 		AclResult	aclresult;
732 
733 		/* Must be owner */
734 		if (!pg_publication_ownercheck(form->oid, GetUserId()))
735 			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
736 						   NameStr(form->pubname));
737 
738 		/* Must be able to become new owner */
739 		check_is_member_of_role(GetUserId(), newOwnerId);
740 
741 		/* New owner must have CREATE privilege on database */
742 		aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
743 		if (aclresult != ACLCHECK_OK)
744 			aclcheck_error(aclresult, OBJECT_DATABASE,
745 						   get_database_name(MyDatabaseId));
746 
747 		if (form->puballtables && !superuser_arg(newOwnerId))
748 			ereport(ERROR,
749 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
750 					 errmsg("permission denied to change owner of publication \"%s\"",
751 							NameStr(form->pubname)),
752 					 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
753 	}
754 
755 	form->pubowner = newOwnerId;
756 	CatalogTupleUpdate(rel, &tup->t_self, tup);
757 
758 	/* Update owner dependency reference */
759 	changeDependencyOnOwner(PublicationRelationId,
760 							form->oid,
761 							newOwnerId);
762 
763 	InvokeObjectPostAlterHook(PublicationRelationId,
764 							  form->oid, 0);
765 }
766 
767 /*
768  * Change publication owner -- by name
769  */
770 ObjectAddress
AlterPublicationOwner(const char * name,Oid newOwnerId)771 AlterPublicationOwner(const char *name, Oid newOwnerId)
772 {
773 	Oid			subid;
774 	HeapTuple	tup;
775 	Relation	rel;
776 	ObjectAddress address;
777 	Form_pg_publication pubform;
778 
779 	rel = table_open(PublicationRelationId, RowExclusiveLock);
780 
781 	tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
782 
783 	if (!HeapTupleIsValid(tup))
784 		ereport(ERROR,
785 				(errcode(ERRCODE_UNDEFINED_OBJECT),
786 				 errmsg("publication \"%s\" does not exist", name)));
787 
788 	pubform = (Form_pg_publication) GETSTRUCT(tup);
789 	subid = pubform->oid;
790 
791 	AlterPublicationOwner_internal(rel, tup, newOwnerId);
792 
793 	ObjectAddressSet(address, PublicationRelationId, subid);
794 
795 	heap_freetuple(tup);
796 
797 	table_close(rel, RowExclusiveLock);
798 
799 	return address;
800 }
801 
802 /*
803  * Change publication owner -- by OID
804  */
805 void
AlterPublicationOwner_oid(Oid subid,Oid newOwnerId)806 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
807 {
808 	HeapTuple	tup;
809 	Relation	rel;
810 
811 	rel = table_open(PublicationRelationId, RowExclusiveLock);
812 
813 	tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
814 
815 	if (!HeapTupleIsValid(tup))
816 		ereport(ERROR,
817 				(errcode(ERRCODE_UNDEFINED_OBJECT),
818 				 errmsg("publication with OID %u does not exist", subid)));
819 
820 	AlterPublicationOwner_internal(rel, tup, newOwnerId);
821 
822 	heap_freetuple(tup);
823 
824 	table_close(rel, RowExclusiveLock);
825 }
826