1 /*-------------------------------------------------------------------------
2 * relation.c
3 * PostgreSQL logical replication
4 *
5 * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/relation.c
9 *
10 * NOTES
11 * This file contains helper functions for logical replication relation
12 * mapping cache.
13 *
14 *-------------------------------------------------------------------------
15 */
16
17 #include "postgres.h"
18
19 #include "access/heapam.h"
20 #include "access/sysattr.h"
21 #include "catalog/namespace.h"
22 #include "catalog/pg_subscription_rel.h"
23 #include "executor/executor.h"
24 #include "nodes/makefuncs.h"
25 #include "replication/logicalrelation.h"
26 #include "replication/worker_internal.h"
27 #include "utils/inval.h"
28 #include "utils/memutils.h"
29
30
31 static MemoryContext LogicalRepRelMapContext = NULL;
32
33 static HTAB *LogicalRepRelMap = NULL;
34
35
36 /*
37 * Relcache invalidation callback for our relation map cache.
38 */
39 static void
logicalrep_relmap_invalidate_cb(Datum arg,Oid reloid)40 logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
41 {
42 LogicalRepRelMapEntry *entry;
43
44 /* Just to be sure. */
45 if (LogicalRepRelMap == NULL)
46 return;
47
48 if (reloid != InvalidOid)
49 {
50 HASH_SEQ_STATUS status;
51
52 hash_seq_init(&status, LogicalRepRelMap);
53
54 /* TODO, use inverse lookup hashtable? */
55 while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
56 {
57 if (entry->localreloid == reloid)
58 {
59 entry->localrelvalid = false;
60 hash_seq_term(&status);
61 break;
62 }
63 }
64 }
65 else
66 {
67 /* invalidate all cache entries */
68 HASH_SEQ_STATUS status;
69
70 hash_seq_init(&status, LogicalRepRelMap);
71
72 while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
73 entry->localrelvalid = false;
74 }
75 }
76
77 /*
78 * Initialize the relation map cache.
79 */
80 static void
logicalrep_relmap_init(void)81 logicalrep_relmap_init(void)
82 {
83 HASHCTL ctl;
84
85 if (!LogicalRepRelMapContext)
86 LogicalRepRelMapContext =
87 AllocSetContextCreate(CacheMemoryContext,
88 "LogicalRepRelMapContext",
89 ALLOCSET_DEFAULT_SIZES);
90
91 /* Initialize the relation hash table. */
92 MemSet(&ctl, 0, sizeof(ctl));
93 ctl.keysize = sizeof(LogicalRepRelId);
94 ctl.entrysize = sizeof(LogicalRepRelMapEntry);
95 ctl.hcxt = LogicalRepRelMapContext;
96
97 LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
98 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
99
100 /* Watch for invalidation events. */
101 CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
102 (Datum) 0);
103 }
104
105 /*
106 * Free the entry of a relation map cache.
107 */
108 static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry * entry)109 logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
110 {
111 LogicalRepRelation *remoterel;
112
113 remoterel = &entry->remoterel;
114
115 pfree(remoterel->nspname);
116 pfree(remoterel->relname);
117
118 if (remoterel->natts > 0)
119 {
120 int i;
121
122 for (i = 0; i < remoterel->natts; i++)
123 pfree(remoterel->attnames[i]);
124
125 pfree(remoterel->attnames);
126 pfree(remoterel->atttyps);
127 }
128 bms_free(remoterel->attkeys);
129
130 if (entry->attrmap)
131 pfree(entry->attrmap);
132 }
133
134 /*
135 * Add new entry or update existing entry in the relation map cache.
136 *
137 * Called when new relation mapping is sent by the publisher to update
138 * our expected view of incoming data from said publisher.
139 */
140 void
logicalrep_relmap_update(LogicalRepRelation * remoterel)141 logicalrep_relmap_update(LogicalRepRelation *remoterel)
142 {
143 MemoryContext oldctx;
144 LogicalRepRelMapEntry *entry;
145 bool found;
146 int i;
147
148 if (LogicalRepRelMap == NULL)
149 logicalrep_relmap_init();
150
151 /*
152 * HASH_ENTER returns the existing entry if present or creates a new one.
153 */
154 entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
155 HASH_ENTER, &found);
156
157 if (found)
158 logicalrep_relmap_free_entry(entry);
159
160 memset(entry, 0, sizeof(LogicalRepRelMapEntry));
161
162 /* Make cached copy of the data */
163 oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
164 entry->remoterel.remoteid = remoterel->remoteid;
165 entry->remoterel.nspname = pstrdup(remoterel->nspname);
166 entry->remoterel.relname = pstrdup(remoterel->relname);
167 entry->remoterel.natts = remoterel->natts;
168 entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
169 entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
170 for (i = 0; i < remoterel->natts; i++)
171 {
172 entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
173 entry->remoterel.atttyps[i] = remoterel->atttyps[i];
174 }
175 entry->remoterel.replident = remoterel->replident;
176 entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
177 MemoryContextSwitchTo(oldctx);
178 }
179
180 /*
181 * Find attribute index in TupleDesc struct by attribute name.
182 *
183 * Returns -1 if not found.
184 */
185 static int
logicalrep_rel_att_by_name(LogicalRepRelation * remoterel,const char * attname)186 logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
187 {
188 int i;
189
190 for (i = 0; i < remoterel->natts; i++)
191 {
192 if (strcmp(remoterel->attnames[i], attname) == 0)
193 return i;
194 }
195
196 return -1;
197 }
198
199 /*
200 * Open the local relation associated with the remote one.
201 *
202 * Rebuilds the Relcache mapping if it was invalidated by local DDL.
203 */
204 LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid,LOCKMODE lockmode)205 logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
206 {
207 LogicalRepRelMapEntry *entry;
208 bool found;
209 LogicalRepRelation *remoterel;
210
211 if (LogicalRepRelMap == NULL)
212 logicalrep_relmap_init();
213
214 /* Search for existing entry. */
215 entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
216 HASH_FIND, &found);
217
218 if (!found)
219 elog(ERROR, "no relation map entry for remote relation ID %u",
220 remoteid);
221
222 remoterel = &entry->remoterel;
223
224 /* Ensure we don't leak a relcache refcount. */
225 if (entry->localrel)
226 elog(ERROR, "remote relation ID %u is already open", remoteid);
227
228 /*
229 * When opening and locking a relation, pending invalidation messages are
230 * processed which can invalidate the relation. Hence, if the entry is
231 * currently considered valid, try to open the local relation by OID and
232 * see if invalidation ensues.
233 */
234 if (entry->localrelvalid)
235 {
236 entry->localrel = try_relation_open(entry->localreloid, lockmode);
237 if (!entry->localrel)
238 {
239 /* Table was renamed or dropped. */
240 entry->localrelvalid = false;
241 }
242 else if (!entry->localrelvalid)
243 {
244 /* Note we release the no-longer-useful lock here. */
245 heap_close(entry->localrel, lockmode);
246 entry->localrel = NULL;
247 }
248 }
249
250 /*
251 * If the entry has been marked invalid since we last had lock on it,
252 * re-open the local relation by name and rebuild all derived data.
253 */
254 if (!entry->localrelvalid)
255 {
256 Oid relid;
257 int found;
258 Bitmapset *idkey;
259 TupleDesc desc;
260 MemoryContext oldctx;
261 int i;
262
263 /* Try to find and lock the relation by name. */
264 relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
265 remoterel->relname, -1),
266 lockmode, true);
267 if (!OidIsValid(relid))
268 ereport(ERROR,
269 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
270 errmsg("logical replication target relation \"%s.%s\" does not exist",
271 remoterel->nspname, remoterel->relname)));
272 entry->localrel = heap_open(relid, NoLock);
273 entry->localreloid = relid;
274
275 /* Check for supported relkind. */
276 CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
277 remoterel->nspname, remoterel->relname);
278
279 /*
280 * Build the mapping of local attribute numbers to remote attribute
281 * numbers and validate that we don't miss any replicated columns as
282 * that would result in potentially unwanted data loss.
283 */
284 desc = RelationGetDescr(entry->localrel);
285 oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
286 entry->attrmap = palloc(desc->natts * sizeof(AttrNumber));
287 MemoryContextSwitchTo(oldctx);
288
289 found = 0;
290 for (i = 0; i < desc->natts; i++)
291 {
292 int attnum;
293
294 if (desc->attrs[i]->attisdropped)
295 {
296 entry->attrmap[i] = -1;
297 continue;
298 }
299
300 attnum = logicalrep_rel_att_by_name(remoterel,
301 NameStr(desc->attrs[i]->attname));
302
303 entry->attrmap[i] = attnum;
304 if (attnum >= 0)
305 found++;
306 }
307
308 /* TODO, detail message with names of missing columns */
309 if (found < remoterel->natts)
310 ereport(ERROR,
311 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
312 errmsg("logical replication target relation \"%s.%s\" is missing "
313 "some replicated columns",
314 remoterel->nspname, remoterel->relname)));
315
316 /*
317 * Check that replica identity matches. We allow for stricter replica
318 * identity (fewer columns) on subscriber as that will not stop us
319 * from finding unique tuple. IE, if publisher has identity
320 * (id,timestamp) and subscriber just (id) this will not be a problem,
321 * but in the opposite scenario it will.
322 *
323 * Don't throw any error here just mark the relation entry as not
324 * updatable, as replica identity is only for updates and deletes but
325 * inserts can be replicated even without it.
326 */
327 entry->updatable = true;
328 idkey = RelationGetIndexAttrBitmap(entry->localrel,
329 INDEX_ATTR_BITMAP_IDENTITY_KEY);
330 /* fallback to PK if no replica identity */
331 if (idkey == NULL)
332 {
333 idkey = RelationGetIndexAttrBitmap(entry->localrel,
334 INDEX_ATTR_BITMAP_PRIMARY_KEY);
335
336 /*
337 * If no replica identity index and no PK, the published table
338 * must have replica identity FULL.
339 */
340 if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
341 entry->updatable = false;
342 }
343
344 i = -1;
345 while ((i = bms_next_member(idkey, i)) >= 0)
346 {
347 int attnum = i + FirstLowInvalidHeapAttributeNumber;
348
349 if (!AttrNumberIsForUserDefinedAttr(attnum))
350 ereport(ERROR,
351 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
352 errmsg("logical replication target relation \"%s.%s\" uses "
353 "system columns in REPLICA IDENTITY index",
354 remoterel->nspname, remoterel->relname)));
355
356 attnum = AttrNumberGetAttrOffset(attnum);
357
358 if (entry->attrmap[attnum] < 0 ||
359 !bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
360 {
361 entry->updatable = false;
362 break;
363 }
364 }
365
366 entry->localrelvalid = true;
367 }
368
369 if (entry->state != SUBREL_STATE_READY)
370 entry->state = GetSubscriptionRelState(MySubscription->oid,
371 entry->localreloid,
372 &entry->statelsn,
373 true);
374
375 return entry;
376 }
377
378 /*
379 * Close the previously opened logical relation.
380 */
381 void
logicalrep_rel_close(LogicalRepRelMapEntry * rel,LOCKMODE lockmode)382 logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
383 {
384 heap_close(rel->localrel, lockmode);
385 rel->localrel = NULL;
386 }
387