1 /*-------------------------------------------------------------------------
2 *
3 * pg_publication.c
4 * publication C API manipulation
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * pg_publication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "funcapi.h"
18 #include "miscadmin.h"
19
20 #include "access/genam.h"
21 #include "access/hash.h"
22 #include "access/heapam.h"
23 #include "access/htup_details.h"
24 #include "access/xact.h"
25
26 #include "catalog/catalog.h"
27 #include "catalog/dependency.h"
28 #include "catalog/index.h"
29 #include "catalog/indexing.h"
30 #include "catalog/namespace.h"
31 #include "catalog/objectaccess.h"
32 #include "catalog/objectaddress.h"
33 #include "catalog/pg_type.h"
34 #include "catalog/pg_publication.h"
35 #include "catalog/pg_publication_rel.h"
36
37 #include "utils/array.h"
38 #include "utils/builtins.h"
39 #include "utils/catcache.h"
40 #include "utils/fmgroids.h"
41 #include "utils/inval.h"
42 #include "utils/lsyscache.h"
43 #include "utils/rel.h"
44 #include "utils/syscache.h"
45
46 /*
47 * Check if relation can be in given publication and throws appropriate
48 * error if not.
49 */
50 static void
check_publication_add_relation(Relation targetrel)51 check_publication_add_relation(Relation targetrel)
52 {
53 /* Give more specific error for partitioned tables */
54 if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
55 ereport(ERROR,
56 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
57 errmsg("\"%s\" is a partitioned table",
58 RelationGetRelationName(targetrel)),
59 errdetail("Adding partitioned tables to publications is not supported."),
60 errhint("You can add the table partitions individually.")));
61
62 /* Must be table */
63 if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
64 ereport(ERROR,
65 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 errmsg("\"%s\" is not a table",
67 RelationGetRelationName(targetrel)),
68 errdetail("Only tables can be added to publications.")));
69
70 /* Can't be system table */
71 if (IsCatalogRelation(targetrel))
72 ereport(ERROR,
73 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 errmsg("\"%s\" is a system table",
75 RelationGetRelationName(targetrel)),
76 errdetail("System tables cannot be added to publications.")));
77
78 /* UNLOGGED and TEMP relations cannot be part of publication. */
79 if (!RelationNeedsWAL(targetrel))
80 ereport(ERROR,
81 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82 errmsg("table \"%s\" cannot be replicated",
83 RelationGetRelationName(targetrel)),
84 errdetail("Temporary and unlogged relations cannot be replicated.")));
85 }
86
87 /*
88 * Returns if relation represented by oid and Form_pg_class entry
89 * is publishable.
90 *
91 * Does same checks as the above, but does not need relation to be opened
92 * and also does not throw errors.
93 *
94 * Note this also excludes all tables with relid < FirstNormalObjectId,
95 * ie all tables created during initdb. This mainly affects the preinstalled
96 * information_schema. (IsCatalogClass() only checks for these inside
97 * pg_catalog and toast schemas.)
98 */
99 static bool
is_publishable_class(Oid relid,Form_pg_class reltuple)100 is_publishable_class(Oid relid, Form_pg_class reltuple)
101 {
102 return reltuple->relkind == RELKIND_RELATION &&
103 !IsCatalogClass(relid, reltuple) &&
104 reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
105 relid >= FirstNormalObjectId;
106 }
107
108 /*
109 * Another variant of this, taking a Relation.
110 */
111 bool
is_publishable_relation(Relation rel)112 is_publishable_relation(Relation rel)
113 {
114 return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
115 }
116
117
118 /*
119 * SQL-callable variant of the above
120 *
121 * This returns null when the relation does not exist. This is intended to be
122 * used for example in psql to avoid gratuitous errors when there are
123 * concurrent catalog changes.
124 */
125 Datum
pg_relation_is_publishable(PG_FUNCTION_ARGS)126 pg_relation_is_publishable(PG_FUNCTION_ARGS)
127 {
128 Oid relid = PG_GETARG_OID(0);
129 HeapTuple tuple;
130 bool result;
131
132 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
133 if (!tuple)
134 PG_RETURN_NULL();
135 result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
136 ReleaseSysCache(tuple);
137 PG_RETURN_BOOL(result);
138 }
139
140
141 /*
142 * Insert new publication / relation mapping.
143 */
144 ObjectAddress
publication_add_relation(Oid pubid,Relation targetrel,bool if_not_exists)145 publication_add_relation(Oid pubid, Relation targetrel,
146 bool if_not_exists)
147 {
148 Relation rel;
149 HeapTuple tup;
150 Datum values[Natts_pg_publication_rel];
151 bool nulls[Natts_pg_publication_rel];
152 Oid relid = RelationGetRelid(targetrel);
153 Oid prrelid;
154 Publication *pub = GetPublication(pubid);
155 ObjectAddress myself,
156 referenced;
157
158 rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
159
160 /*
161 * Check for duplicates. Note that this does not really prevent
162 * duplicates, it's here just to provide nicer error message in common
163 * case. The real protection is the unique key on the catalog.
164 */
165 if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
166 ObjectIdGetDatum(pubid)))
167 {
168 heap_close(rel, RowExclusiveLock);
169
170 if (if_not_exists)
171 return InvalidObjectAddress;
172
173 ereport(ERROR,
174 (errcode(ERRCODE_DUPLICATE_OBJECT),
175 errmsg("relation \"%s\" is already member of publication \"%s\"",
176 RelationGetRelationName(targetrel), pub->name)));
177 }
178
179 check_publication_add_relation(targetrel);
180
181 /* Form a tuple. */
182 memset(values, 0, sizeof(values));
183 memset(nulls, false, sizeof(nulls));
184
185 values[Anum_pg_publication_rel_prpubid - 1] =
186 ObjectIdGetDatum(pubid);
187 values[Anum_pg_publication_rel_prrelid - 1] =
188 ObjectIdGetDatum(relid);
189
190 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
191
192 /* Insert tuple into catalog. */
193 prrelid = CatalogTupleInsert(rel, tup);
194 heap_freetuple(tup);
195
196 ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
197
198 /* Add dependency on the publication */
199 ObjectAddressSet(referenced, PublicationRelationId, pubid);
200 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
201
202 /* Add dependency on the relation */
203 ObjectAddressSet(referenced, RelationRelationId, relid);
204 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
205
206 /* Close the table. */
207 heap_close(rel, RowExclusiveLock);
208
209 /* Invalidate relcache so that publication info is rebuilt. */
210 CacheInvalidateRelcache(targetrel);
211
212 return myself;
213 }
214
215
216 /*
217 * Gets list of publication oids for a relation oid.
218 */
219 List *
GetRelationPublications(Oid relid)220 GetRelationPublications(Oid relid)
221 {
222 List *result = NIL;
223 CatCList *pubrellist;
224 int i;
225
226 /* Find all publications associated with the relation. */
227 pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
228 ObjectIdGetDatum(relid));
229 for (i = 0; i < pubrellist->n_members; i++)
230 {
231 HeapTuple tup = &pubrellist->members[i]->tuple;
232 Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
233
234 result = lappend_oid(result, pubid);
235 }
236
237 ReleaseSysCacheList(pubrellist);
238
239 return result;
240 }
241
242 /*
243 * Gets list of relation oids for a publication.
244 *
245 * This should only be used for normal publications, the FOR ALL TABLES
246 * should use GetAllTablesPublicationRelations().
247 */
248 List *
GetPublicationRelations(Oid pubid)249 GetPublicationRelations(Oid pubid)
250 {
251 List *result;
252 Relation pubrelsrel;
253 ScanKeyData scankey;
254 SysScanDesc scan;
255 HeapTuple tup;
256
257 /* Find all publications associated with the relation. */
258 pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
259
260 ScanKeyInit(&scankey,
261 Anum_pg_publication_rel_prpubid,
262 BTEqualStrategyNumber, F_OIDEQ,
263 ObjectIdGetDatum(pubid));
264
265 scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
266 true, NULL, 1, &scankey);
267
268 result = NIL;
269 while (HeapTupleIsValid(tup = systable_getnext(scan)))
270 {
271 Form_pg_publication_rel pubrel;
272
273 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
274
275 result = lappend_oid(result, pubrel->prrelid);
276 }
277
278 systable_endscan(scan);
279 heap_close(pubrelsrel, AccessShareLock);
280
281 return result;
282 }
283
284 /*
285 * Gets list of publication oids for publications marked as FOR ALL TABLES.
286 */
287 List *
GetAllTablesPublications(void)288 GetAllTablesPublications(void)
289 {
290 List *result;
291 Relation rel;
292 ScanKeyData scankey;
293 SysScanDesc scan;
294 HeapTuple tup;
295
296 /* Find all publications that are marked as for all tables. */
297 rel = heap_open(PublicationRelationId, AccessShareLock);
298
299 ScanKeyInit(&scankey,
300 Anum_pg_publication_puballtables,
301 BTEqualStrategyNumber, F_BOOLEQ,
302 BoolGetDatum(true));
303
304 scan = systable_beginscan(rel, InvalidOid, false,
305 NULL, 1, &scankey);
306
307 result = NIL;
308 while (HeapTupleIsValid(tup = systable_getnext(scan)))
309 result = lappend_oid(result, HeapTupleGetOid(tup));
310
311 systable_endscan(scan);
312 heap_close(rel, AccessShareLock);
313
314 return result;
315 }
316
317 /*
318 * Gets list of all relation published by FOR ALL TABLES publication(s).
319 */
320 List *
GetAllTablesPublicationRelations(void)321 GetAllTablesPublicationRelations(void)
322 {
323 Relation classRel;
324 ScanKeyData key[1];
325 HeapScanDesc scan;
326 HeapTuple tuple;
327 List *result = NIL;
328
329 classRel = heap_open(RelationRelationId, AccessShareLock);
330
331 ScanKeyInit(&key[0],
332 Anum_pg_class_relkind,
333 BTEqualStrategyNumber, F_CHAREQ,
334 CharGetDatum(RELKIND_RELATION));
335
336 scan = heap_beginscan_catalog(classRel, 1, key);
337
338 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
339 {
340 Oid relid = HeapTupleGetOid(tuple);
341 Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
342
343 if (is_publishable_class(relid, relForm))
344 result = lappend_oid(result, relid);
345 }
346
347 heap_endscan(scan);
348 heap_close(classRel, AccessShareLock);
349
350 return result;
351 }
352
353 /*
354 * Get publication using oid
355 *
356 * The Publication struct and its data are palloc'ed here.
357 */
358 Publication *
GetPublication(Oid pubid)359 GetPublication(Oid pubid)
360 {
361 HeapTuple tup;
362 Publication *pub;
363 Form_pg_publication pubform;
364
365 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
366
367 if (!HeapTupleIsValid(tup))
368 elog(ERROR, "cache lookup failed for publication %u", pubid);
369
370 pubform = (Form_pg_publication) GETSTRUCT(tup);
371
372 pub = (Publication *) palloc(sizeof(Publication));
373 pub->oid = pubid;
374 pub->name = pstrdup(NameStr(pubform->pubname));
375 pub->alltables = pubform->puballtables;
376 pub->pubactions.pubinsert = pubform->pubinsert;
377 pub->pubactions.pubupdate = pubform->pubupdate;
378 pub->pubactions.pubdelete = pubform->pubdelete;
379
380 ReleaseSysCache(tup);
381
382 return pub;
383 }
384
385
386 /*
387 * Get Publication using name.
388 */
389 Publication *
GetPublicationByName(const char * pubname,bool missing_ok)390 GetPublicationByName(const char *pubname, bool missing_ok)
391 {
392 Oid oid;
393
394 oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
395 if (!OidIsValid(oid))
396 {
397 if (missing_ok)
398 return NULL;
399
400 ereport(ERROR,
401 (errcode(ERRCODE_UNDEFINED_OBJECT),
402 errmsg("publication \"%s\" does not exist", pubname)));
403 }
404
405 return GetPublication(oid);
406 }
407
408 /*
409 * get_publication_oid - given a publication name, look up the OID
410 *
411 * If missing_ok is false, throw an error if name not found. If true, just
412 * return InvalidOid.
413 */
414 Oid
get_publication_oid(const char * pubname,bool missing_ok)415 get_publication_oid(const char *pubname, bool missing_ok)
416 {
417 Oid oid;
418
419 oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
420 if (!OidIsValid(oid) && !missing_ok)
421 ereport(ERROR,
422 (errcode(ERRCODE_UNDEFINED_OBJECT),
423 errmsg("publication \"%s\" does not exist", pubname)));
424 return oid;
425 }
426
427 /*
428 * get_publication_name - given a publication Oid, look up the name
429 */
430 char *
get_publication_name(Oid pubid)431 get_publication_name(Oid pubid)
432 {
433 HeapTuple tup;
434 char *pubname;
435 Form_pg_publication pubform;
436
437 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
438
439 if (!HeapTupleIsValid(tup))
440 elog(ERROR, "cache lookup failed for publication %u", pubid);
441
442 pubform = (Form_pg_publication) GETSTRUCT(tup);
443 pubname = pstrdup(NameStr(pubform->pubname));
444
445 ReleaseSysCache(tup);
446
447 return pubname;
448 }
449
450 /*
451 * Returns Oids of tables in a publication.
452 */
453 Datum
pg_get_publication_tables(PG_FUNCTION_ARGS)454 pg_get_publication_tables(PG_FUNCTION_ARGS)
455 {
456 FuncCallContext *funcctx;
457 char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
458 Publication *publication;
459 List *tables;
460 ListCell **lcp;
461
462 /* stuff done only on the first call of the function */
463 if (SRF_IS_FIRSTCALL())
464 {
465 MemoryContext oldcontext;
466
467 /* create a function context for cross-call persistence */
468 funcctx = SRF_FIRSTCALL_INIT();
469
470 /* switch to memory context appropriate for multiple function calls */
471 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
472
473 publication = GetPublicationByName(pubname, false);
474 if (publication->alltables)
475 tables = GetAllTablesPublicationRelations();
476 else
477 tables = GetPublicationRelations(publication->oid);
478 lcp = (ListCell **) palloc(sizeof(ListCell *));
479 *lcp = list_head(tables);
480 funcctx->user_fctx = (void *) lcp;
481
482 MemoryContextSwitchTo(oldcontext);
483 }
484
485 /* stuff done on every call of the function */
486 funcctx = SRF_PERCALL_SETUP();
487 lcp = (ListCell **) funcctx->user_fctx;
488
489 while (*lcp != NULL)
490 {
491 Oid relid = lfirst_oid(*lcp);
492
493 *lcp = lnext(*lcp);
494 SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
495 }
496
497 SRF_RETURN_DONE(funcctx);
498 }
499