1 /*-------------------------------------------------------------------------
2  * relation.c
3  *	   PostgreSQL logical replication
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/relation.c
9  *
10  * NOTES
11  *	  This file contains helper functions for logical replication relation
12  *	  mapping cache.
13  *
14  *-------------------------------------------------------------------------
15  */
16 
17 #include "postgres.h"
18 
19 #include "access/heapam.h"
20 #include "access/sysattr.h"
21 #include "catalog/namespace.h"
22 #include "catalog/pg_subscription_rel.h"
23 #include "executor/executor.h"
24 #include "nodes/makefuncs.h"
25 #include "replication/logicalrelation.h"
26 #include "replication/worker_internal.h"
27 #include "utils/inval.h"
28 #include "utils/memutils.h"
29 
30 
31 static MemoryContext LogicalRepRelMapContext = NULL;
32 
33 static HTAB *LogicalRepRelMap = NULL;
34 
35 
36 /*
37  * Relcache invalidation callback for our relation map cache.
38  */
39 static void
logicalrep_relmap_invalidate_cb(Datum arg,Oid reloid)40 logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
41 {
42 	LogicalRepRelMapEntry *entry;
43 
44 	/* Just to be sure. */
45 	if (LogicalRepRelMap == NULL)
46 		return;
47 
48 	if (reloid != InvalidOid)
49 	{
50 		HASH_SEQ_STATUS status;
51 
52 		hash_seq_init(&status, LogicalRepRelMap);
53 
54 		/* TODO, use inverse lookup hashtable? */
55 		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
56 		{
57 			if (entry->localreloid == reloid)
58 			{
59 				entry->localrelvalid = false;
60 				hash_seq_term(&status);
61 				break;
62 			}
63 		}
64 	}
65 	else
66 	{
67 		/* invalidate all cache entries */
68 		HASH_SEQ_STATUS status;
69 
70 		hash_seq_init(&status, LogicalRepRelMap);
71 
72 		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
73 			entry->localrelvalid = false;
74 	}
75 }
76 
77 /*
78  * Initialize the relation map cache.
79  */
80 static void
logicalrep_relmap_init(void)81 logicalrep_relmap_init(void)
82 {
83 	HASHCTL		ctl;
84 
85 	if (!LogicalRepRelMapContext)
86 		LogicalRepRelMapContext =
87 			AllocSetContextCreate(CacheMemoryContext,
88 								  "LogicalRepRelMapContext",
89 								  ALLOCSET_DEFAULT_SIZES);
90 
91 	/* Initialize the relation hash table. */
92 	MemSet(&ctl, 0, sizeof(ctl));
93 	ctl.keysize = sizeof(LogicalRepRelId);
94 	ctl.entrysize = sizeof(LogicalRepRelMapEntry);
95 	ctl.hcxt = LogicalRepRelMapContext;
96 
97 	LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
98 								   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
99 
100 	/* Watch for invalidation events. */
101 	CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
102 								  (Datum) 0);
103 }
104 
105 /*
106  * Free the entry of a relation map cache.
107  */
108 static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry * entry)109 logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
110 {
111 	LogicalRepRelation *remoterel;
112 
113 	remoterel = &entry->remoterel;
114 
115 	pfree(remoterel->nspname);
116 	pfree(remoterel->relname);
117 
118 	if (remoterel->natts > 0)
119 	{
120 		int			i;
121 
122 		for (i = 0; i < remoterel->natts; i++)
123 			pfree(remoterel->attnames[i]);
124 
125 		pfree(remoterel->attnames);
126 		pfree(remoterel->atttyps);
127 	}
128 	bms_free(remoterel->attkeys);
129 
130 	if (entry->attrmap)
131 		pfree(entry->attrmap);
132 }
133 
134 /*
135  * Add new entry or update existing entry in the relation map cache.
136  *
137  * Called when new relation mapping is sent by the publisher to update
138  * our expected view of incoming data from said publisher.
139  */
140 void
logicalrep_relmap_update(LogicalRepRelation * remoterel)141 logicalrep_relmap_update(LogicalRepRelation *remoterel)
142 {
143 	MemoryContext oldctx;
144 	LogicalRepRelMapEntry *entry;
145 	bool		found;
146 	int			i;
147 
148 	if (LogicalRepRelMap == NULL)
149 		logicalrep_relmap_init();
150 
151 	/*
152 	 * HASH_ENTER returns the existing entry if present or creates a new one.
153 	 */
154 	entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
155 						HASH_ENTER, &found);
156 
157 	if (found)
158 		logicalrep_relmap_free_entry(entry);
159 
160 	memset(entry, 0, sizeof(LogicalRepRelMapEntry));
161 
162 	/* Make cached copy of the data */
163 	oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
164 	entry->remoterel.remoteid = remoterel->remoteid;
165 	entry->remoterel.nspname = pstrdup(remoterel->nspname);
166 	entry->remoterel.relname = pstrdup(remoterel->relname);
167 	entry->remoterel.natts = remoterel->natts;
168 	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
169 	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
170 	for (i = 0; i < remoterel->natts; i++)
171 	{
172 		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
173 		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
174 	}
175 	entry->remoterel.replident = remoterel->replident;
176 	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
177 	MemoryContextSwitchTo(oldctx);
178 }
179 
180 /*
181  * Find attribute index in TupleDesc struct by attribute name.
182  *
183  * Returns -1 if not found.
184  */
185 static int
logicalrep_rel_att_by_name(LogicalRepRelation * remoterel,const char * attname)186 logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
187 {
188 	int			i;
189 
190 	for (i = 0; i < remoterel->natts; i++)
191 	{
192 		if (strcmp(remoterel->attnames[i], attname) == 0)
193 			return i;
194 	}
195 
196 	return -1;
197 }
198 
199 /*
200  * Open the local relation associated with the remote one.
201  *
202  * Rebuilds the Relcache mapping if it was invalidated by local DDL.
203  */
204 LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid,LOCKMODE lockmode)205 logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
206 {
207 	LogicalRepRelMapEntry *entry;
208 	bool		found;
209 	LogicalRepRelation *remoterel;
210 
211 	if (LogicalRepRelMap == NULL)
212 		logicalrep_relmap_init();
213 
214 	/* Search for existing entry. */
215 	entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
216 						HASH_FIND, &found);
217 
218 	if (!found)
219 		elog(ERROR, "no relation map entry for remote relation ID %u",
220 			 remoteid);
221 
222 	remoterel = &entry->remoterel;
223 
224 	/* Ensure we don't leak a relcache refcount. */
225 	if (entry->localrel)
226 		elog(ERROR, "remote relation ID %u is already open", remoteid);
227 
228 	/*
229 	 * When opening and locking a relation, pending invalidation messages are
230 	 * processed which can invalidate the relation.  Hence, if the entry is
231 	 * currently considered valid, try to open the local relation by OID and
232 	 * see if invalidation ensues.
233 	 */
234 	if (entry->localrelvalid)
235 	{
236 		entry->localrel = try_relation_open(entry->localreloid, lockmode);
237 		if (!entry->localrel)
238 		{
239 			/* Table was renamed or dropped. */
240 			entry->localrelvalid = false;
241 		}
242 		else if (!entry->localrelvalid)
243 		{
244 			/* Note we release the no-longer-useful lock here. */
245 			heap_close(entry->localrel, lockmode);
246 			entry->localrel = NULL;
247 		}
248 	}
249 
250 	/*
251 	 * If the entry has been marked invalid since we last had lock on it,
252 	 * re-open the local relation by name and rebuild all derived data.
253 	 */
254 	if (!entry->localrelvalid)
255 	{
256 		Oid			relid;
257 		int			found;
258 		Bitmapset  *idkey;
259 		TupleDesc	desc;
260 		MemoryContext oldctx;
261 		int			i;
262 
263 		/* Try to find and lock the relation by name. */
264 		relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
265 											  remoterel->relname, -1),
266 								 lockmode, true);
267 		if (!OidIsValid(relid))
268 			ereport(ERROR,
269 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
270 					 errmsg("logical replication target relation \"%s.%s\" does not exist",
271 							remoterel->nspname, remoterel->relname)));
272 		entry->localrel = heap_open(relid, NoLock);
273 		entry->localreloid = relid;
274 
275 		/* Check for supported relkind. */
276 		CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
277 								 remoterel->nspname, remoterel->relname);
278 
279 		/*
280 		 * Build the mapping of local attribute numbers to remote attribute
281 		 * numbers and validate that we don't miss any replicated columns as
282 		 * that would result in potentially unwanted data loss.
283 		 */
284 		desc = RelationGetDescr(entry->localrel);
285 		oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
286 		entry->attrmap = palloc(desc->natts * sizeof(AttrNumber));
287 		MemoryContextSwitchTo(oldctx);
288 
289 		found = 0;
290 		for (i = 0; i < desc->natts; i++)
291 		{
292 			int			attnum;
293 
294 			if (desc->attrs[i]->attisdropped)
295 			{
296 				entry->attrmap[i] = -1;
297 				continue;
298 			}
299 
300 			attnum = logicalrep_rel_att_by_name(remoterel,
301 												NameStr(desc->attrs[i]->attname));
302 
303 			entry->attrmap[i] = attnum;
304 			if (attnum >= 0)
305 				found++;
306 		}
307 
308 		/* TODO, detail message with names of missing columns */
309 		if (found < remoterel->natts)
310 			ereport(ERROR,
311 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
312 					 errmsg("logical replication target relation \"%s.%s\" is missing "
313 							"some replicated columns",
314 							remoterel->nspname, remoterel->relname)));
315 
316 		/*
317 		 * Check that replica identity matches. We allow for stricter replica
318 		 * identity (fewer columns) on subscriber as that will not stop us
319 		 * from finding unique tuple. IE, if publisher has identity
320 		 * (id,timestamp) and subscriber just (id) this will not be a problem,
321 		 * but in the opposite scenario it will.
322 		 *
323 		 * Don't throw any error here just mark the relation entry as not
324 		 * updatable, as replica identity is only for updates and deletes but
325 		 * inserts can be replicated even without it.
326 		 */
327 		entry->updatable = true;
328 		idkey = RelationGetIndexAttrBitmap(entry->localrel,
329 										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
330 		/* fallback to PK if no replica identity */
331 		if (idkey == NULL)
332 		{
333 			idkey = RelationGetIndexAttrBitmap(entry->localrel,
334 											   INDEX_ATTR_BITMAP_PRIMARY_KEY);
335 
336 			/*
337 			 * If no replica identity index and no PK, the published table
338 			 * must have replica identity FULL.
339 			 */
340 			if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
341 				entry->updatable = false;
342 		}
343 
344 		i = -1;
345 		while ((i = bms_next_member(idkey, i)) >= 0)
346 		{
347 			int			attnum = i + FirstLowInvalidHeapAttributeNumber;
348 
349 			if (!AttrNumberIsForUserDefinedAttr(attnum))
350 				ereport(ERROR,
351 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
352 						 errmsg("logical replication target relation \"%s.%s\" uses "
353 								"system columns in REPLICA IDENTITY index",
354 								remoterel->nspname, remoterel->relname)));
355 
356 			attnum = AttrNumberGetAttrOffset(attnum);
357 
358 			if (entry->attrmap[attnum] < 0 ||
359 				!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
360 			{
361 				entry->updatable = false;
362 				break;
363 			}
364 		}
365 
366 		entry->localrelvalid = true;
367 	}
368 
369 	if (entry->state != SUBREL_STATE_READY)
370 		entry->state = GetSubscriptionRelState(MySubscription->oid,
371 											   entry->localreloid,
372 											   &entry->statelsn,
373 											   true);
374 
375 	return entry;
376 }
377 
378 /*
379  * Close the previously opened logical relation.
380  */
381 void
logicalrep_rel_close(LogicalRepRelMapEntry * rel,LOCKMODE lockmode)382 logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
383 {
384 	heap_close(rel->localrel, lockmode);
385 	rel->localrel = NULL;
386 }
387