1 /*-------------------------------------------------------------------------
2 *
3 * publicationcmds.c
4 * publication manipulation
5 *
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * publicationcmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "funcapi.h"
18 #include "miscadmin.h"
19
20 #include "access/genam.h"
21 #include "access/hash.h"
22 #include "access/heapam.h"
23 #include "access/htup_details.h"
24 #include "access/xact.h"
25
26 #include "catalog/catalog.h"
27 #include "catalog/indexing.h"
28 #include "catalog/namespace.h"
29 #include "catalog/objectaccess.h"
30 #include "catalog/objectaddress.h"
31 #include "catalog/pg_inherits.h"
32 #include "catalog/pg_type.h"
33 #include "catalog/pg_publication.h"
34 #include "catalog/pg_publication_rel.h"
35
36 #include "commands/dbcommands.h"
37 #include "commands/defrem.h"
38 #include "commands/event_trigger.h"
39 #include "commands/publicationcmds.h"
40
41 #include "utils/array.h"
42 #include "utils/builtins.h"
43 #include "utils/catcache.h"
44 #include "utils/fmgroids.h"
45 #include "utils/inval.h"
46 #include "utils/lsyscache.h"
47 #include "utils/rel.h"
48 #include "utils/syscache.h"
49 #include "utils/varlena.h"
50
/*
 * Threshold on the number of per-relation relcache invalidations issued by
 * AlterPublicationOptions: a publication with this many relations or more
 * gets a whole-relcache reset instead of one message per relation.
 *
 * Same as MAXNUMMESSAGES in sinvaladt.c
 */
#define MAX_RELCACHE_INVAL_MSGS 4096

