1 /*-------------------------------------------------------------------------
2 *
3 * pg_publication.c
4 * publication C API manipulation
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * pg_publication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/index.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/partition.h"
28 #include "catalog/objectaccess.h"
29 #include "catalog/objectaddress.h"
30 #include "catalog/pg_inherits.h"
31 #include "catalog/pg_publication.h"
32 #include "catalog/pg_publication_rel.h"
33 #include "catalog/pg_type.h"
34 #include "commands/publicationcmds.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "utils/array.h"
38 #include "utils/builtins.h"
39 #include "utils/catcache.h"
40 #include "utils/fmgroids.h"
41 #include "utils/inval.h"
42 #include "utils/lsyscache.h"
43 #include "utils/rel.h"
44 #include "utils/syscache.h"
45
46 /*
47 * Check if relation can be in given publication and throws appropriate
48 * error if not.
49 */
50 static void
check_publication_add_relation(Relation targetrel)51 check_publication_add_relation(Relation targetrel)
52 {
53 /* Must be a regular or partitioned table */
54 if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
55 RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
56 ereport(ERROR,
57 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
58 errmsg("\"%s\" is not a table",
59 RelationGetRelationName(targetrel)),
60 errdetail("Only tables can be added to publications.")));
61
62 /* Can't be system table */
63 if (IsCatalogRelation(targetrel))
64 ereport(ERROR,
65 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 errmsg("\"%s\" is a system table",
67 RelationGetRelationName(targetrel)),
68 errdetail("System tables cannot be added to publications.")));
69
70 /* UNLOGGED and TEMP relations cannot be part of publication. */
71 if (!RelationIsPermanent(targetrel))
72 ereport(ERROR,
73 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 errmsg("table \"%s\" cannot be replicated",
75 RelationGetRelationName(targetrel)),
76 errdetail("Temporary and unlogged relations cannot be replicated.")));
77 }
78
79 /*
80 * Returns if relation represented by oid and Form_pg_class entry
81 * is publishable.
82 *
83 * Does same checks as the above, but does not need relation to be opened
84 * and also does not throw errors.
85 *
86 * XXX This also excludes all tables with relid < FirstNormalObjectId,
87 * ie all tables created during initdb. This mainly affects the preinstalled
88 * information_schema. IsCatalogRelationOid() only excludes tables with
89 * relid < FirstBootstrapObjectId, making that test rather redundant,
90 * but really we should get rid of the FirstNormalObjectId test not
91 * IsCatalogRelationOid. We can't do so today because we don't want
92 * information_schema tables to be considered publishable; but this test
93 * is really inadequate for that, since the information_schema could be
94 * dropped and reloaded and then it'll be considered publishable. The best
95 * long-term solution may be to add a "relispublishable" bool to pg_class,
96 * and depend on that instead of OID checks.
97 */
98 static bool
is_publishable_class(Oid relid,Form_pg_class reltuple)99 is_publishable_class(Oid relid, Form_pg_class reltuple)
100 {
101 return (reltuple->relkind == RELKIND_RELATION ||
102 reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
103 !IsCatalogRelationOid(relid) &&
104 reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
105 relid >= FirstNormalObjectId;
106 }
107
108 /*
109 * Another variant of this, taking a Relation.
110 */
111 bool
is_publishable_relation(Relation rel)112 is_publishable_relation(Relation rel)
113 {
114 return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
115 }
116
117
118 /*
119 * SQL-callable variant of the above
120 *
121 * This returns null when the relation does not exist. This is intended to be
122 * used for example in psql to avoid gratuitous errors when there are
123 * concurrent catalog changes.
124 */
125 Datum
pg_relation_is_publishable(PG_FUNCTION_ARGS)126 pg_relation_is_publishable(PG_FUNCTION_ARGS)
127 {
128 Oid relid = PG_GETARG_OID(0);
129 HeapTuple tuple;
130 bool result;
131
132 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
133 if (!HeapTupleIsValid(tuple))
134 PG_RETURN_NULL();
135 result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
136 ReleaseSysCache(tuple);
137 PG_RETURN_BOOL(result);
138 }
139
140 /*
141 * Gets the relations based on the publication partition option for a specified
142 * relation.
143 */
144 List *
GetPubPartitionOptionRelations(List * result,PublicationPartOpt pub_partopt,Oid relid)145 GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
146 Oid relid)
147 {
148 if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
149 pub_partopt != PUBLICATION_PART_ROOT)
150 {
151 List *all_parts = find_all_inheritors(relid, NoLock,
152 NULL);
153
154 if (pub_partopt == PUBLICATION_PART_ALL)
155 result = list_concat(result, all_parts);
156 else if (pub_partopt == PUBLICATION_PART_LEAF)
157 {
158 ListCell *lc;
159
160 foreach(lc, all_parts)
161 {
162 Oid partOid = lfirst_oid(lc);
163
164 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
165 result = lappend_oid(result, partOid);
166 }
167 }
168 else
169 Assert(false);
170 }
171 else
172 result = lappend_oid(result, relid);
173
174 return result;
175 }
176
177 /*
178 * Insert new publication / relation mapping.
179 */
180 ObjectAddress
publication_add_relation(Oid pubid,Relation targetrel,bool if_not_exists)181 publication_add_relation(Oid pubid, Relation targetrel,
182 bool if_not_exists)
183 {
184 Relation rel;
185 HeapTuple tup;
186 Datum values[Natts_pg_publication_rel];
187 bool nulls[Natts_pg_publication_rel];
188 Oid relid = RelationGetRelid(targetrel);
189 Oid prrelid;
190 Publication *pub = GetPublication(pubid);
191 ObjectAddress myself,
192 referenced;
193 List *relids = NIL;
194
195 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
196
197 /*
198 * Check for duplicates. Note that this does not really prevent
199 * duplicates, it's here just to provide nicer error message in common
200 * case. The real protection is the unique key on the catalog.
201 */
202 if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
203 ObjectIdGetDatum(pubid)))
204 {
205 table_close(rel, RowExclusiveLock);
206
207 if (if_not_exists)
208 return InvalidObjectAddress;
209
210 ereport(ERROR,
211 (errcode(ERRCODE_DUPLICATE_OBJECT),
212 errmsg("relation \"%s\" is already member of publication \"%s\"",
213 RelationGetRelationName(targetrel), pub->name)));
214 }
215
216 check_publication_add_relation(targetrel);
217
218 /* Form a tuple. */
219 memset(values, 0, sizeof(values));
220 memset(nulls, false, sizeof(nulls));
221
222 prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
223 Anum_pg_publication_rel_oid);
224 values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
225 values[Anum_pg_publication_rel_prpubid - 1] =
226 ObjectIdGetDatum(pubid);
227 values[Anum_pg_publication_rel_prrelid - 1] =
228 ObjectIdGetDatum(relid);
229
230 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
231
232 /* Insert tuple into catalog. */
233 CatalogTupleInsert(rel, tup);
234 heap_freetuple(tup);
235
236 ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
237
238 /* Add dependency on the publication */
239 ObjectAddressSet(referenced, PublicationRelationId, pubid);
240 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
241
242 /* Add dependency on the relation */
243 ObjectAddressSet(referenced, RelationRelationId, relid);
244 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
245
246 /* Close the table. */
247 table_close(rel, RowExclusiveLock);
248
249 /*
250 * Invalidate relcache so that publication info is rebuilt.
251 *
252 * For the partitioned tables, we must invalidate all partitions contained
253 * in the respective partition hierarchies, not just the one explicitly
254 * mentioned in the publication. This is required because we implicitly
255 * publish the child tables when the parent table is published.
256 */
257 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
258 relid);
259
260 InvalidatePublicationRels(relids);
261
262 return myself;
263 }
264
265 /* Gets list of publication oids for a relation */
266 List *
GetRelationPublications(Oid relid)267 GetRelationPublications(Oid relid)
268 {
269 List *result = NIL;
270 CatCList *pubrellist;
271 int i;
272
273 /* Find all publications associated with the relation. */
274 pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
275 ObjectIdGetDatum(relid));
276 for (i = 0; i < pubrellist->n_members; i++)
277 {
278 HeapTuple tup = &pubrellist->members[i]->tuple;
279 Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
280
281 result = lappend_oid(result, pubid);
282 }
283
284 ReleaseSysCacheList(pubrellist);
285
286 return result;
287 }
288
289 /*
290 * Gets list of relation oids for a publication.
291 *
292 * This should only be used FOR TABLE publications, the FOR ALL TABLES
293 * should use GetAllTablesPublicationRelations().
294 */
295 List *
GetPublicationRelations(Oid pubid,PublicationPartOpt pub_partopt)296 GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
297 {
298 List *result;
299 Relation pubrelsrel;
300 ScanKeyData scankey;
301 SysScanDesc scan;
302 HeapTuple tup;
303
304 /* Find all publications associated with the relation. */
305 pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
306
307 ScanKeyInit(&scankey,
308 Anum_pg_publication_rel_prpubid,
309 BTEqualStrategyNumber, F_OIDEQ,
310 ObjectIdGetDatum(pubid));
311
312 scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
313 true, NULL, 1, &scankey);
314
315 result = NIL;
316 while (HeapTupleIsValid(tup = systable_getnext(scan)))
317 {
318 Form_pg_publication_rel pubrel;
319
320 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
321 result = GetPubPartitionOptionRelations(result, pub_partopt,
322 pubrel->prrelid);
323 }
324
325 systable_endscan(scan);
326 table_close(pubrelsrel, AccessShareLock);
327
328 return result;
329 }
330
331 /*
332 * Gets list of publication oids for publications marked as FOR ALL TABLES.
333 */
334 List *
GetAllTablesPublications(void)335 GetAllTablesPublications(void)
336 {
337 List *result;
338 Relation rel;
339 ScanKeyData scankey;
340 SysScanDesc scan;
341 HeapTuple tup;
342
343 /* Find all publications that are marked as for all tables. */
344 rel = table_open(PublicationRelationId, AccessShareLock);
345
346 ScanKeyInit(&scankey,
347 Anum_pg_publication_puballtables,
348 BTEqualStrategyNumber, F_BOOLEQ,
349 BoolGetDatum(true));
350
351 scan = systable_beginscan(rel, InvalidOid, false,
352 NULL, 1, &scankey);
353
354 result = NIL;
355 while (HeapTupleIsValid(tup = systable_getnext(scan)))
356 {
357 Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
358
359 result = lappend_oid(result, oid);
360 }
361
362 systable_endscan(scan);
363 table_close(rel, AccessShareLock);
364
365 return result;
366 }
367
368 /*
369 * Gets list of all relation published by FOR ALL TABLES publication(s).
370 *
371 * If the publication publishes partition changes via their respective root
372 * partitioned tables, we must exclude partitions in favor of including the
373 * root partitioned tables.
374 */
375 List *
GetAllTablesPublicationRelations(bool pubviaroot)376 GetAllTablesPublicationRelations(bool pubviaroot)
377 {
378 Relation classRel;
379 ScanKeyData key[1];
380 TableScanDesc scan;
381 HeapTuple tuple;
382 List *result = NIL;
383
384 classRel = table_open(RelationRelationId, AccessShareLock);
385
386 ScanKeyInit(&key[0],
387 Anum_pg_class_relkind,
388 BTEqualStrategyNumber, F_CHAREQ,
389 CharGetDatum(RELKIND_RELATION));
390
391 scan = table_beginscan_catalog(classRel, 1, key);
392
393 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
394 {
395 Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
396 Oid relid = relForm->oid;
397
398 if (is_publishable_class(relid, relForm) &&
399 !(relForm->relispartition && pubviaroot))
400 result = lappend_oid(result, relid);
401 }
402
403 table_endscan(scan);
404
405 if (pubviaroot)
406 {
407 ScanKeyInit(&key[0],
408 Anum_pg_class_relkind,
409 BTEqualStrategyNumber, F_CHAREQ,
410 CharGetDatum(RELKIND_PARTITIONED_TABLE));
411
412 scan = table_beginscan_catalog(classRel, 1, key);
413
414 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
415 {
416 Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
417 Oid relid = relForm->oid;
418
419 if (is_publishable_class(relid, relForm) &&
420 !relForm->relispartition)
421 result = lappend_oid(result, relid);
422 }
423
424 table_endscan(scan);
425 }
426
427 table_close(classRel, AccessShareLock);
428 return result;
429 }
430
431 /*
432 * Get publication using oid
433 *
434 * The Publication struct and its data are palloc'ed here.
435 */
436 Publication *
GetPublication(Oid pubid)437 GetPublication(Oid pubid)
438 {
439 HeapTuple tup;
440 Publication *pub;
441 Form_pg_publication pubform;
442
443 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
444 if (!HeapTupleIsValid(tup))
445 elog(ERROR, "cache lookup failed for publication %u", pubid);
446
447 pubform = (Form_pg_publication) GETSTRUCT(tup);
448
449 pub = (Publication *) palloc(sizeof(Publication));
450 pub->oid = pubid;
451 pub->name = pstrdup(NameStr(pubform->pubname));
452 pub->alltables = pubform->puballtables;
453 pub->pubactions.pubinsert = pubform->pubinsert;
454 pub->pubactions.pubupdate = pubform->pubupdate;
455 pub->pubactions.pubdelete = pubform->pubdelete;
456 pub->pubactions.pubtruncate = pubform->pubtruncate;
457 pub->pubviaroot = pubform->pubviaroot;
458
459 ReleaseSysCache(tup);
460
461 return pub;
462 }
463
464
465 /*
466 * Get Publication using name.
467 */
468 Publication *
GetPublicationByName(const char * pubname,bool missing_ok)469 GetPublicationByName(const char *pubname, bool missing_ok)
470 {
471 Oid oid;
472
473 oid = get_publication_oid(pubname, missing_ok);
474
475 return OidIsValid(oid) ? GetPublication(oid) : NULL;
476 }
477
478 /*
479 * get_publication_oid - given a publication name, look up the OID
480 *
481 * If missing_ok is false, throw an error if name not found. If true, just
482 * return InvalidOid.
483 */
484 Oid
get_publication_oid(const char * pubname,bool missing_ok)485 get_publication_oid(const char *pubname, bool missing_ok)
486 {
487 Oid oid;
488
489 oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
490 CStringGetDatum(pubname));
491 if (!OidIsValid(oid) && !missing_ok)
492 ereport(ERROR,
493 (errcode(ERRCODE_UNDEFINED_OBJECT),
494 errmsg("publication \"%s\" does not exist", pubname)));
495 return oid;
496 }
497
498 /*
499 * get_publication_name - given a publication Oid, look up the name
500 *
501 * If missing_ok is false, throw an error if name not found. If true, just
502 * return NULL.
503 */
504 char *
get_publication_name(Oid pubid,bool missing_ok)505 get_publication_name(Oid pubid, bool missing_ok)
506 {
507 HeapTuple tup;
508 char *pubname;
509 Form_pg_publication pubform;
510
511 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
512
513 if (!HeapTupleIsValid(tup))
514 {
515 if (!missing_ok)
516 elog(ERROR, "cache lookup failed for publication %u", pubid);
517 return NULL;
518 }
519
520 pubform = (Form_pg_publication) GETSTRUCT(tup);
521 pubname = pstrdup(NameStr(pubform->pubname));
522
523 ReleaseSysCache(tup);
524
525 return pubname;
526 }
527
528 /*
529 * Returns Oids of tables in a publication.
530 */
531 Datum
pg_get_publication_tables(PG_FUNCTION_ARGS)532 pg_get_publication_tables(PG_FUNCTION_ARGS)
533 {
534 FuncCallContext *funcctx;
535 char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
536 Publication *publication;
537 List *tables;
538
539 /* stuff done only on the first call of the function */
540 if (SRF_IS_FIRSTCALL())
541 {
542 MemoryContext oldcontext;
543
544 /* create a function context for cross-call persistence */
545 funcctx = SRF_FIRSTCALL_INIT();
546
547 /* switch to memory context appropriate for multiple function calls */
548 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
549
550 publication = GetPublicationByName(pubname, false);
551
552 /*
553 * Publications support partitioned tables, although all changes are
554 * replicated using leaf partition identity and schema, so we only
555 * need those.
556 */
557 if (publication->alltables)
558 tables = GetAllTablesPublicationRelations(publication->pubviaroot);
559 else
560 tables = GetPublicationRelations(publication->oid,
561 publication->pubviaroot ?
562 PUBLICATION_PART_ROOT :
563 PUBLICATION_PART_LEAF);
564 funcctx->user_fctx = (void *) tables;
565
566 MemoryContextSwitchTo(oldcontext);
567 }
568
569 /* stuff done on every call of the function */
570 funcctx = SRF_PERCALL_SETUP();
571 tables = (List *) funcctx->user_fctx;
572
573 if (funcctx->call_cntr < list_length(tables))
574 {
575 Oid relid = list_nth_oid(tables, funcctx->call_cntr);
576
577 SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
578 }
579
580 SRF_RETURN_DONE(funcctx);
581 }
582