1 /*-------------------------------------------------------------------------
2 *
3 * pg_inherits.c
4 * routines to support manipulation of the pg_inherits relation
5 *
6 * Note: currently, this module only contains inquiry functions; the actual
7 * creation and deletion of pg_inherits entries is done in tablecmds.c.
8 * Perhaps someday that code should be moved here, but it'd have to be
9 * disentangled from other stuff such as pg_depend updates.
10 *
11 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
14 *
15 * IDENTIFICATION
16 * src/backend/catalog/pg_inherits.c
17 *
18 *-------------------------------------------------------------------------
19 */
20 #include "postgres.h"
21
22 #include "access/genam.h"
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "catalog/indexing.h"
26 #include "catalog/pg_inherits.h"
27 #include "catalog/pg_inherits_fn.h"
28 #include "parser/parse_type.h"
29 #include "storage/lmgr.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/memutils.h"
33 #include "utils/syscache.h"
34 #include "utils/tqual.h"
35
36 /*
37 * Entry of a hash table used in find_all_inheritors. See below.
38 */
39 typedef struct SeenRelsEntry
40 {
41 Oid rel_id; /* relation oid */
42 ListCell *numparents_cell; /* corresponding list cell */
43 } SeenRelsEntry;
44
45 /*
46 * find_inheritance_children
47 *
48 * Returns a list containing the OIDs of all relations which
49 * inherit *directly* from the relation with OID 'parentrelId'.
50 *
51 * The specified lock type is acquired on each child relation (but not on the
52 * given rel; caller should already have locked it). If lockmode is NoLock
53 * then no locks are acquired, but caller must beware of race conditions
54 * against possible DROPs of child relations.
55 */
56 List *
find_inheritance_children(Oid parentrelId,LOCKMODE lockmode)57 find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
58 {
59 List *list = NIL;
60 Relation relation;
61 SysScanDesc scan;
62 ScanKeyData key[1];
63 HeapTuple inheritsTuple;
64 Oid inhrelid;
65 Oid *oidarr;
66 int maxoids,
67 numoids,
68 i;
69
70 /*
71 * Can skip the scan if pg_class shows the relation has never had a
72 * subclass.
73 */
74 if (!has_subclass(parentrelId))
75 return NIL;
76
77 /*
78 * Scan pg_inherits and build a working array of subclass OIDs.
79 */
80 maxoids = 32;
81 oidarr = (Oid *) palloc(maxoids * sizeof(Oid));
82 numoids = 0;
83
84 relation = heap_open(InheritsRelationId, AccessShareLock);
85
86 ScanKeyInit(&key[0],
87 Anum_pg_inherits_inhparent,
88 BTEqualStrategyNumber, F_OIDEQ,
89 ObjectIdGetDatum(parentrelId));
90
91 scan = systable_beginscan(relation, InheritsParentIndexId, true,
92 NULL, 1, key);
93
94 while ((inheritsTuple = systable_getnext(scan)) != NULL)
95 {
96 inhrelid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
97 if (numoids >= maxoids)
98 {
99 maxoids *= 2;
100 oidarr = (Oid *) repalloc(oidarr, maxoids * sizeof(Oid));
101 }
102 oidarr[numoids++] = inhrelid;
103 }
104
105 systable_endscan(scan);
106
107 heap_close(relation, AccessShareLock);
108
109 /*
110 * If we found more than one child, sort them by OID. This ensures
111 * reasonably consistent behavior regardless of the vagaries of an
112 * indexscan. This is important since we need to be sure all backends
113 * lock children in the same order to avoid needless deadlocks.
114 */
115 if (numoids > 1)
116 qsort(oidarr, numoids, sizeof(Oid), oid_cmp);
117
118 /*
119 * Acquire locks and build the result list.
120 */
121 for (i = 0; i < numoids; i++)
122 {
123 inhrelid = oidarr[i];
124
125 if (lockmode != NoLock)
126 {
127 /* Get the lock to synchronize against concurrent drop */
128 LockRelationOid(inhrelid, lockmode);
129
130 /*
131 * Now that we have the lock, double-check to see if the relation
132 * really exists or not. If not, assume it was dropped while we
133 * waited to acquire lock, and ignore it.
134 */
135 if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(inhrelid)))
136 {
137 /* Release useless lock */
138 UnlockRelationOid(inhrelid, lockmode);
139 /* And ignore this relation */
140 continue;
141 }
142 }
143
144 list = lappend_oid(list, inhrelid);
145 }
146
147 pfree(oidarr);
148
149 return list;
150 }
151
152
153 /*
154 * find_all_inheritors -
155 * Returns a list of relation OIDs including the given rel plus
156 * all relations that inherit from it, directly or indirectly.
157 * Optionally, it also returns the number of parents found for
158 * each such relation within the inheritance tree rooted at the
159 * given rel.
160 *
161 * The specified lock type is acquired on all child relations (but not on the
162 * given rel; caller should already have locked it). If lockmode is NoLock
163 * then no locks are acquired, but caller must beware of race conditions
164 * against possible DROPs of child relations.
165 */
166 List *
find_all_inheritors(Oid parentrelId,LOCKMODE lockmode,List ** numparents)167 find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
168 {
169 /* hash table for O(1) rel_oid -> rel_numparents cell lookup */
170 HTAB *seen_rels;
171 HASHCTL ctl;
172 List *rels_list,
173 *rel_numparents;
174 ListCell *l;
175
176 memset(&ctl, 0, sizeof(ctl));
177 ctl.keysize = sizeof(Oid);
178 ctl.entrysize = sizeof(SeenRelsEntry);
179 ctl.hcxt = CurrentMemoryContext;
180
181 seen_rels = hash_create("find_all_inheritors temporary table",
182 32, /* start small and extend */
183 &ctl,
184 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
185
186 /*
187 * We build a list starting with the given rel and adding all direct and
188 * indirect children. We can use a single list as both the record of
189 * already-found rels and the agenda of rels yet to be scanned for more
190 * children. This is a bit tricky but works because the foreach() macro
191 * doesn't fetch the next list element until the bottom of the loop.
192 */
193 rels_list = list_make1_oid(parentrelId);
194 rel_numparents = list_make1_int(0);
195
196 foreach(l, rels_list)
197 {
198 Oid currentrel = lfirst_oid(l);
199 List *currentchildren;
200 ListCell *lc;
201
202 /* Get the direct children of this rel */
203 currentchildren = find_inheritance_children(currentrel, lockmode);
204
205 /*
206 * Add to the queue only those children not already seen. This avoids
207 * making duplicate entries in case of multiple inheritance paths from
208 * the same parent. (It'll also keep us from getting into an infinite
209 * loop, though theoretically there can't be any cycles in the
210 * inheritance graph anyway.)
211 */
212 foreach(lc, currentchildren)
213 {
214 Oid child_oid = lfirst_oid(lc);
215 bool found;
216 SeenRelsEntry *hash_entry;
217
218 hash_entry = hash_search(seen_rels, &child_oid, HASH_ENTER, &found);
219 if (found)
220 {
221 /* if the rel is already there, bump number-of-parents counter */
222 lfirst_int(hash_entry->numparents_cell)++;
223 }
224 else
225 {
226 /* if it's not there, add it. expect 1 parent, initially. */
227 rels_list = lappend_oid(rels_list, child_oid);
228 rel_numparents = lappend_int(rel_numparents, 1);
229 hash_entry->numparents_cell = rel_numparents->tail;
230 }
231 }
232 }
233
234 if (numparents)
235 *numparents = rel_numparents;
236 else
237 list_free(rel_numparents);
238
239 hash_destroy(seen_rels);
240
241 return rels_list;
242 }
243
244
245 /*
246 * has_subclass - does this relation have any children?
247 *
248 * In the current implementation, has_subclass returns whether a
249 * particular class *might* have a subclass. It will not return the
250 * correct result if a class had a subclass which was later dropped.
251 * This is because relhassubclass in pg_class is not updated immediately
252 * when a subclass is dropped, primarily because of concurrency concerns.
253 *
254 * Currently has_subclass is only used as an efficiency hack to skip
255 * unnecessary inheritance searches, so this is OK. Note that ANALYZE
256 * on a childless table will clean up the obsolete relhassubclass flag.
257 *
258 * Although this doesn't actually touch pg_inherits, it seems reasonable
259 * to keep it here since it's normally used with the other routines here.
260 */
261 bool
has_subclass(Oid relationId)262 has_subclass(Oid relationId)
263 {
264 HeapTuple tuple;
265 bool result;
266
267 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relationId));
268 if (!HeapTupleIsValid(tuple))
269 elog(ERROR, "cache lookup failed for relation %u", relationId);
270
271 result = ((Form_pg_class) GETSTRUCT(tuple))->relhassubclass;
272 ReleaseSysCache(tuple);
273 return result;
274 }
275
276 /*
277 * has_superclass - does this relation inherit from another?
278 *
279 * Unlike has_subclass, this can be relied on to give an accurate answer.
280 * However, the caller must hold a lock on the given relation so that it
281 * can't be concurrently added to or removed from an inheritance hierarchy.
282 */
283 bool
has_superclass(Oid relationId)284 has_superclass(Oid relationId)
285 {
286 Relation catalog;
287 SysScanDesc scan;
288 ScanKeyData skey;
289 bool result;
290
291 catalog = heap_open(InheritsRelationId, AccessShareLock);
292 ScanKeyInit(&skey, Anum_pg_inherits_inhrelid, BTEqualStrategyNumber,
293 F_OIDEQ, ObjectIdGetDatum(relationId));
294 scan = systable_beginscan(catalog, InheritsRelidSeqnoIndexId, true,
295 NULL, 1, &skey);
296 result = HeapTupleIsValid(systable_getnext(scan));
297 systable_endscan(scan);
298 heap_close(catalog, AccessShareLock);
299
300 return result;
301 }
302
303 /*
304 * Given two type OIDs, determine whether the first is a complex type
305 * (class type) that inherits from the second.
306 */
307 bool
typeInheritsFrom(Oid subclassTypeId,Oid superclassTypeId)308 typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId)
309 {
310 bool result = false;
311 Oid subclassRelid;
312 Oid superclassRelid;
313 Relation inhrel;
314 List *visited,
315 *queue;
316 ListCell *queue_item;
317
318 /* We need to work with the associated relation OIDs */
319 subclassRelid = typeidTypeRelid(subclassTypeId);
320 if (subclassRelid == InvalidOid)
321 return false; /* not a complex type */
322 superclassRelid = typeidTypeRelid(superclassTypeId);
323 if (superclassRelid == InvalidOid)
324 return false; /* not a complex type */
325
326 /* No point in searching if the superclass has no subclasses */
327 if (!has_subclass(superclassRelid))
328 return false;
329
330 /*
331 * Begin the search at the relation itself, so add its relid to the queue.
332 */
333 queue = list_make1_oid(subclassRelid);
334 visited = NIL;
335
336 inhrel = heap_open(InheritsRelationId, AccessShareLock);
337
338 /*
339 * Use queue to do a breadth-first traversal of the inheritance graph from
340 * the relid supplied up to the root. Notice that we append to the queue
341 * inside the loop --- this is okay because the foreach() macro doesn't
342 * advance queue_item until the next loop iteration begins.
343 */
344 foreach(queue_item, queue)
345 {
346 Oid this_relid = lfirst_oid(queue_item);
347 ScanKeyData skey;
348 SysScanDesc inhscan;
349 HeapTuple inhtup;
350
351 /*
352 * If we've seen this relid already, skip it. This avoids extra work
353 * in multiple-inheritance scenarios, and also protects us from an
354 * infinite loop in case there is a cycle in pg_inherits (though
355 * theoretically that shouldn't happen).
356 */
357 if (list_member_oid(visited, this_relid))
358 continue;
359
360 /*
361 * Okay, this is a not-yet-seen relid. Add it to the list of
362 * already-visited OIDs, then find all the types this relid inherits
363 * from and add them to the queue.
364 */
365 visited = lappend_oid(visited, this_relid);
366
367 ScanKeyInit(&skey,
368 Anum_pg_inherits_inhrelid,
369 BTEqualStrategyNumber, F_OIDEQ,
370 ObjectIdGetDatum(this_relid));
371
372 inhscan = systable_beginscan(inhrel, InheritsRelidSeqnoIndexId, true,
373 NULL, 1, &skey);
374
375 while ((inhtup = systable_getnext(inhscan)) != NULL)
376 {
377 Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inhtup);
378 Oid inhparent = inh->inhparent;
379
380 /* If this is the target superclass, we're done */
381 if (inhparent == superclassRelid)
382 {
383 result = true;
384 break;
385 }
386
387 /* Else add to queue */
388 queue = lappend_oid(queue, inhparent);
389 }
390
391 systable_endscan(inhscan);
392
393 if (result)
394 break;
395 }
396
397 /* clean up ... */
398 heap_close(inhrel, AccessShareLock);
399
400 list_free(visited);
401 list_free(queue);
402
403 return result;
404 }
405