1 /*-------------------------------------------------------------------------
2  *
3  * publicationcmds.c
4  *		publication manipulation
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		publicationcmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "funcapi.h"
18 #include "miscadmin.h"
19 
20 #include "access/genam.h"
21 #include "access/hash.h"
22 #include "access/heapam.h"
23 #include "access/htup_details.h"
24 #include "access/xact.h"
25 
26 #include "catalog/catalog.h"
27 #include "catalog/indexing.h"
28 #include "catalog/namespace.h"
29 #include "catalog/objectaccess.h"
30 #include "catalog/objectaddress.h"
31 #include "catalog/pg_inherits.h"
32 #include "catalog/pg_type.h"
33 #include "catalog/pg_publication.h"
34 #include "catalog/pg_publication_rel.h"
35 
36 #include "commands/dbcommands.h"
37 #include "commands/defrem.h"
38 #include "commands/event_trigger.h"
39 #include "commands/publicationcmds.h"
40 
41 #include "utils/array.h"
42 #include "utils/builtins.h"
43 #include "utils/catcache.h"
44 #include "utils/fmgroids.h"
45 #include "utils/inval.h"
46 #include "utils/lsyscache.h"
47 #include "utils/rel.h"
48 #include "utils/syscache.h"
49 #include "utils/varlena.h"
50 
/*
 * Same as MAXNUMMESSAGES in sinvaladt.c.
 *
 * Used below as the cutoff for per-relation relcache invalidation: when a
 * publication has at least this many relations, the whole relcache is
 * invalidated instead of sending one message per relation.
 */
#define MAX_RELCACHE_INVAL_MSGS 4096

/* Open / close the relations named by a list of RangeVars. */
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);

