1 /*-------------------------------------------------------------------------
2 * relation.c
3 * PostgreSQL logical replication
4 *
5 * Copyright (c) 2016-2018, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/relation.c
9 *
10 * NOTES
11 * This file contains helper functions for logical replication relation
12 * mapping cache.
13 *
14 *-------------------------------------------------------------------------
15 */
16
17 #include "postgres.h"
18
19 #include "access/heapam.h"
20 #include "access/sysattr.h"
21 #include "catalog/namespace.h"
22 #include "catalog/pg_subscription_rel.h"
23 #include "executor/executor.h"
24 #include "nodes/makefuncs.h"
25 #include "replication/logicalrelation.h"
26 #include "replication/worker_internal.h"
27 #include "utils/inval.h"
28
29
30 static MemoryContext LogicalRepRelMapContext = NULL;
31
32 static HTAB *LogicalRepRelMap = NULL;
33
34
35 /*
36 * Relcache invalidation callback for our relation map cache.
37 */
38 static void
logicalrep_relmap_invalidate_cb(Datum arg,Oid reloid)39 logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
40 {
41 LogicalRepRelMapEntry *entry;
42
43 /* Just to be sure. */
44 if (LogicalRepRelMap == NULL)
45 return;
46
47 if (reloid != InvalidOid)
48 {
49 HASH_SEQ_STATUS status;
50
51 hash_seq_init(&status, LogicalRepRelMap);
52
53 /* TODO, use inverse lookup hashtable? */
54 while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
55 {
56 if (entry->localreloid == reloid)
57 {
58 entry->localrelvalid = false;
59 hash_seq_term(&status);
60 break;
61 }
62 }
63 }
64 else
65 {
66 /* invalidate all cache entries */
67 HASH_SEQ_STATUS status;
68
69 hash_seq_init(&status, LogicalRepRelMap);
70
71 while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
72 entry->localrelvalid = false;
73 }
74 }
75
76 /*
77 * Initialize the relation map cache.
78 */
79 static void
logicalrep_relmap_init(void)80 logicalrep_relmap_init(void)
81 {
82 HASHCTL ctl;
83
84 if (!LogicalRepRelMapContext)
85 LogicalRepRelMapContext =
86 AllocSetContextCreate(CacheMemoryContext,
87 "LogicalRepRelMapContext",
88 ALLOCSET_DEFAULT_SIZES);
89
90 /* Initialize the relation hash table. */
91 MemSet(&ctl, 0, sizeof(ctl));
92 ctl.keysize = sizeof(LogicalRepRelId);
93 ctl.entrysize = sizeof(LogicalRepRelMapEntry);
94 ctl.hcxt = LogicalRepRelMapContext;
95
96 LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
97 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
98
99 /* Watch for invalidation events. */
100 CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
101 (Datum) 0);
102 }
103
104 /*
105 * Free the entry of a relation map cache.
106 */
107 static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry * entry)108 logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
109 {
110 LogicalRepRelation *remoterel;
111
112 remoterel = &entry->remoterel;
113
114 pfree(remoterel->nspname);
115 pfree(remoterel->relname);
116
117 if (remoterel->natts > 0)
118 {
119 int i;
120
121 for (i = 0; i < remoterel->natts; i++)
122 pfree(remoterel->attnames[i]);
123
124 pfree(remoterel->attnames);
125 pfree(remoterel->atttyps);
126 }
127 bms_free(remoterel->attkeys);
128
129 if (entry->attrmap)
130 pfree(entry->attrmap);
131 }
132
133 /*
134 * Add new entry or update existing entry in the relation map cache.
135 *
136 * Called when new relation mapping is sent by the publisher to update
137 * our expected view of incoming data from said publisher.
138 */
139 void
logicalrep_relmap_update(LogicalRepRelation * remoterel)140 logicalrep_relmap_update(LogicalRepRelation *remoterel)
141 {
142 MemoryContext oldctx;
143 LogicalRepRelMapEntry *entry;
144 bool found;
145 int i;
146
147 if (LogicalRepRelMap == NULL)
148 logicalrep_relmap_init();
149
150 /*
151 * HASH_ENTER returns the existing entry if present or creates a new one.
152 */
153 entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
154 HASH_ENTER, &found);
155
156 if (found)
157 logicalrep_relmap_free_entry(entry);
158
159 memset(entry, 0, sizeof(LogicalRepRelMapEntry));
160
161 /* Make cached copy of the data */
162 oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
163 entry->remoterel.remoteid = remoterel->remoteid;
164 entry->remoterel.nspname = pstrdup(remoterel->nspname);
165 entry->remoterel.relname = pstrdup(remoterel->relname);
166 entry->remoterel.natts = remoterel->natts;
167 entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
168 entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
169 for (i = 0; i < remoterel->natts; i++)
170 {
171 entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
172 entry->remoterel.atttyps[i] = remoterel->atttyps[i];
173 }
174 entry->remoterel.replident = remoterel->replident;
175 entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
176 MemoryContextSwitchTo(oldctx);
177 }
178
179 /*
180 * Find attribute index in TupleDesc struct by attribute name.
181 *
182 * Returns -1 if not found.
183 */
184 static int
logicalrep_rel_att_by_name(LogicalRepRelation * remoterel,const char * attname)185 logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
186 {
187 int i;
188
189 for (i = 0; i < remoterel->natts; i++)
190 {
191 if (strcmp(remoterel->attnames[i], attname) == 0)
192 return i;
193 }
194
195 return -1;
196 }
197
198 /*
199 * Open the local relation associated with the remote one.
200 *
201 * Rebuilds the Relcache mapping if it was invalidated by local DDL.
202 */
203 LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid,LOCKMODE lockmode)204 logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
205 {
206 LogicalRepRelMapEntry *entry;
207 bool found;
208 LogicalRepRelation *remoterel;
209
210 if (LogicalRepRelMap == NULL)
211 logicalrep_relmap_init();
212
213 /* Search for existing entry. */
214 entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
215 HASH_FIND, &found);
216
217 if (!found)
218 elog(ERROR, "no relation map entry for remote relation ID %u",
219 remoteid);
220
221 remoterel = &entry->remoterel;
222
223 /* Ensure we don't leak a relcache refcount. */
224 if (entry->localrel)
225 elog(ERROR, "remote relation ID %u is already open", remoteid);
226
227 /*
228 * When opening and locking a relation, pending invalidation messages are
229 * processed which can invalidate the relation. Hence, if the entry is
230 * currently considered valid, try to open the local relation by OID and
231 * see if invalidation ensues.
232 */
233 if (entry->localrelvalid)
234 {
235 entry->localrel = try_relation_open(entry->localreloid, lockmode);
236 if (!entry->localrel)
237 {
238 /* Table was renamed or dropped. */
239 entry->localrelvalid = false;
240 }
241 else if (!entry->localrelvalid)
242 {
243 /* Note we release the no-longer-useful lock here. */
244 heap_close(entry->localrel, lockmode);
245 entry->localrel = NULL;
246 }
247 }
248
249 /*
250 * If the entry has been marked invalid since we last had lock on it,
251 * re-open the local relation by name and rebuild all derived data.
252 */
253 if (!entry->localrelvalid)
254 {
255 Oid relid;
256 int found;
257 Bitmapset *idkey;
258 TupleDesc desc;
259 MemoryContext oldctx;
260 int i;
261
262 /* Try to find and lock the relation by name. */
263 relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
264 remoterel->relname, -1),
265 lockmode, true);
266 if (!OidIsValid(relid))
267 ereport(ERROR,
268 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
269 errmsg("logical replication target relation \"%s.%s\" does not exist",
270 remoterel->nspname, remoterel->relname)));
271 entry->localrel = heap_open(relid, NoLock);
272 entry->localreloid = relid;
273
274 /* Check for supported relkind. */
275 CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
276 remoterel->nspname, remoterel->relname);
277
278 /*
279 * Build the mapping of local attribute numbers to remote attribute
280 * numbers and validate that we don't miss any replicated columns as
281 * that would result in potentially unwanted data loss.
282 */
283 desc = RelationGetDescr(entry->localrel);
284 oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
285 entry->attrmap = palloc(desc->natts * sizeof(AttrNumber));
286 MemoryContextSwitchTo(oldctx);
287
288 found = 0;
289 for (i = 0; i < desc->natts; i++)
290 {
291 int attnum;
292 Form_pg_attribute attr = TupleDescAttr(desc, i);
293
294 if (attr->attisdropped)
295 {
296 entry->attrmap[i] = -1;
297 continue;
298 }
299
300 attnum = logicalrep_rel_att_by_name(remoterel,
301 NameStr(attr->attname));
302
303 entry->attrmap[i] = attnum;
304 if (attnum >= 0)
305 found++;
306 }
307
308 /* TODO, detail message with names of missing columns */
309 if (found < remoterel->natts)
310 ereport(ERROR,
311 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
312 errmsg("logical replication target relation \"%s.%s\" is missing "
313 "some replicated columns",
314 remoterel->nspname, remoterel->relname)));
315
316 /*
317 * Check that replica identity matches. We allow for stricter replica
318 * identity (fewer columns) on subscriber as that will not stop us
319 * from finding unique tuple. IE, if publisher has identity
320 * (id,timestamp) and subscriber just (id) this will not be a problem,
321 * but in the opposite scenario it will.
322 *
323 * Don't throw any error here just mark the relation entry as not
324 * updatable, as replica identity is only for updates and deletes but
325 * inserts can be replicated even without it.
326 */
327 entry->updatable = true;
328 idkey = RelationGetIndexAttrBitmap(entry->localrel,
329 INDEX_ATTR_BITMAP_IDENTITY_KEY);
330 /* fallback to PK if no replica identity */
331 if (idkey == NULL)
332 {
333 idkey = RelationGetIndexAttrBitmap(entry->localrel,
334 INDEX_ATTR_BITMAP_PRIMARY_KEY);
335
336 /*
337 * If no replica identity index and no PK, the published table
338 * must have replica identity FULL.
339 */
340 if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
341 entry->updatable = false;
342 }
343
344 i = -1;
345 while ((i = bms_next_member(idkey, i)) >= 0)
346 {
347 int attnum = i + FirstLowInvalidHeapAttributeNumber;
348
349 if (!AttrNumberIsForUserDefinedAttr(attnum))
350 ereport(ERROR,
351 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
352 errmsg("logical replication target relation \"%s.%s\" uses "
353 "system columns in REPLICA IDENTITY index",
354 remoterel->nspname, remoterel->relname)));
355
356 attnum = AttrNumberGetAttrOffset(attnum);
357
358 if (entry->attrmap[attnum] < 0 ||
359 !bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
360 {
361 entry->updatable = false;
362 break;
363 }
364 }
365
366 entry->localrelvalid = true;
367 }
368
369 if (entry->state != SUBREL_STATE_READY)
370 entry->state = GetSubscriptionRelState(MySubscription->oid,
371 entry->localreloid,
372 &entry->statelsn,
373 true);
374
375 return entry;
376 }
377
378 /*
379 * Close the previously opened logical relation.
380 */
381 void
logicalrep_rel_close(LogicalRepRelMapEntry * rel,LOCKMODE lockmode)382 logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
383 {
384 heap_close(rel->localrel, lockmode);
385 rel->localrel = NULL;
386 }
387