1 /*------------------------------------------------------------------------- 2 * 3 * pg_subscription.c 4 * replication subscriptions 5 * 6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group 7 * Portions Copyright (c) 1994, Regents of the University of California 8 * 9 * IDENTIFICATION 10 * src/backend/catalog/pg_subscription.c 11 * 12 *------------------------------------------------------------------------- 13 */ 14 15 #include "postgres.h" 16 17 #include "access/genam.h" 18 #include "access/heapam.h" 19 #include "access/htup_details.h" 20 #include "access/tableam.h" 21 #include "access/xact.h" 22 #include "catalog/indexing.h" 23 #include "catalog/pg_subscription.h" 24 #include "catalog/pg_subscription_rel.h" 25 #include "catalog/pg_type.h" 26 #include "miscadmin.h" 27 #include "nodes/makefuncs.h" 28 #include "storage/lmgr.h" 29 #include "utils/array.h" 30 #include "utils/builtins.h" 31 #include "utils/fmgroids.h" 32 #include "utils/pg_lsn.h" 33 #include "utils/rel.h" 34 #include "utils/syscache.h" 35 36 static List *textarray_to_stringlist(ArrayType *textarray); 37 38 /* 39 * Fetch the subscription from the syscache. 40 */ 41 Subscription * 42 GetSubscription(Oid subid, bool missing_ok) 43 { 44 HeapTuple tup; 45 Subscription *sub; 46 Form_pg_subscription subform; 47 Datum datum; 48 bool isnull; 49 50 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); 51 52 if (!HeapTupleIsValid(tup)) 53 { 54 if (missing_ok) 55 return NULL; 56 57 elog(ERROR, "cache lookup failed for subscription %u", subid); 58 } 59 60 subform = (Form_pg_subscription) GETSTRUCT(tup); 61 62 sub = (Subscription *) palloc(sizeof(Subscription)); 63 sub->oid = subid; 64 sub->dbid = subform->subdbid; 65 sub->name = pstrdup(NameStr(subform->subname)); 66 sub->owner = subform->subowner; 67 sub->enabled = subform->subenabled; 68 69 /* Get conninfo */ 70 datum = SysCacheGetAttr(SUBSCRIPTIONOID, 71 tup, 72 Anum_pg_subscription_subconninfo, 73 &isnull); 74 Assert(!isnull); 75 sub->conninfo = TextDatumGetCString(datum); 76 77 /* Get slotname */ 78 datum = SysCacheGetAttr(SUBSCRIPTIONOID, 79 tup, 80 Anum_pg_subscription_subslotname, 81 &isnull); 82 if (!isnull) 83 sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); 84 else 85 sub->slotname = NULL; 86 87 /* Get synccommit */ 88 datum = SysCacheGetAttr(SUBSCRIPTIONOID, 89 tup, 90 Anum_pg_subscription_subsynccommit, 91 &isnull); 92 Assert(!isnull); 93 sub->synccommit = TextDatumGetCString(datum); 94 95 /* Get publications */ 96 datum = SysCacheGetAttr(SUBSCRIPTIONOID, 97 tup, 98 Anum_pg_subscription_subpublications, 99 &isnull); 100 Assert(!isnull); 101 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum)); 102 103 ReleaseSysCache(tup); 104 105 return sub; 106 } 107 108 /* 109 * Return number of subscriptions defined in given database. 110 * Used by dropdb() to check if database can indeed be dropped. 111 */ 112 int 113 CountDBSubscriptions(Oid dbid) 114 { 115 int nsubs = 0; 116 Relation rel; 117 ScanKeyData scankey; 118 SysScanDesc scan; 119 HeapTuple tup; 120 121 rel = table_open(SubscriptionRelationId, RowExclusiveLock); 122 123 ScanKeyInit(&scankey, 124 Anum_pg_subscription_subdbid, 125 BTEqualStrategyNumber, F_OIDEQ, 126 ObjectIdGetDatum(dbid)); 127 128 scan = systable_beginscan(rel, InvalidOid, false, 129 NULL, 1, &scankey); 130 131 while (HeapTupleIsValid(tup = systable_getnext(scan))) 132 nsubs++; 133 134 systable_endscan(scan); 135 136 table_close(rel, NoLock); 137 138 return nsubs; 139 } 140 141 /* 142 * Free memory allocated by subscription struct. 143 */ 144 void 145 FreeSubscription(Subscription *sub) 146 { 147 pfree(sub->name); 148 pfree(sub->conninfo); 149 if (sub->slotname) 150 pfree(sub->slotname); 151 list_free_deep(sub->publications); 152 pfree(sub); 153 } 154 155 /* 156 * get_subscription_oid - given a subscription name, look up the OID 157 * 158 * If missing_ok is false, throw an error if name not found. If true, just 159 * return InvalidOid. 160 */ 161 Oid 162 get_subscription_oid(const char *subname, bool missing_ok) 163 { 164 Oid oid; 165 166 oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid, 167 MyDatabaseId, CStringGetDatum(subname)); 168 if (!OidIsValid(oid) && !missing_ok) 169 ereport(ERROR, 170 (errcode(ERRCODE_UNDEFINED_OBJECT), 171 errmsg("subscription \"%s\" does not exist", subname))); 172 return oid; 173 } 174 175 /* 176 * get_subscription_name - given a subscription OID, look up the name 177 * 178 * If missing_ok is false, throw an error if name not found. If true, just 179 * return NULL. 180 */ 181 char * 182 get_subscription_name(Oid subid, bool missing_ok) 183 { 184 HeapTuple tup; 185 char *subname; 186 Form_pg_subscription subform; 187 188 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); 189 190 if (!HeapTupleIsValid(tup)) 191 { 192 if (!missing_ok) 193 elog(ERROR, "cache lookup failed for subscription %u", subid); 194 return NULL; 195 } 196 197 subform = (Form_pg_subscription) GETSTRUCT(tup); 198 subname = pstrdup(NameStr(subform->subname)); 199 200 ReleaseSysCache(tup); 201 202 return subname; 203 } 204 205 /* 206 * Convert text array to list of strings. 207 * 208 * Note: the resulting list of strings is pallocated here. 209 */ 210 static List * 211 textarray_to_stringlist(ArrayType *textarray) 212 { 213 Datum *elems; 214 int nelems, 215 i; 216 List *res = NIL; 217 218 deconstruct_array(textarray, 219 TEXTOID, -1, false, TYPALIGN_INT, 220 &elems, NULL, &nelems); 221 222 if (nelems == 0) 223 return NIL; 224 225 for (i = 0; i < nelems; i++) 226 res = lappend(res, makeString(TextDatumGetCString(elems[i]))); 227 228 return res; 229 } 230 231 /* 232 * Add new state record for a subscription table. 233 */ 234 void 235 AddSubscriptionRelState(Oid subid, Oid relid, char state, 236 XLogRecPtr sublsn) 237 { 238 Relation rel; 239 HeapTuple tup; 240 bool nulls[Natts_pg_subscription_rel]; 241 Datum values[Natts_pg_subscription_rel]; 242 243 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); 244 245 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); 246 247 /* Try finding existing mapping. */ 248 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, 249 ObjectIdGetDatum(relid), 250 ObjectIdGetDatum(subid)); 251 if (HeapTupleIsValid(tup)) 252 elog(ERROR, "subscription table %u in subscription %u already exists", 253 relid, subid); 254 255 /* Form the tuple. */ 256 memset(values, 0, sizeof(values)); 257 memset(nulls, false, sizeof(nulls)); 258 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); 259 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); 260 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); 261 if (sublsn != InvalidXLogRecPtr) 262 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); 263 else 264 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; 265 266 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); 267 268 /* Insert tuple into catalog. */ 269 CatalogTupleInsert(rel, tup); 270 271 heap_freetuple(tup); 272 273 /* Cleanup. */ 274 table_close(rel, NoLock); 275 } 276 277 /* 278 * Update the state of a subscription table. 279 */ 280 void 281 UpdateSubscriptionRelState(Oid subid, Oid relid, char state, 282 XLogRecPtr sublsn) 283 { 284 Relation rel; 285 HeapTuple tup; 286 bool nulls[Natts_pg_subscription_rel]; 287 Datum values[Natts_pg_subscription_rel]; 288 bool replaces[Natts_pg_subscription_rel]; 289 290 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); 291 292 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); 293 294 /* Try finding existing mapping. */ 295 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, 296 ObjectIdGetDatum(relid), 297 ObjectIdGetDatum(subid)); 298 if (!HeapTupleIsValid(tup)) 299 elog(ERROR, "subscription table %u in subscription %u does not exist", 300 relid, subid); 301 302 /* Update the tuple. */ 303 memset(values, 0, sizeof(values)); 304 memset(nulls, false, sizeof(nulls)); 305 memset(replaces, false, sizeof(replaces)); 306 307 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; 308 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); 309 310 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; 311 if (sublsn != InvalidXLogRecPtr) 312 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); 313 else 314 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; 315 316 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, 317 replaces); 318 319 /* Update the catalog. */ 320 CatalogTupleUpdate(rel, &tup->t_self, tup); 321 322 /* Cleanup. */ 323 table_close(rel, NoLock); 324 } 325 326 /* 327 * Get state of subscription table. 328 * 329 * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true. 330 */ 331 char 332 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, 333 bool missing_ok) 334 { 335 Relation rel; 336 HeapTuple tup; 337 char substate; 338 bool isnull; 339 Datum d; 340 341 rel = table_open(SubscriptionRelRelationId, AccessShareLock); 342 343 /* Try finding the mapping. */ 344 tup = SearchSysCache2(SUBSCRIPTIONRELMAP, 345 ObjectIdGetDatum(relid), 346 ObjectIdGetDatum(subid)); 347 348 if (!HeapTupleIsValid(tup)) 349 { 350 if (missing_ok) 351 { 352 table_close(rel, AccessShareLock); 353 *sublsn = InvalidXLogRecPtr; 354 return SUBREL_STATE_UNKNOWN; 355 } 356 357 elog(ERROR, "subscription table %u in subscription %u does not exist", 358 relid, subid); 359 } 360 361 /* Get the state. */ 362 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, 363 Anum_pg_subscription_rel_srsubstate, &isnull); 364 Assert(!isnull); 365 substate = DatumGetChar(d); 366 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, 367 Anum_pg_subscription_rel_srsublsn, &isnull); 368 if (isnull) 369 *sublsn = InvalidXLogRecPtr; 370 else 371 *sublsn = DatumGetLSN(d); 372 373 /* Cleanup */ 374 ReleaseSysCache(tup); 375 table_close(rel, AccessShareLock); 376 377 return substate; 378 } 379 380 /* 381 * Drop subscription relation mapping. These can be for a particular 382 * subscription, or for a particular relation, or both. 383 */ 384 void 385 RemoveSubscriptionRel(Oid subid, Oid relid) 386 { 387 Relation rel; 388 TableScanDesc scan; 389 ScanKeyData skey[2]; 390 HeapTuple tup; 391 int nkeys = 0; 392 393 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); 394 395 if (OidIsValid(subid)) 396 { 397 ScanKeyInit(&skey[nkeys++], 398 Anum_pg_subscription_rel_srsubid, 399 BTEqualStrategyNumber, 400 F_OIDEQ, 401 ObjectIdGetDatum(subid)); 402 } 403 404 if (OidIsValid(relid)) 405 { 406 ScanKeyInit(&skey[nkeys++], 407 Anum_pg_subscription_rel_srrelid, 408 BTEqualStrategyNumber, 409 F_OIDEQ, 410 ObjectIdGetDatum(relid)); 411 } 412 413 /* Do the search and delete what we found. */ 414 scan = table_beginscan_catalog(rel, nkeys, skey); 415 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) 416 { 417 CatalogTupleDelete(rel, &tup->t_self); 418 } 419 table_endscan(scan); 420 421 table_close(rel, RowExclusiveLock); 422 } 423 424 425 /* 426 * Get all relations for subscription. 427 * 428 * Returned list is palloc'ed in current memory context. 429 */ 430 List * 431 GetSubscriptionRelations(Oid subid) 432 { 433 List *res = NIL; 434 Relation rel; 435 HeapTuple tup; 436 int nkeys = 0; 437 ScanKeyData skey[2]; 438 SysScanDesc scan; 439 440 rel = table_open(SubscriptionRelRelationId, AccessShareLock); 441 442 ScanKeyInit(&skey[nkeys++], 443 Anum_pg_subscription_rel_srsubid, 444 BTEqualStrategyNumber, F_OIDEQ, 445 ObjectIdGetDatum(subid)); 446 447 scan = systable_beginscan(rel, InvalidOid, false, 448 NULL, nkeys, skey); 449 450 while (HeapTupleIsValid(tup = systable_getnext(scan))) 451 { 452 Form_pg_subscription_rel subrel; 453 SubscriptionRelState *relstate; 454 Datum d; 455 bool isnull; 456 457 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); 458 459 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); 460 relstate->relid = subrel->srrelid; 461 relstate->state = subrel->srsubstate; 462 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, 463 Anum_pg_subscription_rel_srsublsn, &isnull); 464 if (isnull) 465 relstate->lsn = InvalidXLogRecPtr; 466 else 467 relstate->lsn = DatumGetLSN(d); 468 469 res = lappend(res, relstate); 470 } 471 472 /* Cleanup */ 473 systable_endscan(scan); 474 table_close(rel, AccessShareLock); 475 476 return res; 477 } 478 479 /* 480 * Get all relations for subscription that are not in a ready state. 481 * 482 * Returned list is palloc'ed in current memory context. 483 */ 484 List * 485 GetSubscriptionNotReadyRelations(Oid subid) 486 { 487 List *res = NIL; 488 Relation rel; 489 HeapTuple tup; 490 int nkeys = 0; 491 ScanKeyData skey[2]; 492 SysScanDesc scan; 493 494 rel = table_open(SubscriptionRelRelationId, AccessShareLock); 495 496 ScanKeyInit(&skey[nkeys++], 497 Anum_pg_subscription_rel_srsubid, 498 BTEqualStrategyNumber, F_OIDEQ, 499 ObjectIdGetDatum(subid)); 500 501 ScanKeyInit(&skey[nkeys++], 502 Anum_pg_subscription_rel_srsubstate, 503 BTEqualStrategyNumber, F_CHARNE, 504 CharGetDatum(SUBREL_STATE_READY)); 505 506 scan = systable_beginscan(rel, InvalidOid, false, 507 NULL, nkeys, skey); 508 509 while (HeapTupleIsValid(tup = systable_getnext(scan))) 510 { 511 Form_pg_subscription_rel subrel; 512 SubscriptionRelState *relstate; 513 Datum d; 514 bool isnull; 515 516 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); 517 518 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); 519 relstate->relid = subrel->srrelid; 520 relstate->state = subrel->srsubstate; 521 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, 522 Anum_pg_subscription_rel_srsublsn, &isnull); 523 if (isnull) 524 relstate->lsn = InvalidXLogRecPtr; 525 else 526 relstate->lsn = DatumGetLSN(d); 527 528 res = lappend(res, relstate); 529 } 530 531 /* Cleanup */ 532 systable_endscan(scan); 533 table_close(rel, AccessShareLock); 534 535 return res; 536 } 537