/* Add / remove already-opened relations to / from a publication. */
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
					 AlterPublicationStmt *stmt);
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
59 
60 static void
parse_publication_options(List * options,bool * publish_given,bool * publish_insert,bool * publish_update,bool * publish_delete,bool * publish_truncate)61 parse_publication_options(List *options,
62 						  bool *publish_given,
63 						  bool *publish_insert,
64 						  bool *publish_update,
65 						  bool *publish_delete,
66 						  bool *publish_truncate)
67 {
68 	ListCell   *lc;
69 
70 	*publish_given = false;
71 
72 	/* Defaults are true */
73 	*publish_insert = true;
74 	*publish_update = true;
75 	*publish_delete = true;
76 	*publish_truncate = true;
77 
78 	/* Parse options */
79 	foreach(lc, options)
80 	{
81 		DefElem    *defel = (DefElem *) lfirst(lc);
82 
83 		if (strcmp(defel->defname, "publish") == 0)
84 		{
85 			char	   *publish;
86 			List	   *publish_list;
87 			ListCell   *lc;
88 
89 			if (*publish_given)
90 				ereport(ERROR,
91 						(errcode(ERRCODE_SYNTAX_ERROR),
92 						 errmsg("conflicting or redundant options")));
93 
94 			/*
95 			 * If publish option was given only the explicitly listed actions
96 			 * should be published.
97 			 */
98 			*publish_insert = false;
99 			*publish_update = false;
100 			*publish_delete = false;
101 			*publish_truncate = false;
102 
103 			*publish_given = true;
104 			publish = defGetString(defel);
105 
106 			if (!SplitIdentifierString(publish, ',', &publish_list))
107 				ereport(ERROR,
108 						(errcode(ERRCODE_SYNTAX_ERROR),
109 						 errmsg("invalid list syntax for \"publish\" option")));
110 
111 			/* Process the option list. */
112 			foreach(lc, publish_list)
113 			{
114 				char	   *publish_opt = (char *) lfirst(lc);
115 
116 				if (strcmp(publish_opt, "insert") == 0)
117 					*publish_insert = true;
118 				else if (strcmp(publish_opt, "update") == 0)
119 					*publish_update = true;
120 				else if (strcmp(publish_opt, "delete") == 0)
121 					*publish_delete = true;
122 				else if (strcmp(publish_opt, "truncate") == 0)
123 					*publish_truncate = true;
124 				else
125 					ereport(ERROR,
126 							(errcode(ERRCODE_SYNTAX_ERROR),
127 							 errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
128 			}
129 		}
130 		else
131 			ereport(ERROR,
132 					(errcode(ERRCODE_SYNTAX_ERROR),
133 					 errmsg("unrecognized publication parameter: %s", defel->defname)));
134 	}
135 }
136 
137 /*
138  * Create new publication.
139  */
140 ObjectAddress
CreatePublication(CreatePublicationStmt * stmt)141 CreatePublication(CreatePublicationStmt *stmt)
142 {
143 	Relation	rel;
144 	ObjectAddress myself;
145 	Oid			puboid;
146 	bool		nulls[Natts_pg_publication];
147 	Datum		values[Natts_pg_publication];
148 	HeapTuple	tup;
149 	bool		publish_given;
150 	bool		publish_insert;
151 	bool		publish_update;
152 	bool		publish_delete;
153 	bool		publish_truncate;
154 	AclResult	aclresult;
155 
156 	/* must have CREATE privilege on database */
157 	aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
158 	if (aclresult != ACLCHECK_OK)
159 		aclcheck_error(aclresult, OBJECT_DATABASE,
160 					   get_database_name(MyDatabaseId));
161 
162 	/* FOR ALL TABLES requires superuser */
163 	if (stmt->for_all_tables && !superuser())
164 		ereport(ERROR,
165 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
166 				 (errmsg("must be superuser to create FOR ALL TABLES publication"))));
167 
168 	rel = heap_open(PublicationRelationId, RowExclusiveLock);
169 
170 	/* Check if name is used */
171 	puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname));
172 	if (OidIsValid(puboid))
173 	{
174 		ereport(ERROR,
175 				(errcode(ERRCODE_DUPLICATE_OBJECT),
176 				 errmsg("publication \"%s\" already exists",
177 						stmt->pubname)));
178 	}
179 
180 	/* Form a tuple. */
181 	memset(values, 0, sizeof(values));
182 	memset(nulls, false, sizeof(nulls));
183 
184 	values[Anum_pg_publication_pubname - 1] =
185 		DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
186 	values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
187 
188 	parse_publication_options(stmt->options,
189 							  &publish_given, &publish_insert,
190 							  &publish_update, &publish_delete,
191 							  &publish_truncate);
192 
193 	values[Anum_pg_publication_puballtables - 1] =
194 		BoolGetDatum(stmt->for_all_tables);
195 	values[Anum_pg_publication_pubinsert - 1] =
196 		BoolGetDatum(publish_insert);
197 	values[Anum_pg_publication_pubupdate - 1] =
198 		BoolGetDatum(publish_update);
199 	values[Anum_pg_publication_pubdelete - 1] =
200 		BoolGetDatum(publish_delete);
201 	values[Anum_pg_publication_pubtruncate - 1] =
202 		BoolGetDatum(publish_truncate);
203 
204 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
205 
206 	/* Insert tuple into catalog. */
207 	puboid = CatalogTupleInsert(rel, tup);
208 	heap_freetuple(tup);
209 
210 	recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
211 
212 	ObjectAddressSet(myself, PublicationRelationId, puboid);
213 
214 	/* Make the changes visible. */
215 	CommandCounterIncrement();
216 
217 	if (stmt->tables)
218 	{
219 		List	   *rels;
220 
221 		Assert(list_length(stmt->tables) > 0);
222 
223 		rels = OpenTableList(stmt->tables);
224 		PublicationAddTables(puboid, rels, true, NULL);
225 		CloseTableList(rels);
226 	}
227 	else if (stmt->for_all_tables)
228 	{
229 		/* Invalidate relcache so that publication info is rebuilt. */
230 		CacheInvalidateRelcacheAll();
231 	}
232 
233 	heap_close(rel, RowExclusiveLock);
234 
235 	InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
236 
237 	return myself;
238 }
239 
240 /*
241  * Change options of a publication.
242  */
243 static void
AlterPublicationOptions(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)244 AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
245 						HeapTuple tup)
246 {
247 	bool		nulls[Natts_pg_publication];
248 	bool		replaces[Natts_pg_publication];
249 	Datum		values[Natts_pg_publication];
250 	bool		publish_given;
251 	bool		publish_insert;
252 	bool		publish_update;
253 	bool		publish_delete;
254 	bool		publish_truncate;
255 	ObjectAddress obj;
256 
257 	parse_publication_options(stmt->options,
258 							  &publish_given, &publish_insert,
259 							  &publish_update, &publish_delete,
260 							  &publish_truncate);
261 
262 	/* Everything ok, form a new tuple. */
263 	memset(values, 0, sizeof(values));
264 	memset(nulls, false, sizeof(nulls));
265 	memset(replaces, false, sizeof(replaces));
266 
267 	if (publish_given)
268 	{
269 		values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
270 		replaces[Anum_pg_publication_pubinsert - 1] = true;
271 
272 		values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
273 		replaces[Anum_pg_publication_pubupdate - 1] = true;
274 
275 		values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
276 		replaces[Anum_pg_publication_pubdelete - 1] = true;
277 
278 		values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
279 		replaces[Anum_pg_publication_pubtruncate - 1] = true;
280 	}
281 
282 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
283 							replaces);
284 
285 	/* Update the catalog. */
286 	CatalogTupleUpdate(rel, &tup->t_self, tup);
287 
288 	CommandCounterIncrement();
289 
290 	/* Invalidate the relcache. */
291 	if (((Form_pg_publication) GETSTRUCT(tup))->puballtables)
292 	{
293 		CacheInvalidateRelcacheAll();
294 	}
295 	else
296 	{
297 		List	   *relids = GetPublicationRelations(HeapTupleGetOid(tup));
298 
299 		/*
300 		 * We don't want to send too many individual messages, at some point
301 		 * it's cheaper to just reset whole relcache.
302 		 */
303 		if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
304 		{
305 			ListCell   *lc;
306 
307 			foreach(lc, relids)
308 			{
309 				Oid			relid = lfirst_oid(lc);
310 
311 				CacheInvalidateRelcacheByRelid(relid);
312 			}
313 		}
314 		else
315 			CacheInvalidateRelcacheAll();
316 	}
317 
318 	ObjectAddressSet(obj, PublicationRelationId, HeapTupleGetOid(tup));
319 	EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
320 									 (Node *) stmt);
321 
322 	InvokeObjectPostAlterHook(PublicationRelationId, HeapTupleGetOid(tup), 0);
323 }
324 
325 /*
326  * Add or remove table to/from publication.
327  */
328 static void
AlterPublicationTables(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)329 AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
330 					   HeapTuple tup)
331 {
332 	Oid			pubid = HeapTupleGetOid(tup);
333 	List	   *rels = NIL;
334 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
335 
336 	/* Check that user is allowed to manipulate the publication tables. */
337 	if (pubform->puballtables)
338 		ereport(ERROR,
339 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
340 				 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
341 						NameStr(pubform->pubname)),
342 				 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
343 
344 	Assert(list_length(stmt->tables) > 0);
345 
346 	rels = OpenTableList(stmt->tables);
347 
348 	if (stmt->tableAction == DEFELEM_ADD)
349 		PublicationAddTables(pubid, rels, false, stmt);
350 	else if (stmt->tableAction == DEFELEM_DROP)
351 		PublicationDropTables(pubid, rels, false);
352 	else						/* DEFELEM_SET */
353 	{
354 		List	   *oldrelids = GetPublicationRelations(pubid);
355 		List	   *delrels = NIL;
356 		ListCell   *oldlc;
357 
358 		/* Calculate which relations to drop. */
359 		foreach(oldlc, oldrelids)
360 		{
361 			Oid			oldrelid = lfirst_oid(oldlc);
362 			ListCell   *newlc;
363 			bool		found = false;
364 
365 			foreach(newlc, rels)
366 			{
367 				Relation	newrel = (Relation) lfirst(newlc);
368 
369 				if (RelationGetRelid(newrel) == oldrelid)
370 				{
371 					found = true;
372 					break;
373 				}
374 			}
375 
376 			if (!found)
377 			{
378 				Relation	oldrel = heap_open(oldrelid,
379 											   ShareUpdateExclusiveLock);
380 
381 				delrels = lappend(delrels, oldrel);
382 			}
383 		}
384 
385 		/* And drop them. */
386 		PublicationDropTables(pubid, delrels, true);
387 
388 		/*
389 		 * Don't bother calculating the difference for adding, we'll catch and
390 		 * skip existing ones when doing catalog update.
391 		 */
392 		PublicationAddTables(pubid, rels, true, stmt);
393 
394 		CloseTableList(delrels);
395 	}
396 
397 	CloseTableList(rels);
398 }
399 
400 /*
401  * Alter the existing publication.
402  *
403  * This is dispatcher function for AlterPublicationOptions and
404  * AlterPublicationTables.
405  */
406 void
AlterPublication(AlterPublicationStmt * stmt)407 AlterPublication(AlterPublicationStmt *stmt)
408 {
409 	Relation	rel;
410 	HeapTuple	tup;
411 
412 	rel = heap_open(PublicationRelationId, RowExclusiveLock);
413 
414 	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
415 							  CStringGetDatum(stmt->pubname));
416 
417 	if (!HeapTupleIsValid(tup))
418 		ereport(ERROR,
419 				(errcode(ERRCODE_UNDEFINED_OBJECT),
420 				 errmsg("publication \"%s\" does not exist",
421 						stmt->pubname)));
422 
423 	/* must be owner */
424 	if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
425 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
426 					   stmt->pubname);
427 
428 	if (stmt->options)
429 		AlterPublicationOptions(stmt, rel, tup);
430 	else
431 		AlterPublicationTables(stmt, rel, tup);
432 
433 	/* Cleanup. */
434 	heap_freetuple(tup);
435 	heap_close(rel, RowExclusiveLock);
436 }
437 
438 /*
439  * Drop publication by OID
440  */
441 void
RemovePublicationById(Oid pubid)442 RemovePublicationById(Oid pubid)
443 {
444 	Relation	rel;
445 	HeapTuple	tup;
446 	Form_pg_publication pubform;
447 
448 	rel = heap_open(PublicationRelationId, RowExclusiveLock);
449 
450 	tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
451 
452 	if (!HeapTupleIsValid(tup))
453 		elog(ERROR, "cache lookup failed for publication %u", pubid);
454 
455 	pubform = (Form_pg_publication) GETSTRUCT(tup);
456 
457 	/* Invalidate relcache so that publication info is rebuilt. */
458 	if (pubform->puballtables)
459 		CacheInvalidateRelcacheAll();
460 
461 	CatalogTupleDelete(rel, &tup->t_self);
462 
463 	ReleaseSysCache(tup);
464 
465 	heap_close(rel, RowExclusiveLock);
466 }
467 
468 /*
469  * Remove relation from publication by mapping OID.
470  */
471 void
RemovePublicationRelById(Oid proid)472 RemovePublicationRelById(Oid proid)
473 {
474 	Relation	rel;
475 	HeapTuple	tup;
476 	Form_pg_publication_rel pubrel;
477 
478 	rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
479 
480 	tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
481 
482 	if (!HeapTupleIsValid(tup))
483 		elog(ERROR, "cache lookup failed for publication table %u",
484 			 proid);
485 
486 	pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
487 
488 	/* Invalidate relcache so that publication info is rebuilt. */
489 	CacheInvalidateRelcacheByRelid(pubrel->prrelid);
490 
491 	CatalogTupleDelete(rel, &tup->t_self);
492 
493 	ReleaseSysCache(tup);
494 
495 	heap_close(rel, RowExclusiveLock);
496 }
497 
498 /*
499  * Open relations specified by a RangeVar list.
500  * The returned tables are locked in ShareUpdateExclusiveLock mode.
501  */
502 static List *
OpenTableList(List * tables)503 OpenTableList(List *tables)
504 {
505 	List	   *relids = NIL;
506 	List	   *rels = NIL;
507 	ListCell   *lc;
508 
509 	/*
510 	 * Open, share-lock, and check all the explicitly-specified relations
511 	 */
512 	foreach(lc, tables)
513 	{
514 		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
515 		bool		recurse = rv->inh;
516 		Relation	rel;
517 		Oid			myrelid;
518 
519 		/* Allow query cancel in case this takes a long time */
520 		CHECK_FOR_INTERRUPTS();
521 
522 		rel = heap_openrv(rv, ShareUpdateExclusiveLock);
523 		myrelid = RelationGetRelid(rel);
524 
525 		/*
526 		 * Filter out duplicates if user specifies "foo, foo".
527 		 *
528 		 * Note that this algorithm is known to not be very efficient (O(N^2))
529 		 * but given that it only works on list of tables given to us by user
530 		 * it's deemed acceptable.
531 		 */
532 		if (list_member_oid(relids, myrelid))
533 		{
534 			heap_close(rel, ShareUpdateExclusiveLock);
535 			continue;
536 		}
537 
538 		rels = lappend(rels, rel);
539 		relids = lappend_oid(relids, myrelid);
540 
541 		/* Add children of this rel, if requested */
542 		if (recurse)
543 		{
544 			List	   *children;
545 			ListCell   *child;
546 
547 			children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
548 										   NULL);
549 
550 			foreach(child, children)
551 			{
552 				Oid			childrelid = lfirst_oid(child);
553 
554 				/* Allow query cancel in case this takes a long time */
555 				CHECK_FOR_INTERRUPTS();
556 
557 				/*
558 				 * Skip duplicates if user specified both parent and child
559 				 * tables.
560 				 */
561 				if (list_member_oid(relids, childrelid))
562 					continue;
563 
564 				/* find_all_inheritors already got lock */
565 				rel = heap_open(childrelid, NoLock);
566 				rels = lappend(rels, rel);
567 				relids = lappend_oid(relids, childrelid);
568 			}
569 		}
570 	}
571 
572 	list_free(relids);
573 
574 	return rels;
575 }
576 
577 /*
578  * Close all relations in the list.
579  */
580 static void
CloseTableList(List * rels)581 CloseTableList(List *rels)
582 {
583 	ListCell   *lc;
584 
585 	foreach(lc, rels)
586 	{
587 		Relation	rel = (Relation) lfirst(lc);
588 
589 		heap_close(rel, NoLock);
590 	}
591 }
592 
593 /*
594  * Add listed tables to the publication.
595  */
596 static void
PublicationAddTables(Oid pubid,List * rels,bool if_not_exists,AlterPublicationStmt * stmt)597 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
598 					 AlterPublicationStmt *stmt)
599 {
600 	ListCell   *lc;
601 
602 	Assert(!stmt || !stmt->for_all_tables);
603 
604 	foreach(lc, rels)
605 	{
606 		Relation	rel = (Relation) lfirst(lc);
607 		ObjectAddress obj;
608 
609 		/* Must be owner of the table or superuser. */
610 		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
611 			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
612 						   RelationGetRelationName(rel));
613 
614 		obj = publication_add_relation(pubid, rel, if_not_exists);
615 		if (stmt)
616 		{
617 			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
618 											 (Node *) stmt);
619 
620 			InvokeObjectPostCreateHook(PublicationRelRelationId,
621 									   obj.objectId, 0);
622 		}
623 	}
624 }
625 
626 /*
627  * Remove listed tables from the publication.
628  */
629 static void
PublicationDropTables(Oid pubid,List * rels,bool missing_ok)630 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
631 {
632 	ObjectAddress obj;
633 	ListCell   *lc;
634 	Oid			prid;
635 
636 	foreach(lc, rels)
637 	{
638 		Relation	rel = (Relation) lfirst(lc);
639 		Oid			relid = RelationGetRelid(rel);
640 
641 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
642 							   ObjectIdGetDatum(pubid));
643 		if (!OidIsValid(prid))
644 		{
645 			if (missing_ok)
646 				continue;
647 
648 			ereport(ERROR,
649 					(errcode(ERRCODE_UNDEFINED_OBJECT),
650 					 errmsg("relation \"%s\" is not part of the publication",
651 							RelationGetRelationName(rel))));
652 		}
653 
654 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
655 		performDeletion(&obj, DROP_CASCADE, 0);
656 	}
657 }
658 
659 /*
660  * Internal workhorse for changing a publication owner
661  */
662 static void
AlterPublicationOwner_internal(Relation rel,HeapTuple tup,Oid newOwnerId)663 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
664 {
665 	Form_pg_publication form;
666 
667 	form = (Form_pg_publication) GETSTRUCT(tup);
668 
669 	if (form->pubowner == newOwnerId)
670 		return;
671 
672 	if (!superuser())
673 	{
674 		AclResult	aclresult;
675 
676 		/* Must be owner */
677 		if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
678 			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
679 						   NameStr(form->pubname));
680 
681 		/* Must be able to become new owner */
682 		check_is_member_of_role(GetUserId(), newOwnerId);
683 
684 		/* New owner must have CREATE privilege on database */
685 		aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
686 		if (aclresult != ACLCHECK_OK)
687 			aclcheck_error(aclresult, OBJECT_DATABASE,
688 						   get_database_name(MyDatabaseId));
689 
690 		if (form->puballtables && !superuser_arg(newOwnerId))
691 			ereport(ERROR,
692 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
693 					 errmsg("permission denied to change owner of publication \"%s\"",
694 							NameStr(form->pubname)),
695 					 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
696 	}
697 
698 	form->pubowner = newOwnerId;
699 	CatalogTupleUpdate(rel, &tup->t_self, tup);
700 
701 	/* Update owner dependency reference */
702 	changeDependencyOnOwner(PublicationRelationId,
703 							HeapTupleGetOid(tup),
704 							newOwnerId);
705 
706 	InvokeObjectPostAlterHook(PublicationRelationId,
707 							  HeapTupleGetOid(tup), 0);
708 }
709 
710 /*
711  * Change publication owner -- by name
712  */
713 ObjectAddress
AlterPublicationOwner(const char * name,Oid newOwnerId)714 AlterPublicationOwner(const char *name, Oid newOwnerId)
715 {
716 	Oid			subid;
717 	HeapTuple	tup;
718 	Relation	rel;
719 	ObjectAddress address;
720 
721 	rel = heap_open(PublicationRelationId, RowExclusiveLock);
722 
723 	tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
724 
725 	if (!HeapTupleIsValid(tup))
726 		ereport(ERROR,
727 				(errcode(ERRCODE_UNDEFINED_OBJECT),
728 				 errmsg("publication \"%s\" does not exist", name)));
729 
730 	subid = HeapTupleGetOid(tup);
731 
732 	AlterPublicationOwner_internal(rel, tup, newOwnerId);
733 
734 	ObjectAddressSet(address, PublicationRelationId, subid);
735 
736 	heap_freetuple(tup);
737 
738 	heap_close(rel, RowExclusiveLock);
739 
740 	return address;
741 }
742 
743 /*
744  * Change publication owner -- by OID
745  */
746 void
AlterPublicationOwner_oid(Oid subid,Oid newOwnerId)747 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
748 {
749 	HeapTuple	tup;
750 	Relation	rel;
751 
752 	rel = heap_open(PublicationRelationId, RowExclusiveLock);
753 
754 	tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
755 
756 	if (!HeapTupleIsValid(tup))
757 		ereport(ERROR,
758 				(errcode(ERRCODE_UNDEFINED_OBJECT),
759 				 errmsg("publication with OID %u does not exist", subid)));
760 
761 	AlterPublicationOwner_internal(rel, tup, newOwnerId);
762 
763 	heap_freetuple(tup);
764 
765 	heap_close(rel, RowExclusiveLock);
766 }
767