1 /*-------------------------------------------------------------------------
2  * relation.c
3  *	   PostgreSQL logical replication
4  *
5  * Copyright (c) 2016-2018, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/relation.c
9  *
10  * NOTES
11  *	  This file contains helper functions for logical replication relation
12  *	  mapping cache.
13  *
14  *-------------------------------------------------------------------------
15  */
16 
17 #include "postgres.h"
18 
19 #include "access/heapam.h"
20 #include "access/sysattr.h"
21 #include "catalog/namespace.h"
22 #include "catalog/pg_subscription_rel.h"
23 #include "executor/executor.h"
24 #include "nodes/makefuncs.h"
25 #include "replication/logicalrelation.h"
26 #include "replication/worker_internal.h"
27 #include "utils/inval.h"
28 
29 
/*
 * Memory context that owns the relation map hash table and all data copied
 * into its entries (remote relation descriptions and local attribute maps).
 * Created on first use by logicalrep_relmap_init().
 */
static MemoryContext LogicalRepRelMapContext = NULL;

/*
 * Hash table mapping a remote relation ID (LogicalRepRelId) to its
 * LogicalRepRelMapEntry.  NULL until logicalrep_relmap_init() creates it.
 */
static HTAB *LogicalRepRelMap = NULL;
33 
34 
35 /*
36  * Relcache invalidation callback for our relation map cache.
37  */
38 static void
logicalrep_relmap_invalidate_cb(Datum arg,Oid reloid)39 logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
40 {
41 	LogicalRepRelMapEntry *entry;
42 
43 	/* Just to be sure. */
44 	if (LogicalRepRelMap == NULL)
45 		return;
46 
47 	if (reloid != InvalidOid)
48 	{
49 		HASH_SEQ_STATUS status;
50 
51 		hash_seq_init(&status, LogicalRepRelMap);
52 
53 		/* TODO, use inverse lookup hashtable? */
54 		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
55 		{
56 			if (entry->localreloid == reloid)
57 			{
58 				entry->localrelvalid = false;
59 				hash_seq_term(&status);
60 				break;
61 			}
62 		}
63 	}
64 	else
65 	{
66 		/* invalidate all cache entries */
67 		HASH_SEQ_STATUS status;
68 
69 		hash_seq_init(&status, LogicalRepRelMap);
70 
71 		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
72 			entry->localrelvalid = false;
73 	}
74 }
75 
76 /*
77  * Initialize the relation map cache.
78  */
79 static void
logicalrep_relmap_init(void)80 logicalrep_relmap_init(void)
81 {
82 	HASHCTL		ctl;
83 
84 	if (!LogicalRepRelMapContext)
85 		LogicalRepRelMapContext =
86 			AllocSetContextCreate(CacheMemoryContext,
87 								  "LogicalRepRelMapContext",
88 								  ALLOCSET_DEFAULT_SIZES);
89 
90 	/* Initialize the relation hash table. */
91 	MemSet(&ctl, 0, sizeof(ctl));
92 	ctl.keysize = sizeof(LogicalRepRelId);
93 	ctl.entrysize = sizeof(LogicalRepRelMapEntry);
94 	ctl.hcxt = LogicalRepRelMapContext;
95 
96 	LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
97 								   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
98 
99 	/* Watch for invalidation events. */
100 	CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
101 								  (Datum) 0);
102 }
103 
104 /*
105  * Free the entry of a relation map cache.
106  */
107 static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry * entry)108 logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
109 {
110 	LogicalRepRelation *remoterel;
111 
112 	remoterel = &entry->remoterel;
113 
114 	pfree(remoterel->nspname);
115 	pfree(remoterel->relname);
116 
117 	if (remoterel->natts > 0)
118 	{
119 		int			i;
120 
121 		for (i = 0; i < remoterel->natts; i++)
122 			pfree(remoterel->attnames[i]);
123 
124 		pfree(remoterel->attnames);
125 		pfree(remoterel->atttyps);
126 	}
127 	bms_free(remoterel->attkeys);
128 
129 	if (entry->attrmap)
130 		pfree(entry->attrmap);
131 }
132 
133 /*
134  * Add new entry or update existing entry in the relation map cache.
135  *
136  * Called when new relation mapping is sent by the publisher to update
137  * our expected view of incoming data from said publisher.
138  */
139 void
logicalrep_relmap_update(LogicalRepRelation * remoterel)140 logicalrep_relmap_update(LogicalRepRelation *remoterel)
141 {
142 	MemoryContext oldctx;
143 	LogicalRepRelMapEntry *entry;
144 	bool		found;
145 	int			i;
146 
147 	if (LogicalRepRelMap == NULL)
148 		logicalrep_relmap_init();
149 
150 	/*
151 	 * HASH_ENTER returns the existing entry if present or creates a new one.
152 	 */
153 	entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
154 						HASH_ENTER, &found);
155 
156 	if (found)
157 		logicalrep_relmap_free_entry(entry);
158 
159 	memset(entry, 0, sizeof(LogicalRepRelMapEntry));
160 
161 	/* Make cached copy of the data */
162 	oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
163 	entry->remoterel.remoteid = remoterel->remoteid;
164 	entry->remoterel.nspname = pstrdup(remoterel->nspname);
165 	entry->remoterel.relname = pstrdup(remoterel->relname);
166 	entry->remoterel.natts = remoterel->natts;
167 	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
168 	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
169 	for (i = 0; i < remoterel->natts; i++)
170 	{
171 		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
172 		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
173 	}
174 	entry->remoterel.replident = remoterel->replident;
175 	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
176 	MemoryContextSwitchTo(oldctx);
177 }
178 
179 /*
180  * Find attribute index in TupleDesc struct by attribute name.
181  *
182  * Returns -1 if not found.
183  */
184 static int
logicalrep_rel_att_by_name(LogicalRepRelation * remoterel,const char * attname)185 logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
186 {
187 	int			i;
188 
189 	for (i = 0; i < remoterel->natts; i++)
190 	{
191 		if (strcmp(remoterel->attnames[i], attname) == 0)
192 			return i;
193 	}
194 
195 	return -1;
196 }
197 
198 /*
199  * Open the local relation associated with the remote one.
200  *
201  * Rebuilds the Relcache mapping if it was invalidated by local DDL.
202  */
203 LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid,LOCKMODE lockmode)204 logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
205 {
206 	LogicalRepRelMapEntry *entry;
207 	bool		found;
208 	LogicalRepRelation *remoterel;
209 
210 	if (LogicalRepRelMap == NULL)
211 		logicalrep_relmap_init();
212 
213 	/* Search for existing entry. */
214 	entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
215 						HASH_FIND, &found);
216 
217 	if (!found)
218 		elog(ERROR, "no relation map entry for remote relation ID %u",
219 			 remoteid);
220 
221 	remoterel = &entry->remoterel;
222 
223 	/* Ensure we don't leak a relcache refcount. */
224 	if (entry->localrel)
225 		elog(ERROR, "remote relation ID %u is already open", remoteid);
226 
227 	/*
228 	 * When opening and locking a relation, pending invalidation messages are
229 	 * processed which can invalidate the relation.  Hence, if the entry is
230 	 * currently considered valid, try to open the local relation by OID and
231 	 * see if invalidation ensues.
232 	 */
233 	if (entry->localrelvalid)
234 	{
235 		entry->localrel = try_relation_open(entry->localreloid, lockmode);
236 		if (!entry->localrel)
237 		{
238 			/* Table was renamed or dropped. */
239 			entry->localrelvalid = false;
240 		}
241 		else if (!entry->localrelvalid)
242 		{
243 			/* Note we release the no-longer-useful lock here. */
244 			heap_close(entry->localrel, lockmode);
245 			entry->localrel = NULL;
246 		}
247 	}
248 
249 	/*
250 	 * If the entry has been marked invalid since we last had lock on it,
251 	 * re-open the local relation by name and rebuild all derived data.
252 	 */
253 	if (!entry->localrelvalid)
254 	{
255 		Oid			relid;
256 		int			found;
257 		Bitmapset  *idkey;
258 		TupleDesc	desc;
259 		MemoryContext oldctx;
260 		int			i;
261 
262 		/* Try to find and lock the relation by name. */
263 		relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
264 											  remoterel->relname, -1),
265 								 lockmode, true);
266 		if (!OidIsValid(relid))
267 			ereport(ERROR,
268 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
269 					 errmsg("logical replication target relation \"%s.%s\" does not exist",
270 							remoterel->nspname, remoterel->relname)));
271 		entry->localrel = heap_open(relid, NoLock);
272 		entry->localreloid = relid;
273 
274 		/* Check for supported relkind. */
275 		CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
276 								 remoterel->nspname, remoterel->relname);
277 
278 		/*
279 		 * Build the mapping of local attribute numbers to remote attribute
280 		 * numbers and validate that we don't miss any replicated columns as
281 		 * that would result in potentially unwanted data loss.
282 		 */
283 		desc = RelationGetDescr(entry->localrel);
284 		oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
285 		entry->attrmap = palloc(desc->natts * sizeof(AttrNumber));
286 		MemoryContextSwitchTo(oldctx);
287 
288 		found = 0;
289 		for (i = 0; i < desc->natts; i++)
290 		{
291 			int			attnum;
292 			Form_pg_attribute attr = TupleDescAttr(desc, i);
293 
294 			if (attr->attisdropped)
295 			{
296 				entry->attrmap[i] = -1;
297 				continue;
298 			}
299 
300 			attnum = logicalrep_rel_att_by_name(remoterel,
301 												NameStr(attr->attname));
302 
303 			entry->attrmap[i] = attnum;
304 			if (attnum >= 0)
305 				found++;
306 		}
307 
308 		/* TODO, detail message with names of missing columns */
309 		if (found < remoterel->natts)
310 			ereport(ERROR,
311 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
312 					 errmsg("logical replication target relation \"%s.%s\" is missing "
313 							"some replicated columns",
314 							remoterel->nspname, remoterel->relname)));
315 
316 		/*
317 		 * Check that replica identity matches. We allow for stricter replica
318 		 * identity (fewer columns) on subscriber as that will not stop us
319 		 * from finding unique tuple. IE, if publisher has identity
320 		 * (id,timestamp) and subscriber just (id) this will not be a problem,
321 		 * but in the opposite scenario it will.
322 		 *
323 		 * Don't throw any error here just mark the relation entry as not
324 		 * updatable, as replica identity is only for updates and deletes but
325 		 * inserts can be replicated even without it.
326 		 */
327 		entry->updatable = true;
328 		idkey = RelationGetIndexAttrBitmap(entry->localrel,
329 										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
330 		/* fallback to PK if no replica identity */
331 		if (idkey == NULL)
332 		{
333 			idkey = RelationGetIndexAttrBitmap(entry->localrel,
334 											   INDEX_ATTR_BITMAP_PRIMARY_KEY);
335 
336 			/*
337 			 * If no replica identity index and no PK, the published table
338 			 * must have replica identity FULL.
339 			 */
340 			if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
341 				entry->updatable = false;
342 		}
343 
344 		i = -1;
345 		while ((i = bms_next_member(idkey, i)) >= 0)
346 		{
347 			int			attnum = i + FirstLowInvalidHeapAttributeNumber;
348 
349 			if (!AttrNumberIsForUserDefinedAttr(attnum))
350 				ereport(ERROR,
351 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
352 						 errmsg("logical replication target relation \"%s.%s\" uses "
353 								"system columns in REPLICA IDENTITY index",
354 								remoterel->nspname, remoterel->relname)));
355 
356 			attnum = AttrNumberGetAttrOffset(attnum);
357 
358 			if (entry->attrmap[attnum] < 0 ||
359 				!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
360 			{
361 				entry->updatable = false;
362 				break;
363 			}
364 		}
365 
366 		entry->localrelvalid = true;
367 	}
368 
369 	if (entry->state != SUBREL_STATE_READY)
370 		entry->state = GetSubscriptionRelState(MySubscription->oid,
371 											   entry->localreloid,
372 											   &entry->statelsn,
373 											   true);
374 
375 	return entry;
376 }
377 
378 /*
379  * Close the previously opened logical relation.
380  */
381 void
logicalrep_rel_close(LogicalRepRelMapEntry * rel,LOCKMODE lockmode)382 logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
383 {
384 	heap_close(rel->localrel, lockmode);
385 	rel->localrel = NULL;
386 }
387