1 /*-------------------------------------------------------------------------
2 *
3 * publicationcmds.c
4 * publication manipulation
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * publicationcmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/table.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
26 #include "catalog/partition.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_publication.h"
29 #include "catalog/pg_publication_rel.h"
30 #include "catalog/pg_type.h"
31 #include "commands/dbcommands.h"
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
34 #include "commands/publicationcmds.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "utils/acl.h"
38 #include "utils/array.h"
39 #include "utils/builtins.h"
40 #include "utils/catcache.h"
41 #include "utils/fmgroids.h"
42 #include "utils/inval.h"
43 #include "utils/lsyscache.h"
44 #include "utils/rel.h"
45 #include "utils/syscache.h"
46 #include "utils/varlena.h"
47
48 static List *OpenTableList(List *tables);
49 static void CloseTableList(List *rels);
50 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
51 AlterPublicationStmt *stmt);
52 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
53
54 static void
parse_publication_options(List * options,bool * publish_given,PublicationActions * pubactions,bool * publish_via_partition_root_given,bool * publish_via_partition_root)55 parse_publication_options(List *options,
56 bool *publish_given,
57 PublicationActions *pubactions,
58 bool *publish_via_partition_root_given,
59 bool *publish_via_partition_root)
60 {
61 ListCell *lc;
62
63 *publish_given = false;
64 *publish_via_partition_root_given = false;
65
66 /* defaults */
67 pubactions->pubinsert = true;
68 pubactions->pubupdate = true;
69 pubactions->pubdelete = true;
70 pubactions->pubtruncate = true;
71 *publish_via_partition_root = false;
72
73 /* Parse options */
74 foreach(lc, options)
75 {
76 DefElem *defel = (DefElem *) lfirst(lc);
77
78 if (strcmp(defel->defname, "publish") == 0)
79 {
80 char *publish;
81 List *publish_list;
82 ListCell *lc;
83
84 if (*publish_given)
85 ereport(ERROR,
86 (errcode(ERRCODE_SYNTAX_ERROR),
87 errmsg("conflicting or redundant options")));
88
89 /*
90 * If publish option was given only the explicitly listed actions
91 * should be published.
92 */
93 pubactions->pubinsert = false;
94 pubactions->pubupdate = false;
95 pubactions->pubdelete = false;
96 pubactions->pubtruncate = false;
97
98 *publish_given = true;
99 publish = defGetString(defel);
100
101 if (!SplitIdentifierString(publish, ',', &publish_list))
102 ereport(ERROR,
103 (errcode(ERRCODE_SYNTAX_ERROR),
104 errmsg("invalid list syntax for \"publish\" option")));
105
106 /* Process the option list. */
107 foreach(lc, publish_list)
108 {
109 char *publish_opt = (char *) lfirst(lc);
110
111 if (strcmp(publish_opt, "insert") == 0)
112 pubactions->pubinsert = true;
113 else if (strcmp(publish_opt, "update") == 0)
114 pubactions->pubupdate = true;
115 else if (strcmp(publish_opt, "delete") == 0)
116 pubactions->pubdelete = true;
117 else if (strcmp(publish_opt, "truncate") == 0)
118 pubactions->pubtruncate = true;
119 else
120 ereport(ERROR,
121 (errcode(ERRCODE_SYNTAX_ERROR),
122 errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
123 }
124 }
125 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
126 {
127 if (*publish_via_partition_root_given)
128 ereport(ERROR,
129 (errcode(ERRCODE_SYNTAX_ERROR),
130 errmsg("conflicting or redundant options")));
131 *publish_via_partition_root_given = true;
132 *publish_via_partition_root = defGetBoolean(defel);
133 }
134 else
135 ereport(ERROR,
136 (errcode(ERRCODE_SYNTAX_ERROR),
137 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
138 }
139 }
140
141 /*
142 * Create new publication.
143 */
144 ObjectAddress
CreatePublication(CreatePublicationStmt * stmt)145 CreatePublication(CreatePublicationStmt *stmt)
146 {
147 Relation rel;
148 ObjectAddress myself;
149 Oid puboid;
150 bool nulls[Natts_pg_publication];
151 Datum values[Natts_pg_publication];
152 HeapTuple tup;
153 bool publish_given;
154 PublicationActions pubactions;
155 bool publish_via_partition_root_given;
156 bool publish_via_partition_root;
157 AclResult aclresult;
158
159 /* must have CREATE privilege on database */
160 aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
161 if (aclresult != ACLCHECK_OK)
162 aclcheck_error(aclresult, OBJECT_DATABASE,
163 get_database_name(MyDatabaseId));
164
165 /* FOR ALL TABLES requires superuser */
166 if (stmt->for_all_tables && !superuser())
167 ereport(ERROR,
168 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
169 errmsg("must be superuser to create FOR ALL TABLES publication")));
170
171 rel = table_open(PublicationRelationId, RowExclusiveLock);
172
173 /* Check if name is used */
174 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
175 CStringGetDatum(stmt->pubname));
176 if (OidIsValid(puboid))
177 {
178 ereport(ERROR,
179 (errcode(ERRCODE_DUPLICATE_OBJECT),
180 errmsg("publication \"%s\" already exists",
181 stmt->pubname)));
182 }
183
184 /* Form a tuple. */
185 memset(values, 0, sizeof(values));
186 memset(nulls, false, sizeof(nulls));
187
188 values[Anum_pg_publication_pubname - 1] =
189 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
190 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
191
192 parse_publication_options(stmt->options,
193 &publish_given, &pubactions,
194 &publish_via_partition_root_given,
195 &publish_via_partition_root);
196
197 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
198 Anum_pg_publication_oid);
199 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
200 values[Anum_pg_publication_puballtables - 1] =
201 BoolGetDatum(stmt->for_all_tables);
202 values[Anum_pg_publication_pubinsert - 1] =
203 BoolGetDatum(pubactions.pubinsert);
204 values[Anum_pg_publication_pubupdate - 1] =
205 BoolGetDatum(pubactions.pubupdate);
206 values[Anum_pg_publication_pubdelete - 1] =
207 BoolGetDatum(pubactions.pubdelete);
208 values[Anum_pg_publication_pubtruncate - 1] =
209 BoolGetDatum(pubactions.pubtruncate);
210 values[Anum_pg_publication_pubviaroot - 1] =
211 BoolGetDatum(publish_via_partition_root);
212
213 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
214
215 /* Insert tuple into catalog. */
216 CatalogTupleInsert(rel, tup);
217 heap_freetuple(tup);
218
219 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
220
221 ObjectAddressSet(myself, PublicationRelationId, puboid);
222
223 /* Make the changes visible. */
224 CommandCounterIncrement();
225
226 if (stmt->tables)
227 {
228 List *rels;
229
230 Assert(list_length(stmt->tables) > 0);
231
232 rels = OpenTableList(stmt->tables);
233 PublicationAddTables(puboid, rels, true, NULL);
234 CloseTableList(rels);
235 }
236 else if (stmt->for_all_tables)
237 {
238 /* Invalidate relcache so that publication info is rebuilt. */
239 CacheInvalidateRelcacheAll();
240 }
241
242 table_close(rel, RowExclusiveLock);
243
244 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
245
246 if (wal_level != WAL_LEVEL_LOGICAL)
247 {
248 ereport(WARNING,
249 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
250 errmsg("wal_level is insufficient to publish logical changes"),
251 errhint("Set wal_level to logical before creating subscriptions.")));
252 }
253
254 return myself;
255 }
256
257 /*
258 * Change options of a publication.
259 */
260 static void
AlterPublicationOptions(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)261 AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
262 HeapTuple tup)
263 {
264 bool nulls[Natts_pg_publication];
265 bool replaces[Natts_pg_publication];
266 Datum values[Natts_pg_publication];
267 bool publish_given;
268 PublicationActions pubactions;
269 bool publish_via_partition_root_given;
270 bool publish_via_partition_root;
271 ObjectAddress obj;
272 Form_pg_publication pubform;
273
274 parse_publication_options(stmt->options,
275 &publish_given, &pubactions,
276 &publish_via_partition_root_given,
277 &publish_via_partition_root);
278
279 /* Everything ok, form a new tuple. */
280 memset(values, 0, sizeof(values));
281 memset(nulls, false, sizeof(nulls));
282 memset(replaces, false, sizeof(replaces));
283
284 if (publish_given)
285 {
286 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
287 replaces[Anum_pg_publication_pubinsert - 1] = true;
288
289 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
290 replaces[Anum_pg_publication_pubupdate - 1] = true;
291
292 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
293 replaces[Anum_pg_publication_pubdelete - 1] = true;
294
295 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
296 replaces[Anum_pg_publication_pubtruncate - 1] = true;
297 }
298
299 if (publish_via_partition_root_given)
300 {
301 values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
302 replaces[Anum_pg_publication_pubviaroot - 1] = true;
303 }
304
305 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
306 replaces);
307
308 /* Update the catalog. */
309 CatalogTupleUpdate(rel, &tup->t_self, tup);
310
311 CommandCounterIncrement();
312
313 pubform = (Form_pg_publication) GETSTRUCT(tup);
314
315 /* Invalidate the relcache. */
316 if (pubform->puballtables)
317 {
318 CacheInvalidateRelcacheAll();
319 }
320 else
321 {
322 /*
323 * For any partitioned tables contained in the publication, we must
324 * invalidate all partitions contained in the respective partition
325 * trees, not just those explicitly mentioned in the publication.
326 */
327 List *relids = GetPublicationRelations(pubform->oid,
328 PUBLICATION_PART_ALL);
329
330 InvalidatePublicationRels(relids);
331 }
332
333 ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
334 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
335 (Node *) stmt);
336
337 InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
338 }
339
340 /*
341 * Invalidate the relations.
342 */
343 void
InvalidatePublicationRels(List * relids)344 InvalidatePublicationRels(List *relids)
345 {
346 /*
347 * We don't want to send too many individual messages, at some point it's
348 * cheaper to just reset whole relcache.
349 */
350 if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
351 {
352 ListCell *lc;
353
354 foreach(lc, relids)
355 CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
356 }
357 else
358 CacheInvalidateRelcacheAll();
359 }
360
361 /*
362 * Add or remove table to/from publication.
363 */
364 static void
AlterPublicationTables(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)365 AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
366 HeapTuple tup)
367 {
368 List *rels = NIL;
369 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
370 Oid pubid = pubform->oid;
371
372 /* Check that user is allowed to manipulate the publication tables. */
373 if (pubform->puballtables)
374 ereport(ERROR,
375 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
377 NameStr(pubform->pubname)),
378 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
379
380 Assert(list_length(stmt->tables) > 0);
381
382 rels = OpenTableList(stmt->tables);
383
384 if (stmt->tableAction == DEFELEM_ADD)
385 PublicationAddTables(pubid, rels, false, stmt);
386 else if (stmt->tableAction == DEFELEM_DROP)
387 PublicationDropTables(pubid, rels, false);
388 else /* DEFELEM_SET */
389 {
390 List *oldrelids = GetPublicationRelations(pubid,
391 PUBLICATION_PART_ROOT);
392 List *delrels = NIL;
393 ListCell *oldlc;
394
395 /* Calculate which relations to drop. */
396 foreach(oldlc, oldrelids)
397 {
398 Oid oldrelid = lfirst_oid(oldlc);
399 ListCell *newlc;
400 bool found = false;
401
402 foreach(newlc, rels)
403 {
404 Relation newrel = (Relation) lfirst(newlc);
405
406 if (RelationGetRelid(newrel) == oldrelid)
407 {
408 found = true;
409 break;
410 }
411 }
412
413 if (!found)
414 {
415 Relation oldrel = table_open(oldrelid,
416 ShareUpdateExclusiveLock);
417
418 delrels = lappend(delrels, oldrel);
419 }
420 }
421
422 /* And drop them. */
423 PublicationDropTables(pubid, delrels, true);
424
425 /*
426 * Don't bother calculating the difference for adding, we'll catch and
427 * skip existing ones when doing catalog update.
428 */
429 PublicationAddTables(pubid, rels, true, stmt);
430
431 CloseTableList(delrels);
432 }
433
434 CloseTableList(rels);
435 }
436
437 /*
438 * Alter the existing publication.
439 *
440 * This is dispatcher function for AlterPublicationOptions and
441 * AlterPublicationTables.
442 */
443 void
AlterPublication(AlterPublicationStmt * stmt)444 AlterPublication(AlterPublicationStmt *stmt)
445 {
446 Relation rel;
447 HeapTuple tup;
448 Form_pg_publication pubform;
449
450 rel = table_open(PublicationRelationId, RowExclusiveLock);
451
452 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
453 CStringGetDatum(stmt->pubname));
454
455 if (!HeapTupleIsValid(tup))
456 ereport(ERROR,
457 (errcode(ERRCODE_UNDEFINED_OBJECT),
458 errmsg("publication \"%s\" does not exist",
459 stmt->pubname)));
460
461 pubform = (Form_pg_publication) GETSTRUCT(tup);
462
463 /* must be owner */
464 if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
465 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
466 stmt->pubname);
467
468 if (stmt->options)
469 AlterPublicationOptions(stmt, rel, tup);
470 else
471 AlterPublicationTables(stmt, rel, tup);
472
473 /* Cleanup. */
474 heap_freetuple(tup);
475 table_close(rel, RowExclusiveLock);
476 }
477
478 /*
479 * Remove relation from publication by mapping OID.
480 */
481 void
RemovePublicationRelById(Oid proid)482 RemovePublicationRelById(Oid proid)
483 {
484 Relation rel;
485 HeapTuple tup;
486 Form_pg_publication_rel pubrel;
487 List *relids = NIL;
488
489 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
490
491 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
492
493 if (!HeapTupleIsValid(tup))
494 elog(ERROR, "cache lookup failed for publication table %u",
495 proid);
496
497 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
498
499 /*
500 * Invalidate relcache so that publication info is rebuilt.
501 *
502 * For the partitioned tables, we must invalidate all partitions contained
503 * in the respective partition hierarchies, not just the one explicitly
504 * mentioned in the publication. This is required because we implicitly
505 * publish the child tables when the parent table is published.
506 */
507 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
508 pubrel->prrelid);
509
510 InvalidatePublicationRels(relids);
511
512 CatalogTupleDelete(rel, &tup->t_self);
513
514 ReleaseSysCache(tup);
515
516 table_close(rel, RowExclusiveLock);
517 }
518
519 /*
520 * Remove the publication by mapping OID.
521 */
522 void
RemovePublicationById(Oid pubid)523 RemovePublicationById(Oid pubid)
524 {
525 Relation rel;
526 HeapTuple tup;
527 Form_pg_publication pubform;
528
529 rel = table_open(PublicationRelationId, RowExclusiveLock);
530
531 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
532 if (!HeapTupleIsValid(tup))
533 elog(ERROR, "cache lookup failed for publication %u", pubid);
534
535 pubform = (Form_pg_publication) GETSTRUCT(tup);
536
537 /* Invalidate relcache so that publication info is rebuilt. */
538 if (pubform->puballtables)
539 CacheInvalidateRelcacheAll();
540
541 CatalogTupleDelete(rel, &tup->t_self);
542
543 ReleaseSysCache(tup);
544
545 table_close(rel, RowExclusiveLock);
546 }
547
548 /*
549 * Open relations specified by a RangeVar list.
550 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
551 * add them to a publication.
552 */
553 static List *
OpenTableList(List * tables)554 OpenTableList(List *tables)
555 {
556 List *relids = NIL;
557 List *rels = NIL;
558 ListCell *lc;
559
560 /*
561 * Open, share-lock, and check all the explicitly-specified relations
562 */
563 foreach(lc, tables)
564 {
565 RangeVar *rv = castNode(RangeVar, lfirst(lc));
566 bool recurse = rv->inh;
567 Relation rel;
568 Oid myrelid;
569
570 /* Allow query cancel in case this takes a long time */
571 CHECK_FOR_INTERRUPTS();
572
573 rel = table_openrv(rv, ShareUpdateExclusiveLock);
574 myrelid = RelationGetRelid(rel);
575
576 /*
577 * Filter out duplicates if user specifies "foo, foo".
578 *
579 * Note that this algorithm is known to not be very efficient (O(N^2))
580 * but given that it only works on list of tables given to us by user
581 * it's deemed acceptable.
582 */
583 if (list_member_oid(relids, myrelid))
584 {
585 table_close(rel, ShareUpdateExclusiveLock);
586 continue;
587 }
588
589 rels = lappend(rels, rel);
590 relids = lappend_oid(relids, myrelid);
591
592 /*
593 * Add children of this rel, if requested, so that they too are added
594 * to the publication. A partitioned table can't have any inheritance
595 * children other than its partitions, which need not be explicitly
596 * added to the publication.
597 */
598 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
599 {
600 List *children;
601 ListCell *child;
602
603 children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
604 NULL);
605
606 foreach(child, children)
607 {
608 Oid childrelid = lfirst_oid(child);
609
610 /* Allow query cancel in case this takes a long time */
611 CHECK_FOR_INTERRUPTS();
612
613 /*
614 * Skip duplicates if user specified both parent and child
615 * tables.
616 */
617 if (list_member_oid(relids, childrelid))
618 continue;
619
620 /* find_all_inheritors already got lock */
621 rel = table_open(childrelid, NoLock);
622 rels = lappend(rels, rel);
623 relids = lappend_oid(relids, childrelid);
624 }
625 }
626 }
627
628 list_free(relids);
629
630 return rels;
631 }
632
633 /*
634 * Close all relations in the list.
635 */
636 static void
CloseTableList(List * rels)637 CloseTableList(List *rels)
638 {
639 ListCell *lc;
640
641 foreach(lc, rels)
642 {
643 Relation rel = (Relation) lfirst(lc);
644
645 table_close(rel, NoLock);
646 }
647 }
648
649 /*
650 * Add listed tables to the publication.
651 */
652 static void
PublicationAddTables(Oid pubid,List * rels,bool if_not_exists,AlterPublicationStmt * stmt)653 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
654 AlterPublicationStmt *stmt)
655 {
656 ListCell *lc;
657
658 Assert(!stmt || !stmt->for_all_tables);
659
660 foreach(lc, rels)
661 {
662 Relation rel = (Relation) lfirst(lc);
663 ObjectAddress obj;
664
665 /* Must be owner of the table or superuser. */
666 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
667 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
668 RelationGetRelationName(rel));
669
670 obj = publication_add_relation(pubid, rel, if_not_exists);
671 if (stmt)
672 {
673 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
674 (Node *) stmt);
675
676 InvokeObjectPostCreateHook(PublicationRelRelationId,
677 obj.objectId, 0);
678 }
679 }
680 }
681
682 /*
683 * Remove listed tables from the publication.
684 */
685 static void
PublicationDropTables(Oid pubid,List * rels,bool missing_ok)686 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
687 {
688 ObjectAddress obj;
689 ListCell *lc;
690 Oid prid;
691
692 foreach(lc, rels)
693 {
694 Relation rel = (Relation) lfirst(lc);
695 Oid relid = RelationGetRelid(rel);
696
697 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
698 ObjectIdGetDatum(relid),
699 ObjectIdGetDatum(pubid));
700 if (!OidIsValid(prid))
701 {
702 if (missing_ok)
703 continue;
704
705 ereport(ERROR,
706 (errcode(ERRCODE_UNDEFINED_OBJECT),
707 errmsg("relation \"%s\" is not part of the publication",
708 RelationGetRelationName(rel))));
709 }
710
711 ObjectAddressSet(obj, PublicationRelRelationId, prid);
712 performDeletion(&obj, DROP_CASCADE, 0);
713 }
714 }
715
716 /*
717 * Internal workhorse for changing a publication owner
718 */
719 static void
AlterPublicationOwner_internal(Relation rel,HeapTuple tup,Oid newOwnerId)720 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
721 {
722 Form_pg_publication form;
723
724 form = (Form_pg_publication) GETSTRUCT(tup);
725
726 if (form->pubowner == newOwnerId)
727 return;
728
729 if (!superuser())
730 {
731 AclResult aclresult;
732
733 /* Must be owner */
734 if (!pg_publication_ownercheck(form->oid, GetUserId()))
735 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
736 NameStr(form->pubname));
737
738 /* Must be able to become new owner */
739 check_is_member_of_role(GetUserId(), newOwnerId);
740
741 /* New owner must have CREATE privilege on database */
742 aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
743 if (aclresult != ACLCHECK_OK)
744 aclcheck_error(aclresult, OBJECT_DATABASE,
745 get_database_name(MyDatabaseId));
746
747 if (form->puballtables && !superuser_arg(newOwnerId))
748 ereport(ERROR,
749 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
750 errmsg("permission denied to change owner of publication \"%s\"",
751 NameStr(form->pubname)),
752 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
753 }
754
755 form->pubowner = newOwnerId;
756 CatalogTupleUpdate(rel, &tup->t_self, tup);
757
758 /* Update owner dependency reference */
759 changeDependencyOnOwner(PublicationRelationId,
760 form->oid,
761 newOwnerId);
762
763 InvokeObjectPostAlterHook(PublicationRelationId,
764 form->oid, 0);
765 }
766
767 /*
768 * Change publication owner -- by name
769 */
770 ObjectAddress
AlterPublicationOwner(const char * name,Oid newOwnerId)771 AlterPublicationOwner(const char *name, Oid newOwnerId)
772 {
773 Oid subid;
774 HeapTuple tup;
775 Relation rel;
776 ObjectAddress address;
777 Form_pg_publication pubform;
778
779 rel = table_open(PublicationRelationId, RowExclusiveLock);
780
781 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
782
783 if (!HeapTupleIsValid(tup))
784 ereport(ERROR,
785 (errcode(ERRCODE_UNDEFINED_OBJECT),
786 errmsg("publication \"%s\" does not exist", name)));
787
788 pubform = (Form_pg_publication) GETSTRUCT(tup);
789 subid = pubform->oid;
790
791 AlterPublicationOwner_internal(rel, tup, newOwnerId);
792
793 ObjectAddressSet(address, PublicationRelationId, subid);
794
795 heap_freetuple(tup);
796
797 table_close(rel, RowExclusiveLock);
798
799 return address;
800 }
801
802 /*
803 * Change publication owner -- by OID
804 */
805 void
AlterPublicationOwner_oid(Oid subid,Oid newOwnerId)806 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
807 {
808 HeapTuple tup;
809 Relation rel;
810
811 rel = table_open(PublicationRelationId, RowExclusiveLock);
812
813 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
814
815 if (!HeapTupleIsValid(tup))
816 ereport(ERROR,
817 (errcode(ERRCODE_UNDEFINED_OBJECT),
818 errmsg("publication with OID %u does not exist", subid)));
819
820 AlterPublicationOwner_internal(rel, tup, newOwnerId);
821
822 heap_freetuple(tup);
823
824 table_close(rel, RowExclusiveLock);
825 }
826