1 /*------------------------------------------------------------------------- 2 * 3 * pg_publication.c 4 * publication C API manipulation 5 * 6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group 7 * Portions Copyright (c) 1994, Regents of the University of California 8 * 9 * IDENTIFICATION 10 * pg_publication.c 11 * 12 *------------------------------------------------------------------------- 13 */ 14 15 #include "postgres.h" 16 17 #include "access/genam.h" 18 #include "access/heapam.h" 19 #include "access/htup_details.h" 20 #include "access/tableam.h" 21 #include "access/xact.h" 22 #include "catalog/catalog.h" 23 #include "catalog/dependency.h" 24 #include "catalog/index.h" 25 #include "catalog/indexing.h" 26 #include "catalog/namespace.h" 27 #include "catalog/partition.h" 28 #include "catalog/objectaccess.h" 29 #include "catalog/objectaddress.h" 30 #include "catalog/pg_inherits.h" 31 #include "catalog/pg_publication.h" 32 #include "catalog/pg_publication_rel.h" 33 #include "catalog/pg_type.h" 34 #include "commands/publicationcmds.h" 35 #include "funcapi.h" 36 #include "miscadmin.h" 37 #include "utils/array.h" 38 #include "utils/builtins.h" 39 #include "utils/catcache.h" 40 #include "utils/fmgroids.h" 41 #include "utils/inval.h" 42 #include "utils/lsyscache.h" 43 #include "utils/rel.h" 44 #include "utils/syscache.h" 45 46 /* 47 * Check if relation can be in given publication and throws appropriate 48 * error if not. 49 */ 50 static void 51 check_publication_add_relation(Relation targetrel) 52 { 53 /* Must be a regular or partitioned table */ 54 if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && 55 RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) 56 ereport(ERROR, 57 (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 58 errmsg("\"%s\" is not a table", 59 RelationGetRelationName(targetrel)), 60 errdetail("Only tables can be added to publications."))); 61 62 /* Can't be system table */ 63 if (IsCatalogRelation(targetrel)) 64 ereport(ERROR, 65 (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 66 errmsg("\"%s\" is a system table", 67 RelationGetRelationName(targetrel)), 68 errdetail("System tables cannot be added to publications."))); 69 70 /* UNLOGGED and TEMP relations cannot be part of publication. */ 71 if (targetrel->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT) 72 ereport(ERROR, 73 (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 74 errmsg("table \"%s\" cannot be replicated", 75 RelationGetRelationName(targetrel)), 76 errdetail("Temporary and unlogged relations cannot be replicated."))); 77 } 78 79 /* 80 * Returns if relation represented by oid and Form_pg_class entry 81 * is publishable. 82 * 83 * Does same checks as the above, but does not need relation to be opened 84 * and also does not throw errors. 85 * 86 * XXX This also excludes all tables with relid < FirstNormalObjectId, 87 * ie all tables created during initdb. This mainly affects the preinstalled 88 * information_schema. IsCatalogRelationOid() only excludes tables with 89 * relid < FirstBootstrapObjectId, making that test rather redundant, 90 * but really we should get rid of the FirstNormalObjectId test not 91 * IsCatalogRelationOid. We can't do so today because we don't want 92 * information_schema tables to be considered publishable; but this test 93 * is really inadequate for that, since the information_schema could be 94 * dropped and reloaded and then it'll be considered publishable. The best 95 * long-term solution may be to add a "relispublishable" bool to pg_class, 96 * and depend on that instead of OID checks. 97 */ 98 static bool 99 is_publishable_class(Oid relid, Form_pg_class reltuple) 100 { 101 return (reltuple->relkind == RELKIND_RELATION || 102 reltuple->relkind == RELKIND_PARTITIONED_TABLE) && 103 !IsCatalogRelationOid(relid) && 104 reltuple->relpersistence == RELPERSISTENCE_PERMANENT && 105 relid >= FirstNormalObjectId; 106 } 107 108 /* 109 * Another variant of this, taking a Relation. 110 */ 111 bool 112 is_publishable_relation(Relation rel) 113 { 114 return is_publishable_class(RelationGetRelid(rel), rel->rd_rel); 115 } 116 117 118 /* 119 * SQL-callable variant of the above 120 * 121 * This returns null when the relation does not exist. This is intended to be 122 * used for example in psql to avoid gratuitous errors when there are 123 * concurrent catalog changes. 124 */ 125 Datum 126 pg_relation_is_publishable(PG_FUNCTION_ARGS) 127 { 128 Oid relid = PG_GETARG_OID(0); 129 HeapTuple tuple; 130 bool result; 131 132 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); 133 if (!HeapTupleIsValid(tuple)) 134 PG_RETURN_NULL(); 135 result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)); 136 ReleaseSysCache(tuple); 137 PG_RETURN_BOOL(result); 138 } 139 140 /* 141 * Gets the relations based on the publication partition option for a specified 142 * relation. 143 */ 144 List * 145 GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, 146 Oid relid) 147 { 148 if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE && 149 pub_partopt != PUBLICATION_PART_ROOT) 150 { 151 List *all_parts = find_all_inheritors(relid, NoLock, 152 NULL); 153 154 if (pub_partopt == PUBLICATION_PART_ALL) 155 result = list_concat(result, all_parts); 156 else if (pub_partopt == PUBLICATION_PART_LEAF) 157 { 158 ListCell *lc; 159 160 foreach(lc, all_parts) 161 { 162 Oid partOid = lfirst_oid(lc); 163 164 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE) 165 result = lappend_oid(result, partOid); 166 } 167 } 168 else 169 Assert(false); 170 } 171 else 172 result = lappend_oid(result, relid); 173 174 return result; 175 } 176 177 /* 178 * Insert new publication / relation mapping. 179 */ 180 ObjectAddress 181 publication_add_relation(Oid pubid, Relation targetrel, 182 bool if_not_exists) 183 { 184 Relation rel; 185 HeapTuple tup; 186 Datum values[Natts_pg_publication_rel]; 187 bool nulls[Natts_pg_publication_rel]; 188 Oid relid = RelationGetRelid(targetrel); 189 Oid prrelid; 190 Publication *pub = GetPublication(pubid); 191 ObjectAddress myself, 192 referenced; 193 List *relids = NIL; 194 195 rel = table_open(PublicationRelRelationId, RowExclusiveLock); 196 197 /* 198 * Check for duplicates. Note that this does not really prevent 199 * duplicates, it's here just to provide nicer error message in common 200 * case. The real protection is the unique key on the catalog. 201 */ 202 if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), 203 ObjectIdGetDatum(pubid))) 204 { 205 table_close(rel, RowExclusiveLock); 206 207 if (if_not_exists) 208 return InvalidObjectAddress; 209 210 ereport(ERROR, 211 (errcode(ERRCODE_DUPLICATE_OBJECT), 212 errmsg("relation \"%s\" is already member of publication \"%s\"", 213 RelationGetRelationName(targetrel), pub->name))); 214 } 215 216 check_publication_add_relation(targetrel); 217 218 /* Form a tuple. */ 219 memset(values, 0, sizeof(values)); 220 memset(nulls, false, sizeof(nulls)); 221 222 prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId, 223 Anum_pg_publication_rel_oid); 224 values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid); 225 values[Anum_pg_publication_rel_prpubid - 1] = 226 ObjectIdGetDatum(pubid); 227 values[Anum_pg_publication_rel_prrelid - 1] = 228 ObjectIdGetDatum(relid); 229 230 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); 231 232 /* Insert tuple into catalog. */ 233 CatalogTupleInsert(rel, tup); 234 heap_freetuple(tup); 235 236 ObjectAddressSet(myself, PublicationRelRelationId, prrelid); 237 238 /* Add dependency on the publication */ 239 ObjectAddressSet(referenced, PublicationRelationId, pubid); 240 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); 241 242 /* Add dependency on the relation */ 243 ObjectAddressSet(referenced, RelationRelationId, relid); 244 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); 245 246 /* Close the table. */ 247 table_close(rel, RowExclusiveLock); 248 249 /* 250 * Invalidate relcache so that publication info is rebuilt. 251 * 252 * For the partitioned tables, we must invalidate all partitions contained 253 * in the respective partition hierarchies, not just the one explicitly 254 * mentioned in the publication. This is required because we implicitly 255 * publish the child tables when the parent table is published. 256 */ 257 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL, 258 relid); 259 260 InvalidatePublicationRels(relids); 261 262 return myself; 263 } 264 265 /* Gets list of publication oids for a relation */ 266 List * 267 GetRelationPublications(Oid relid) 268 { 269 List *result = NIL; 270 CatCList *pubrellist; 271 int i; 272 273 /* Find all publications associated with the relation. */ 274 pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, 275 ObjectIdGetDatum(relid)); 276 for (i = 0; i < pubrellist->n_members; i++) 277 { 278 HeapTuple tup = &pubrellist->members[i]->tuple; 279 Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid; 280 281 result = lappend_oid(result, pubid); 282 } 283 284 ReleaseSysCacheList(pubrellist); 285 286 return result; 287 } 288 289 /* 290 * Gets list of relation oids for a publication. 291 * 292 * This should only be used FOR TABLE publications, the FOR ALL TABLES 293 * should use GetAllTablesPublicationRelations(). 294 */ 295 List * 296 GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) 297 { 298 List *result; 299 Relation pubrelsrel; 300 ScanKeyData scankey; 301 SysScanDesc scan; 302 HeapTuple tup; 303 304 /* Find all publications associated with the relation. */ 305 pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock); 306 307 ScanKeyInit(&scankey, 308 Anum_pg_publication_rel_prpubid, 309 BTEqualStrategyNumber, F_OIDEQ, 310 ObjectIdGetDatum(pubid)); 311 312 scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId, 313 true, NULL, 1, &scankey); 314 315 result = NIL; 316 while (HeapTupleIsValid(tup = systable_getnext(scan))) 317 { 318 Form_pg_publication_rel pubrel; 319 320 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); 321 result = GetPubPartitionOptionRelations(result, pub_partopt, 322 pubrel->prrelid); 323 } 324 325 systable_endscan(scan); 326 table_close(pubrelsrel, AccessShareLock); 327 328 return result; 329 } 330 331 /* 332 * Gets list of publication oids for publications marked as FOR ALL TABLES. 333 */ 334 List * 335 GetAllTablesPublications(void) 336 { 337 List *result; 338 Relation rel; 339 ScanKeyData scankey; 340 SysScanDesc scan; 341 HeapTuple tup; 342 343 /* Find all publications that are marked as for all tables. */ 344 rel = table_open(PublicationRelationId, AccessShareLock); 345 346 ScanKeyInit(&scankey, 347 Anum_pg_publication_puballtables, 348 BTEqualStrategyNumber, F_BOOLEQ, 349 BoolGetDatum(true)); 350 351 scan = systable_beginscan(rel, InvalidOid, false, 352 NULL, 1, &scankey); 353 354 result = NIL; 355 while (HeapTupleIsValid(tup = systable_getnext(scan))) 356 { 357 Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid; 358 359 result = lappend_oid(result, oid); 360 } 361 362 systable_endscan(scan); 363 table_close(rel, AccessShareLock); 364 365 return result; 366 } 367 368 /* 369 * Gets list of all relation published by FOR ALL TABLES publication(s). 370 * 371 * If the publication publishes partition changes via their respective root 372 * partitioned tables, we must exclude partitions in favor of including the 373 * root partitioned tables. 374 */ 375 List * 376 GetAllTablesPublicationRelations(bool pubviaroot) 377 { 378 Relation classRel; 379 ScanKeyData key[1]; 380 TableScanDesc scan; 381 HeapTuple tuple; 382 List *result = NIL; 383 384 classRel = table_open(RelationRelationId, AccessShareLock); 385 386 ScanKeyInit(&key[0], 387 Anum_pg_class_relkind, 388 BTEqualStrategyNumber, F_CHAREQ, 389 CharGetDatum(RELKIND_RELATION)); 390 391 scan = table_beginscan_catalog(classRel, 1, key); 392 393 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) 394 { 395 Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); 396 Oid relid = relForm->oid; 397 398 if (is_publishable_class(relid, relForm) && 399 !(relForm->relispartition && pubviaroot)) 400 result = lappend_oid(result, relid); 401 } 402 403 table_endscan(scan); 404 405 if (pubviaroot) 406 { 407 ScanKeyInit(&key[0], 408 Anum_pg_class_relkind, 409 BTEqualStrategyNumber, F_CHAREQ, 410 CharGetDatum(RELKIND_PARTITIONED_TABLE)); 411 412 scan = table_beginscan_catalog(classRel, 1, key); 413 414 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) 415 { 416 Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); 417 Oid relid = relForm->oid; 418 419 if (is_publishable_class(relid, relForm) && 420 !relForm->relispartition) 421 result = lappend_oid(result, relid); 422 } 423 424 table_endscan(scan); 425 } 426 427 table_close(classRel, AccessShareLock); 428 return result; 429 } 430 431 /* 432 * Get publication using oid 433 * 434 * The Publication struct and its data are palloc'ed here. 435 */ 436 Publication * 437 GetPublication(Oid pubid) 438 { 439 HeapTuple tup; 440 Publication *pub; 441 Form_pg_publication pubform; 442 443 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); 444 if (!HeapTupleIsValid(tup)) 445 elog(ERROR, "cache lookup failed for publication %u", pubid); 446 447 pubform = (Form_pg_publication) GETSTRUCT(tup); 448 449 pub = (Publication *) palloc(sizeof(Publication)); 450 pub->oid = pubid; 451 pub->name = pstrdup(NameStr(pubform->pubname)); 452 pub->alltables = pubform->puballtables; 453 pub->pubactions.pubinsert = pubform->pubinsert; 454 pub->pubactions.pubupdate = pubform->pubupdate; 455 pub->pubactions.pubdelete = pubform->pubdelete; 456 pub->pubactions.pubtruncate = pubform->pubtruncate; 457 pub->pubviaroot = pubform->pubviaroot; 458 459 ReleaseSysCache(tup); 460 461 return pub; 462 } 463 464 465 /* 466 * Get Publication using name. 467 */ 468 Publication * 469 GetPublicationByName(const char *pubname, bool missing_ok) 470 { 471 Oid oid; 472 473 oid = get_publication_oid(pubname, missing_ok); 474 475 return OidIsValid(oid) ? GetPublication(oid) : NULL; 476 } 477 478 /* 479 * get_publication_oid - given a publication name, look up the OID 480 * 481 * If missing_ok is false, throw an error if name not found. If true, just 482 * return InvalidOid. 483 */ 484 Oid 485 get_publication_oid(const char *pubname, bool missing_ok) 486 { 487 Oid oid; 488 489 oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid, 490 CStringGetDatum(pubname)); 491 if (!OidIsValid(oid) && !missing_ok) 492 ereport(ERROR, 493 (errcode(ERRCODE_UNDEFINED_OBJECT), 494 errmsg("publication \"%s\" does not exist", pubname))); 495 return oid; 496 } 497 498 /* 499 * get_publication_name - given a publication Oid, look up the name 500 * 501 * If missing_ok is false, throw an error if name not found. If true, just 502 * return NULL. 503 */ 504 char * 505 get_publication_name(Oid pubid, bool missing_ok) 506 { 507 HeapTuple tup; 508 char *pubname; 509 Form_pg_publication pubform; 510 511 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); 512 513 if (!HeapTupleIsValid(tup)) 514 { 515 if (!missing_ok) 516 elog(ERROR, "cache lookup failed for publication %u", pubid); 517 return NULL; 518 } 519 520 pubform = (Form_pg_publication) GETSTRUCT(tup); 521 pubname = pstrdup(NameStr(pubform->pubname)); 522 523 ReleaseSysCache(tup); 524 525 return pubname; 526 } 527 528 /* 529 * Returns Oids of tables in a publication. 530 */ 531 Datum 532 pg_get_publication_tables(PG_FUNCTION_ARGS) 533 { 534 FuncCallContext *funcctx; 535 char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); 536 Publication *publication; 537 List *tables; 538 539 /* stuff done only on the first call of the function */ 540 if (SRF_IS_FIRSTCALL()) 541 { 542 MemoryContext oldcontext; 543 544 /* create a function context for cross-call persistence */ 545 funcctx = SRF_FIRSTCALL_INIT(); 546 547 /* switch to memory context appropriate for multiple function calls */ 548 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); 549 550 publication = GetPublicationByName(pubname, false); 551 552 /* 553 * Publications support partitioned tables, although all changes are 554 * replicated using leaf partition identity and schema, so we only 555 * need those. 556 */ 557 if (publication->alltables) 558 tables = GetAllTablesPublicationRelations(publication->pubviaroot); 559 else 560 tables = GetPublicationRelations(publication->oid, 561 publication->pubviaroot ? 562 PUBLICATION_PART_ROOT : 563 PUBLICATION_PART_LEAF); 564 funcctx->user_fctx = (void *) tables; 565 566 MemoryContextSwitchTo(oldcontext); 567 } 568 569 /* stuff done on every call of the function */ 570 funcctx = SRF_PERCALL_SETUP(); 571 tables = (List *) funcctx->user_fctx; 572 573 if (funcctx->call_cntr < list_length(tables)) 574 { 575 Oid relid = list_nth_oid(tables, funcctx->call_cntr); 576 577 SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); 578 } 579 580 SRF_RETURN_DONE(funcctx); 581 } 582