1 /*-------------------------------------------------------------------------
2 *
3 * publicationcmds.c
4 * publication manipulation
5 *
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * publicationcmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/table.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
26 #include "catalog/partition.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_publication.h"
29 #include "catalog/pg_publication_rel.h"
30 #include "catalog/pg_type.h"
31 #include "commands/dbcommands.h"
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
34 #include "commands/publicationcmds.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "utils/acl.h"
38 #include "utils/array.h"
39 #include "utils/builtins.h"
40 #include "utils/catcache.h"
41 #include "utils/fmgroids.h"
42 #include "utils/inval.h"
43 #include "utils/lsyscache.h"
44 #include "utils/rel.h"
45 #include "utils/syscache.h"
46 #include "utils/varlena.h"
47
48 static List *OpenTableList(List *tables);
49 static void CloseTableList(List *rels);
50 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
51 AlterPublicationStmt *stmt);
52 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
53
54 static void
parse_publication_options(List * options,bool * publish_given,PublicationActions * pubactions,bool * publish_via_partition_root_given,bool * publish_via_partition_root)55 parse_publication_options(List *options,
56 bool *publish_given,
57 PublicationActions *pubactions,
58 bool *publish_via_partition_root_given,
59 bool *publish_via_partition_root)
60 {
61 ListCell *lc;
62
63 *publish_given = false;
64 *publish_via_partition_root_given = false;
65
66 /* defaults */
67 pubactions->pubinsert = true;
68 pubactions->pubupdate = true;
69 pubactions->pubdelete = true;
70 pubactions->pubtruncate = true;
71 *publish_via_partition_root = false;
72
73 /* Parse options */
74 foreach(lc, options)
75 {
76 DefElem *defel = (DefElem *) lfirst(lc);
77
78 if (strcmp(defel->defname, "publish") == 0)
79 {
80 char *publish;
81 List *publish_list;
82 ListCell *lc;
83
84 if (*publish_given)
85 ereport(ERROR,
86 (errcode(ERRCODE_SYNTAX_ERROR),
87 errmsg("conflicting or redundant options")));
88
89 /*
90 * If publish option was given only the explicitly listed actions
91 * should be published.
92 */
93 pubactions->pubinsert = false;
94 pubactions->pubupdate = false;
95 pubactions->pubdelete = false;
96 pubactions->pubtruncate = false;
97
98 *publish_given = true;
99 publish = defGetString(defel);
100
101 if (!SplitIdentifierString(publish, ',', &publish_list))
102 ereport(ERROR,
103 (errcode(ERRCODE_SYNTAX_ERROR),
104 errmsg("invalid list syntax for \"publish\" option")));
105
106 /* Process the option list. */
107 foreach(lc, publish_list)
108 {
109 char *publish_opt = (char *) lfirst(lc);
110
111 if (strcmp(publish_opt, "insert") == 0)
112 pubactions->pubinsert = true;
113 else if (strcmp(publish_opt, "update") == 0)
114 pubactions->pubupdate = true;
115 else if (strcmp(publish_opt, "delete") == 0)
116 pubactions->pubdelete = true;
117 else if (strcmp(publish_opt, "truncate") == 0)
118 pubactions->pubtruncate = true;
119 else
120 ereport(ERROR,
121 (errcode(ERRCODE_SYNTAX_ERROR),
122 errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
123 }
124 }
125 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
126 {
127 if (*publish_via_partition_root_given)
128 ereport(ERROR,
129 (errcode(ERRCODE_SYNTAX_ERROR),
130 errmsg("conflicting or redundant options")));
131 *publish_via_partition_root_given = true;
132 *publish_via_partition_root = defGetBoolean(defel);
133 }
134 else
135 ereport(ERROR,
136 (errcode(ERRCODE_SYNTAX_ERROR),
137 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
138 }
139 }
140
141 /*
142 * Create new publication.
143 */
144 ObjectAddress
CreatePublication(CreatePublicationStmt * stmt)145 CreatePublication(CreatePublicationStmt *stmt)
146 {
147 Relation rel;
148 ObjectAddress myself;
149 Oid puboid;
150 bool nulls[Natts_pg_publication];
151 Datum values[Natts_pg_publication];
152 HeapTuple tup;
153 bool publish_given;
154 PublicationActions pubactions;
155 bool publish_via_partition_root_given;
156 bool publish_via_partition_root;
157 AclResult aclresult;
158
159 /* must have CREATE privilege on database */
160 aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
161 if (aclresult != ACLCHECK_OK)
162 aclcheck_error(aclresult, OBJECT_DATABASE,
163 get_database_name(MyDatabaseId));
164
165 /* FOR ALL TABLES requires superuser */
166 if (stmt->for_all_tables && !superuser())
167 ereport(ERROR,
168 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
169 errmsg("must be superuser to create FOR ALL TABLES publication")));
170
171 rel = table_open(PublicationRelationId, RowExclusiveLock);
172
173 /* Check if name is used */
174 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
175 CStringGetDatum(stmt->pubname));
176 if (OidIsValid(puboid))
177 {
178 ereport(ERROR,
179 (errcode(ERRCODE_DUPLICATE_OBJECT),
180 errmsg("publication \"%s\" already exists",
181 stmt->pubname)));
182 }
183
184 /* Form a tuple. */
185 memset(values, 0, sizeof(values));
186 memset(nulls, false, sizeof(nulls));
187
188 values[Anum_pg_publication_pubname - 1] =
189 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
190 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
191
192 parse_publication_options(stmt->options,
193 &publish_given, &pubactions,
194 &publish_via_partition_root_given,
195 &publish_via_partition_root);
196
197 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
198 Anum_pg_publication_oid);
199 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
200 values[Anum_pg_publication_puballtables - 1] =
201 BoolGetDatum(stmt->for_all_tables);
202 values[Anum_pg_publication_pubinsert - 1] =
203 BoolGetDatum(pubactions.pubinsert);
204 values[Anum_pg_publication_pubupdate - 1] =
205 BoolGetDatum(pubactions.pubupdate);
206 values[Anum_pg_publication_pubdelete - 1] =
207 BoolGetDatum(pubactions.pubdelete);
208 values[Anum_pg_publication_pubtruncate - 1] =
209 BoolGetDatum(pubactions.pubtruncate);
210 values[Anum_pg_publication_pubviaroot - 1] =
211 BoolGetDatum(publish_via_partition_root);
212
213 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
214
215 /* Insert tuple into catalog. */
216 CatalogTupleInsert(rel, tup);
217 heap_freetuple(tup);
218
219 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
220
221 ObjectAddressSet(myself, PublicationRelationId, puboid);
222
223 /* Make the changes visible. */
224 CommandCounterIncrement();
225
226 if (stmt->tables)
227 {
228 List *rels;
229
230 Assert(list_length(stmt->tables) > 0);
231
232 rels = OpenTableList(stmt->tables);
233 PublicationAddTables(puboid, rels, true, NULL);
234 CloseTableList(rels);
235 }
236 else if (stmt->for_all_tables)
237 {
238 /* Invalidate relcache so that publication info is rebuilt. */
239 CacheInvalidateRelcacheAll();
240 }
241
242 table_close(rel, RowExclusiveLock);
243
244 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
245
246 if (wal_level != WAL_LEVEL_LOGICAL)
247 {
248 ereport(WARNING,
249 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
250 errmsg("wal_level is insufficient to publish logical changes"),
251 errhint("Set wal_level to logical before creating subscriptions.")));
252 }
253
254 return myself;
255 }
256
257 /*
258 * Change options of a publication.
259 */
260 static void
AlterPublicationOptions(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)261 AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
262 HeapTuple tup)
263 {
264 bool nulls[Natts_pg_publication];
265 bool replaces[Natts_pg_publication];
266 Datum values[Natts_pg_publication];
267 bool publish_given;
268 PublicationActions pubactions;
269 bool publish_via_partition_root_given;
270 bool publish_via_partition_root;
271 ObjectAddress obj;
272 Form_pg_publication pubform;
273
274 parse_publication_options(stmt->options,
275 &publish_given, &pubactions,
276 &publish_via_partition_root_given,
277 &publish_via_partition_root);
278
279 /* Everything ok, form a new tuple. */
280 memset(values, 0, sizeof(values));
281 memset(nulls, false, sizeof(nulls));
282 memset(replaces, false, sizeof(replaces));
283
284 if (publish_given)
285 {
286 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
287 replaces[Anum_pg_publication_pubinsert - 1] = true;
288
289 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
290 replaces[Anum_pg_publication_pubupdate - 1] = true;
291
292 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
293 replaces[Anum_pg_publication_pubdelete - 1] = true;
294
295 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
296 replaces[Anum_pg_publication_pubtruncate - 1] = true;
297 }
298
299 if (publish_via_partition_root_given)
300 {
301 values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
302 replaces[Anum_pg_publication_pubviaroot - 1] = true;
303 }
304
305 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
306 replaces);
307
308 /* Update the catalog. */
309 CatalogTupleUpdate(rel, &tup->t_self, tup);
310
311 CommandCounterIncrement();
312
313 pubform = (Form_pg_publication) GETSTRUCT(tup);
314
315 /* Invalidate the relcache. */
316 if (pubform->puballtables)
317 {
318 CacheInvalidateRelcacheAll();
319 }
320 else
321 {
322 /*
323 * For any partitioned tables contained in the publication, we must
324 * invalidate all partitions contained in the respective partition
325 * trees, not just those explicitly mentioned in the publication.
326 */
327 List *relids = GetPublicationRelations(pubform->oid,
328 PUBLICATION_PART_ALL);
329
330 InvalidatePublicationRels(relids);
331 }
332
333 ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
334 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
335 (Node *) stmt);
336
337 InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
338 }
339
340 /*
341 * Invalidate the relations.
342 */
343 void
InvalidatePublicationRels(List * relids)344 InvalidatePublicationRels(List *relids)
345 {
346 /*
347 * We don't want to send too many individual messages, at some point it's
348 * cheaper to just reset whole relcache.
349 */
350 if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
351 {
352 ListCell *lc;
353
354 foreach(lc, relids)
355 CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
356 }
357 else
358 CacheInvalidateRelcacheAll();
359 }
360
361 /*
362 * Add or remove table to/from publication.
363 */
364 static void
AlterPublicationTables(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)365 AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
366 HeapTuple tup)
367 {
368 List *rels = NIL;
369 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
370 Oid pubid = pubform->oid;
371
372 /* Check that user is allowed to manipulate the publication tables. */
373 if (pubform->puballtables)
374 ereport(ERROR,
375 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
377 NameStr(pubform->pubname)),
378 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
379
380 Assert(list_length(stmt->tables) > 0);
381
382 rels = OpenTableList(stmt->tables);
383
384 if (stmt->tableAction == DEFELEM_ADD)
385 PublicationAddTables(pubid, rels, false, stmt);
386 else if (stmt->tableAction == DEFELEM_DROP)
387 PublicationDropTables(pubid, rels, false);
388 else /* DEFELEM_SET */
389 {
390 List *oldrelids = GetPublicationRelations(pubid,
391 PUBLICATION_PART_ROOT);
392 List *delrels = NIL;
393 ListCell *oldlc;
394
395 /* Calculate which relations to drop. */
396 foreach(oldlc, oldrelids)
397 {
398 Oid oldrelid = lfirst_oid(oldlc);
399 ListCell *newlc;
400 bool found = false;
401
402 foreach(newlc, rels)
403 {
404 Relation newrel = (Relation) lfirst(newlc);
405
406 if (RelationGetRelid(newrel) == oldrelid)
407 {
408 found = true;
409 break;
410 }
411 }
412
413 if (!found)
414 {
415 Relation oldrel = table_open(oldrelid,
416 ShareUpdateExclusiveLock);
417
418 delrels = lappend(delrels, oldrel);
419 }
420 }
421
422 /* And drop them. */
423 PublicationDropTables(pubid, delrels, true);
424
425 /*
426 * Don't bother calculating the difference for adding, we'll catch and
427 * skip existing ones when doing catalog update.
428 */
429 PublicationAddTables(pubid, rels, true, stmt);
430
431 CloseTableList(delrels);
432 }
433
434 CloseTableList(rels);
435 }
436
437 /*
438 * Alter the existing publication.
439 *
440 * This is dispatcher function for AlterPublicationOptions and
441 * AlterPublicationTables.
442 */
443 void
AlterPublication(AlterPublicationStmt * stmt)444 AlterPublication(AlterPublicationStmt *stmt)
445 {
446 Relation rel;
447 HeapTuple tup;
448 Form_pg_publication pubform;
449
450 rel = table_open(PublicationRelationId, RowExclusiveLock);
451
452 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
453 CStringGetDatum(stmt->pubname));
454
455 if (!HeapTupleIsValid(tup))
456 ereport(ERROR,
457 (errcode(ERRCODE_UNDEFINED_OBJECT),
458 errmsg("publication \"%s\" does not exist",
459 stmt->pubname)));
460
461 pubform = (Form_pg_publication) GETSTRUCT(tup);
462
463 /* must be owner */
464 if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
465 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
466 stmt->pubname);
467
468 if (stmt->options)
469 AlterPublicationOptions(stmt, rel, tup);
470 else
471 AlterPublicationTables(stmt, rel, tup);
472
473 /* Cleanup. */
474 heap_freetuple(tup);
475 table_close(rel, RowExclusiveLock);
476 }
477
478 /*
479 * Remove the publication by mapping OID.
480 */
481 void
RemovePublicationById(Oid pubid)482 RemovePublicationById(Oid pubid)
483 {
484 Relation rel;
485 HeapTuple tup;
486 Form_pg_publication pubform;
487
488 rel = table_open(PublicationRelationId, RowExclusiveLock);
489
490 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
491
492 if (!HeapTupleIsValid(tup))
493 elog(ERROR, "cache lookup failed for publication %u", pubid);
494
495 pubform = (Form_pg_publication) GETSTRUCT(tup);
496
497 /* Invalidate relcache so that publication info is rebuilt. */
498 if (pubform->puballtables)
499 CacheInvalidateRelcacheAll();
500
501 CatalogTupleDelete(rel, &tup->t_self);
502
503 ReleaseSysCache(tup);
504
505 table_close(rel, RowExclusiveLock);
506 }
507
508 /*
509 * Remove relation from publication by mapping OID.
510 */
511 void
RemovePublicationRelById(Oid proid)512 RemovePublicationRelById(Oid proid)
513 {
514 Relation rel;
515 HeapTuple tup;
516 Form_pg_publication_rel pubrel;
517 List *relids = NIL;
518
519 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
520
521 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
522
523 if (!HeapTupleIsValid(tup))
524 elog(ERROR, "cache lookup failed for publication table %u",
525 proid);
526
527 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
528
529 /*
530 * Invalidate relcache so that publication info is rebuilt.
531 *
532 * For the partitioned tables, we must invalidate all partitions contained
533 * in the respective partition hierarchies, not just the one explicitly
534 * mentioned in the publication. This is required because we implicitly
535 * publish the child tables when the parent table is published.
536 */
537 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
538 pubrel->prrelid);
539
540 InvalidatePublicationRels(relids);
541
542 CatalogTupleDelete(rel, &tup->t_self);
543
544 ReleaseSysCache(tup);
545
546 table_close(rel, RowExclusiveLock);
547 }
548
549 /*
550 * Open relations specified by a RangeVar list.
551 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
552 * add them to a publication.
553 */
554 static List *
OpenTableList(List * tables)555 OpenTableList(List *tables)
556 {
557 List *relids = NIL;
558 List *rels = NIL;
559 ListCell *lc;
560
561 /*
562 * Open, share-lock, and check all the explicitly-specified relations
563 */
564 foreach(lc, tables)
565 {
566 RangeVar *rv = castNode(RangeVar, lfirst(lc));
567 bool recurse = rv->inh;
568 Relation rel;
569 Oid myrelid;
570
571 /* Allow query cancel in case this takes a long time */
572 CHECK_FOR_INTERRUPTS();
573
574 rel = table_openrv(rv, ShareUpdateExclusiveLock);
575 myrelid = RelationGetRelid(rel);
576
577 /*
578 * Filter out duplicates if user specifies "foo, foo".
579 *
580 * Note that this algorithm is known to not be very efficient (O(N^2))
581 * but given that it only works on list of tables given to us by user
582 * it's deemed acceptable.
583 */
584 if (list_member_oid(relids, myrelid))
585 {
586 table_close(rel, ShareUpdateExclusiveLock);
587 continue;
588 }
589
590 rels = lappend(rels, rel);
591 relids = lappend_oid(relids, myrelid);
592
593 /*
594 * Add children of this rel, if requested, so that they too are added
595 * to the publication. A partitioned table can't have any inheritance
596 * children other than its partitions, which need not be explicitly
597 * added to the publication.
598 */
599 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
600 {
601 List *children;
602 ListCell *child;
603
604 children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
605 NULL);
606
607 foreach(child, children)
608 {
609 Oid childrelid = lfirst_oid(child);
610
611 /* Allow query cancel in case this takes a long time */
612 CHECK_FOR_INTERRUPTS();
613
614 /*
615 * Skip duplicates if user specified both parent and child
616 * tables.
617 */
618 if (list_member_oid(relids, childrelid))
619 continue;
620
621 /* find_all_inheritors already got lock */
622 rel = table_open(childrelid, NoLock);
623 rels = lappend(rels, rel);
624 relids = lappend_oid(relids, childrelid);
625 }
626 }
627 }
628
629 list_free(relids);
630
631 return rels;
632 }
633
634 /*
635 * Close all relations in the list.
636 */
637 static void
CloseTableList(List * rels)638 CloseTableList(List *rels)
639 {
640 ListCell *lc;
641
642 foreach(lc, rels)
643 {
644 Relation rel = (Relation) lfirst(lc);
645
646 table_close(rel, NoLock);
647 }
648 }
649
650 /*
651 * Add listed tables to the publication.
652 */
653 static void
PublicationAddTables(Oid pubid,List * rels,bool if_not_exists,AlterPublicationStmt * stmt)654 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
655 AlterPublicationStmt *stmt)
656 {
657 ListCell *lc;
658
659 Assert(!stmt || !stmt->for_all_tables);
660
661 foreach(lc, rels)
662 {
663 Relation rel = (Relation) lfirst(lc);
664 ObjectAddress obj;
665
666 /* Must be owner of the table or superuser. */
667 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
668 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
669 RelationGetRelationName(rel));
670
671 obj = publication_add_relation(pubid, rel, if_not_exists);
672 if (stmt)
673 {
674 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
675 (Node *) stmt);
676
677 InvokeObjectPostCreateHook(PublicationRelRelationId,
678 obj.objectId, 0);
679 }
680 }
681 }
682
683 /*
684 * Remove listed tables from the publication.
685 */
686 static void
PublicationDropTables(Oid pubid,List * rels,bool missing_ok)687 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
688 {
689 ObjectAddress obj;
690 ListCell *lc;
691 Oid prid;
692
693 foreach(lc, rels)
694 {
695 Relation rel = (Relation) lfirst(lc);
696 Oid relid = RelationGetRelid(rel);
697
698 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
699 ObjectIdGetDatum(relid),
700 ObjectIdGetDatum(pubid));
701 if (!OidIsValid(prid))
702 {
703 if (missing_ok)
704 continue;
705
706 ereport(ERROR,
707 (errcode(ERRCODE_UNDEFINED_OBJECT),
708 errmsg("relation \"%s\" is not part of the publication",
709 RelationGetRelationName(rel))));
710 }
711
712 ObjectAddressSet(obj, PublicationRelRelationId, prid);
713 performDeletion(&obj, DROP_CASCADE, 0);
714 }
715 }
716
717 /*
718 * Internal workhorse for changing a publication owner
719 */
720 static void
AlterPublicationOwner_internal(Relation rel,HeapTuple tup,Oid newOwnerId)721 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
722 {
723 Form_pg_publication form;
724
725 form = (Form_pg_publication) GETSTRUCT(tup);
726
727 if (form->pubowner == newOwnerId)
728 return;
729
730 if (!superuser())
731 {
732 AclResult aclresult;
733
734 /* Must be owner */
735 if (!pg_publication_ownercheck(form->oid, GetUserId()))
736 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
737 NameStr(form->pubname));
738
739 /* Must be able to become new owner */
740 check_is_member_of_role(GetUserId(), newOwnerId);
741
742 /* New owner must have CREATE privilege on database */
743 aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
744 if (aclresult != ACLCHECK_OK)
745 aclcheck_error(aclresult, OBJECT_DATABASE,
746 get_database_name(MyDatabaseId));
747
748 if (form->puballtables && !superuser_arg(newOwnerId))
749 ereport(ERROR,
750 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
751 errmsg("permission denied to change owner of publication \"%s\"",
752 NameStr(form->pubname)),
753 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
754 }
755
756 form->pubowner = newOwnerId;
757 CatalogTupleUpdate(rel, &tup->t_self, tup);
758
759 /* Update owner dependency reference */
760 changeDependencyOnOwner(PublicationRelationId,
761 form->oid,
762 newOwnerId);
763
764 InvokeObjectPostAlterHook(PublicationRelationId,
765 form->oid, 0);
766 }
767
768 /*
769 * Change publication owner -- by name
770 */
771 ObjectAddress
AlterPublicationOwner(const char * name,Oid newOwnerId)772 AlterPublicationOwner(const char *name, Oid newOwnerId)
773 {
774 Oid subid;
775 HeapTuple tup;
776 Relation rel;
777 ObjectAddress address;
778 Form_pg_publication pubform;
779
780 rel = table_open(PublicationRelationId, RowExclusiveLock);
781
782 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
783
784 if (!HeapTupleIsValid(tup))
785 ereport(ERROR,
786 (errcode(ERRCODE_UNDEFINED_OBJECT),
787 errmsg("publication \"%s\" does not exist", name)));
788
789 pubform = (Form_pg_publication) GETSTRUCT(tup);
790 subid = pubform->oid;
791
792 AlterPublicationOwner_internal(rel, tup, newOwnerId);
793
794 ObjectAddressSet(address, PublicationRelationId, subid);
795
796 heap_freetuple(tup);
797
798 table_close(rel, RowExclusiveLock);
799
800 return address;
801 }
802
803 /*
804 * Change publication owner -- by OID
805 */
806 void
AlterPublicationOwner_oid(Oid subid,Oid newOwnerId)807 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
808 {
809 HeapTuple tup;
810 Relation rel;
811
812 rel = table_open(PublicationRelationId, RowExclusiveLock);
813
814 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
815
816 if (!HeapTupleIsValid(tup))
817 ereport(ERROR,
818 (errcode(ERRCODE_UNDEFINED_OBJECT),
819 errmsg("publication with OID %u does not exist", subid)));
820
821 AlterPublicationOwner_internal(rel, tup, newOwnerId);
822
823 heap_freetuple(tup);
824
825 table_close(rel, RowExclusiveLock);
826 }
827