/* Helpers local to this file; their definitions appear further down. */
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
					 AlterPublicationStmt *stmt);
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
59
60 static void
parse_publication_options(List * options,bool * publish_given,bool * publish_insert,bool * publish_update,bool * publish_delete,bool * publish_truncate)61 parse_publication_options(List *options,
62 bool *publish_given,
63 bool *publish_insert,
64 bool *publish_update,
65 bool *publish_delete,
66 bool *publish_truncate)
67 {
68 ListCell *lc;
69
70 *publish_given = false;
71
72 /* Defaults are true */
73 *publish_insert = true;
74 *publish_update = true;
75 *publish_delete = true;
76 *publish_truncate = true;
77
78 /* Parse options */
79 foreach(lc, options)
80 {
81 DefElem *defel = (DefElem *) lfirst(lc);
82
83 if (strcmp(defel->defname, "publish") == 0)
84 {
85 char *publish;
86 List *publish_list;
87 ListCell *lc;
88
89 if (*publish_given)
90 ereport(ERROR,
91 (errcode(ERRCODE_SYNTAX_ERROR),
92 errmsg("conflicting or redundant options")));
93
94 /*
95 * If publish option was given only the explicitly listed actions
96 * should be published.
97 */
98 *publish_insert = false;
99 *publish_update = false;
100 *publish_delete = false;
101 *publish_truncate = false;
102
103 *publish_given = true;
104 publish = defGetString(defel);
105
106 if (!SplitIdentifierString(publish, ',', &publish_list))
107 ereport(ERROR,
108 (errcode(ERRCODE_SYNTAX_ERROR),
109 errmsg("invalid list syntax for \"publish\" option")));
110
111 /* Process the option list. */
112 foreach(lc, publish_list)
113 {
114 char *publish_opt = (char *) lfirst(lc);
115
116 if (strcmp(publish_opt, "insert") == 0)
117 *publish_insert = true;
118 else if (strcmp(publish_opt, "update") == 0)
119 *publish_update = true;
120 else if (strcmp(publish_opt, "delete") == 0)
121 *publish_delete = true;
122 else if (strcmp(publish_opt, "truncate") == 0)
123 *publish_truncate = true;
124 else
125 ereport(ERROR,
126 (errcode(ERRCODE_SYNTAX_ERROR),
127 errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
128 }
129 }
130 else
131 ereport(ERROR,
132 (errcode(ERRCODE_SYNTAX_ERROR),
133 errmsg("unrecognized publication parameter: %s", defel->defname)));
134 }
135 }
136
137 /*
138 * Create new publication.
139 */
140 ObjectAddress
CreatePublication(CreatePublicationStmt * stmt)141 CreatePublication(CreatePublicationStmt *stmt)
142 {
143 Relation rel;
144 ObjectAddress myself;
145 Oid puboid;
146 bool nulls[Natts_pg_publication];
147 Datum values[Natts_pg_publication];
148 HeapTuple tup;
149 bool publish_given;
150 bool publish_insert;
151 bool publish_update;
152 bool publish_delete;
153 bool publish_truncate;
154 AclResult aclresult;
155
156 /* must have CREATE privilege on database */
157 aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
158 if (aclresult != ACLCHECK_OK)
159 aclcheck_error(aclresult, OBJECT_DATABASE,
160 get_database_name(MyDatabaseId));
161
162 /* FOR ALL TABLES requires superuser */
163 if (stmt->for_all_tables && !superuser())
164 ereport(ERROR,
165 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
166 (errmsg("must be superuser to create FOR ALL TABLES publication"))));
167
168 rel = heap_open(PublicationRelationId, RowExclusiveLock);
169
170 /* Check if name is used */
171 puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname));
172 if (OidIsValid(puboid))
173 {
174 ereport(ERROR,
175 (errcode(ERRCODE_DUPLICATE_OBJECT),
176 errmsg("publication \"%s\" already exists",
177 stmt->pubname)));
178 }
179
180 /* Form a tuple. */
181 memset(values, 0, sizeof(values));
182 memset(nulls, false, sizeof(nulls));
183
184 values[Anum_pg_publication_pubname - 1] =
185 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
186 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
187
188 parse_publication_options(stmt->options,
189 &publish_given, &publish_insert,
190 &publish_update, &publish_delete,
191 &publish_truncate);
192
193 values[Anum_pg_publication_puballtables - 1] =
194 BoolGetDatum(stmt->for_all_tables);
195 values[Anum_pg_publication_pubinsert - 1] =
196 BoolGetDatum(publish_insert);
197 values[Anum_pg_publication_pubupdate - 1] =
198 BoolGetDatum(publish_update);
199 values[Anum_pg_publication_pubdelete - 1] =
200 BoolGetDatum(publish_delete);
201 values[Anum_pg_publication_pubtruncate - 1] =
202 BoolGetDatum(publish_truncate);
203
204 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
205
206 /* Insert tuple into catalog. */
207 puboid = CatalogTupleInsert(rel, tup);
208 heap_freetuple(tup);
209
210 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
211
212 ObjectAddressSet(myself, PublicationRelationId, puboid);
213
214 /* Make the changes visible. */
215 CommandCounterIncrement();
216
217 if (stmt->tables)
218 {
219 List *rels;
220
221 Assert(list_length(stmt->tables) > 0);
222
223 rels = OpenTableList(stmt->tables);
224 PublicationAddTables(puboid, rels, true, NULL);
225 CloseTableList(rels);
226 }
227 else if (stmt->for_all_tables)
228 {
229 /* Invalidate relcache so that publication info is rebuilt. */
230 CacheInvalidateRelcacheAll();
231 }
232
233 heap_close(rel, RowExclusiveLock);
234
235 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
236
237 return myself;
238 }
239
240 /*
241 * Change options of a publication.
242 */
243 static void
AlterPublicationOptions(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)244 AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
245 HeapTuple tup)
246 {
247 bool nulls[Natts_pg_publication];
248 bool replaces[Natts_pg_publication];
249 Datum values[Natts_pg_publication];
250 bool publish_given;
251 bool publish_insert;
252 bool publish_update;
253 bool publish_delete;
254 bool publish_truncate;
255 ObjectAddress obj;
256
257 parse_publication_options(stmt->options,
258 &publish_given, &publish_insert,
259 &publish_update, &publish_delete,
260 &publish_truncate);
261
262 /* Everything ok, form a new tuple. */
263 memset(values, 0, sizeof(values));
264 memset(nulls, false, sizeof(nulls));
265 memset(replaces, false, sizeof(replaces));
266
267 if (publish_given)
268 {
269 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
270 replaces[Anum_pg_publication_pubinsert - 1] = true;
271
272 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
273 replaces[Anum_pg_publication_pubupdate - 1] = true;
274
275 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
276 replaces[Anum_pg_publication_pubdelete - 1] = true;
277
278 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
279 replaces[Anum_pg_publication_pubtruncate - 1] = true;
280 }
281
282 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
283 replaces);
284
285 /* Update the catalog. */
286 CatalogTupleUpdate(rel, &tup->t_self, tup);
287
288 CommandCounterIncrement();
289
290 /* Invalidate the relcache. */
291 if (((Form_pg_publication) GETSTRUCT(tup))->puballtables)
292 {
293 CacheInvalidateRelcacheAll();
294 }
295 else
296 {
297 List *relids = GetPublicationRelations(HeapTupleGetOid(tup));
298
299 /*
300 * We don't want to send too many individual messages, at some point
301 * it's cheaper to just reset whole relcache.
302 */
303 if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
304 {
305 ListCell *lc;
306
307 foreach(lc, relids)
308 {
309 Oid relid = lfirst_oid(lc);
310
311 CacheInvalidateRelcacheByRelid(relid);
312 }
313 }
314 else
315 CacheInvalidateRelcacheAll();
316 }
317
318 ObjectAddressSet(obj, PublicationRelationId, HeapTupleGetOid(tup));
319 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
320 (Node *) stmt);
321
322 InvokeObjectPostAlterHook(PublicationRelationId, HeapTupleGetOid(tup), 0);
323 }
324
325 /*
326 * Add or remove table to/from publication.
327 */
328 static void
AlterPublicationTables(AlterPublicationStmt * stmt,Relation rel,HeapTuple tup)329 AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
330 HeapTuple tup)
331 {
332 Oid pubid = HeapTupleGetOid(tup);
333 List *rels = NIL;
334 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
335
336 /* Check that user is allowed to manipulate the publication tables. */
337 if (pubform->puballtables)
338 ereport(ERROR,
339 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
340 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
341 NameStr(pubform->pubname)),
342 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
343
344 Assert(list_length(stmt->tables) > 0);
345
346 rels = OpenTableList(stmt->tables);
347
348 if (stmt->tableAction == DEFELEM_ADD)
349 PublicationAddTables(pubid, rels, false, stmt);
350 else if (stmt->tableAction == DEFELEM_DROP)
351 PublicationDropTables(pubid, rels, false);
352 else /* DEFELEM_SET */
353 {
354 List *oldrelids = GetPublicationRelations(pubid);
355 List *delrels = NIL;
356 ListCell *oldlc;
357
358 /* Calculate which relations to drop. */
359 foreach(oldlc, oldrelids)
360 {
361 Oid oldrelid = lfirst_oid(oldlc);
362 ListCell *newlc;
363 bool found = false;
364
365 foreach(newlc, rels)
366 {
367 Relation newrel = (Relation) lfirst(newlc);
368
369 if (RelationGetRelid(newrel) == oldrelid)
370 {
371 found = true;
372 break;
373 }
374 }
375
376 if (!found)
377 {
378 Relation oldrel = heap_open(oldrelid,
379 ShareUpdateExclusiveLock);
380
381 delrels = lappend(delrels, oldrel);
382 }
383 }
384
385 /* And drop them. */
386 PublicationDropTables(pubid, delrels, true);
387
388 /*
389 * Don't bother calculating the difference for adding, we'll catch and
390 * skip existing ones when doing catalog update.
391 */
392 PublicationAddTables(pubid, rels, true, stmt);
393
394 CloseTableList(delrels);
395 }
396
397 CloseTableList(rels);
398 }
399
400 /*
401 * Alter the existing publication.
402 *
403 * This is dispatcher function for AlterPublicationOptions and
404 * AlterPublicationTables.
405 */
406 void
AlterPublication(AlterPublicationStmt * stmt)407 AlterPublication(AlterPublicationStmt *stmt)
408 {
409 Relation rel;
410 HeapTuple tup;
411
412 rel = heap_open(PublicationRelationId, RowExclusiveLock);
413
414 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
415 CStringGetDatum(stmt->pubname));
416
417 if (!HeapTupleIsValid(tup))
418 ereport(ERROR,
419 (errcode(ERRCODE_UNDEFINED_OBJECT),
420 errmsg("publication \"%s\" does not exist",
421 stmt->pubname)));
422
423 /* must be owner */
424 if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
425 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
426 stmt->pubname);
427
428 if (stmt->options)
429 AlterPublicationOptions(stmt, rel, tup);
430 else
431 AlterPublicationTables(stmt, rel, tup);
432
433 /* Cleanup. */
434 heap_freetuple(tup);
435 heap_close(rel, RowExclusiveLock);
436 }
437
438 /*
439 * Drop publication by OID
440 */
441 void
RemovePublicationById(Oid pubid)442 RemovePublicationById(Oid pubid)
443 {
444 Relation rel;
445 HeapTuple tup;
446 Form_pg_publication pubform;
447
448 rel = heap_open(PublicationRelationId, RowExclusiveLock);
449
450 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
451
452 if (!HeapTupleIsValid(tup))
453 elog(ERROR, "cache lookup failed for publication %u", pubid);
454
455 pubform = (Form_pg_publication) GETSTRUCT(tup);
456
457 /* Invalidate relcache so that publication info is rebuilt. */
458 if (pubform->puballtables)
459 CacheInvalidateRelcacheAll();
460
461 CatalogTupleDelete(rel, &tup->t_self);
462
463 ReleaseSysCache(tup);
464
465 heap_close(rel, RowExclusiveLock);
466 }
467
468 /*
469 * Remove relation from publication by mapping OID.
470 */
471 void
RemovePublicationRelById(Oid proid)472 RemovePublicationRelById(Oid proid)
473 {
474 Relation rel;
475 HeapTuple tup;
476 Form_pg_publication_rel pubrel;
477
478 rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
479
480 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
481
482 if (!HeapTupleIsValid(tup))
483 elog(ERROR, "cache lookup failed for publication table %u",
484 proid);
485
486 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
487
488 /* Invalidate relcache so that publication info is rebuilt. */
489 CacheInvalidateRelcacheByRelid(pubrel->prrelid);
490
491 CatalogTupleDelete(rel, &tup->t_self);
492
493 ReleaseSysCache(tup);
494
495 heap_close(rel, RowExclusiveLock);
496 }
497
498 /*
499 * Open relations specified by a RangeVar list.
500 * The returned tables are locked in ShareUpdateExclusiveLock mode.
501 */
502 static List *
OpenTableList(List * tables)503 OpenTableList(List *tables)
504 {
505 List *relids = NIL;
506 List *rels = NIL;
507 ListCell *lc;
508
509 /*
510 * Open, share-lock, and check all the explicitly-specified relations
511 */
512 foreach(lc, tables)
513 {
514 RangeVar *rv = castNode(RangeVar, lfirst(lc));
515 bool recurse = rv->inh;
516 Relation rel;
517 Oid myrelid;
518
519 /* Allow query cancel in case this takes a long time */
520 CHECK_FOR_INTERRUPTS();
521
522 rel = heap_openrv(rv, ShareUpdateExclusiveLock);
523 myrelid = RelationGetRelid(rel);
524
525 /*
526 * Filter out duplicates if user specifies "foo, foo".
527 *
528 * Note that this algorithm is known to not be very efficient (O(N^2))
529 * but given that it only works on list of tables given to us by user
530 * it's deemed acceptable.
531 */
532 if (list_member_oid(relids, myrelid))
533 {
534 heap_close(rel, ShareUpdateExclusiveLock);
535 continue;
536 }
537
538 rels = lappend(rels, rel);
539 relids = lappend_oid(relids, myrelid);
540
541 /* Add children of this rel, if requested */
542 if (recurse)
543 {
544 List *children;
545 ListCell *child;
546
547 children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
548 NULL);
549
550 foreach(child, children)
551 {
552 Oid childrelid = lfirst_oid(child);
553
554 /* Allow query cancel in case this takes a long time */
555 CHECK_FOR_INTERRUPTS();
556
557 /*
558 * Skip duplicates if user specified both parent and child
559 * tables.
560 */
561 if (list_member_oid(relids, childrelid))
562 continue;
563
564 /* find_all_inheritors already got lock */
565 rel = heap_open(childrelid, NoLock);
566 rels = lappend(rels, rel);
567 relids = lappend_oid(relids, childrelid);
568 }
569 }
570 }
571
572 list_free(relids);
573
574 return rels;
575 }
576
577 /*
578 * Close all relations in the list.
579 */
580 static void
CloseTableList(List * rels)581 CloseTableList(List *rels)
582 {
583 ListCell *lc;
584
585 foreach(lc, rels)
586 {
587 Relation rel = (Relation) lfirst(lc);
588
589 heap_close(rel, NoLock);
590 }
591 }
592
593 /*
594 * Add listed tables to the publication.
595 */
596 static void
PublicationAddTables(Oid pubid,List * rels,bool if_not_exists,AlterPublicationStmt * stmt)597 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
598 AlterPublicationStmt *stmt)
599 {
600 ListCell *lc;
601
602 Assert(!stmt || !stmt->for_all_tables);
603
604 foreach(lc, rels)
605 {
606 Relation rel = (Relation) lfirst(lc);
607 ObjectAddress obj;
608
609 /* Must be owner of the table or superuser. */
610 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
611 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
612 RelationGetRelationName(rel));
613
614 obj = publication_add_relation(pubid, rel, if_not_exists);
615 if (stmt)
616 {
617 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
618 (Node *) stmt);
619
620 InvokeObjectPostCreateHook(PublicationRelRelationId,
621 obj.objectId, 0);
622 }
623 }
624 }
625
626 /*
627 * Remove listed tables from the publication.
628 */
629 static void
PublicationDropTables(Oid pubid,List * rels,bool missing_ok)630 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
631 {
632 ObjectAddress obj;
633 ListCell *lc;
634 Oid prid;
635
636 foreach(lc, rels)
637 {
638 Relation rel = (Relation) lfirst(lc);
639 Oid relid = RelationGetRelid(rel);
640
641 prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
642 ObjectIdGetDatum(pubid));
643 if (!OidIsValid(prid))
644 {
645 if (missing_ok)
646 continue;
647
648 ereport(ERROR,
649 (errcode(ERRCODE_UNDEFINED_OBJECT),
650 errmsg("relation \"%s\" is not part of the publication",
651 RelationGetRelationName(rel))));
652 }
653
654 ObjectAddressSet(obj, PublicationRelRelationId, prid);
655 performDeletion(&obj, DROP_CASCADE, 0);
656 }
657 }
658
659 /*
660 * Internal workhorse for changing a publication owner
661 */
662 static void
AlterPublicationOwner_internal(Relation rel,HeapTuple tup,Oid newOwnerId)663 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
664 {
665 Form_pg_publication form;
666
667 form = (Form_pg_publication) GETSTRUCT(tup);
668
669 if (form->pubowner == newOwnerId)
670 return;
671
672 if (!superuser())
673 {
674 AclResult aclresult;
675
676 /* Must be owner */
677 if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
678 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
679 NameStr(form->pubname));
680
681 /* Must be able to become new owner */
682 check_is_member_of_role(GetUserId(), newOwnerId);
683
684 /* New owner must have CREATE privilege on database */
685 aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
686 if (aclresult != ACLCHECK_OK)
687 aclcheck_error(aclresult, OBJECT_DATABASE,
688 get_database_name(MyDatabaseId));
689
690 if (form->puballtables && !superuser_arg(newOwnerId))
691 ereport(ERROR,
692 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
693 errmsg("permission denied to change owner of publication \"%s\"",
694 NameStr(form->pubname)),
695 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
696 }
697
698 form->pubowner = newOwnerId;
699 CatalogTupleUpdate(rel, &tup->t_self, tup);
700
701 /* Update owner dependency reference */
702 changeDependencyOnOwner(PublicationRelationId,
703 HeapTupleGetOid(tup),
704 newOwnerId);
705
706 InvokeObjectPostAlterHook(PublicationRelationId,
707 HeapTupleGetOid(tup), 0);
708 }
709
710 /*
711 * Change publication owner -- by name
712 */
713 ObjectAddress
AlterPublicationOwner(const char * name,Oid newOwnerId)714 AlterPublicationOwner(const char *name, Oid newOwnerId)
715 {
716 Oid subid;
717 HeapTuple tup;
718 Relation rel;
719 ObjectAddress address;
720
721 rel = heap_open(PublicationRelationId, RowExclusiveLock);
722
723 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
724
725 if (!HeapTupleIsValid(tup))
726 ereport(ERROR,
727 (errcode(ERRCODE_UNDEFINED_OBJECT),
728 errmsg("publication \"%s\" does not exist", name)));
729
730 subid = HeapTupleGetOid(tup);
731
732 AlterPublicationOwner_internal(rel, tup, newOwnerId);
733
734 ObjectAddressSet(address, PublicationRelationId, subid);
735
736 heap_freetuple(tup);
737
738 heap_close(rel, RowExclusiveLock);
739
740 return address;
741 }
742
743 /*
744 * Change publication owner -- by OID
745 */
746 void
AlterPublicationOwner_oid(Oid subid,Oid newOwnerId)747 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
748 {
749 HeapTuple tup;
750 Relation rel;
751
752 rel = heap_open(PublicationRelationId, RowExclusiveLock);
753
754 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
755
756 if (!HeapTupleIsValid(tup))
757 ereport(ERROR,
758 (errcode(ERRCODE_UNDEFINED_OBJECT),
759 errmsg("publication with OID %u does not exist", subid)));
760
761 AlterPublicationOwner_internal(rel, tup, newOwnerId);
762
763 heap_freetuple(tup);
764
765 heap_close(rel, RowExclusiveLock);
766 }
767