1 /*-------------------------------------------------------------------------
2 *
3 * dependency.c
4 * Functions to reason about distributed objects and their dependencies
5 *
6 * Copyright (c) Citus Data, Inc.
7 *
8 *-------------------------------------------------------------------------
9 */
10
11 #include "postgres.h"
12
13 #include "distributed/pg_version_constants.h"
14
15 #include "access/genam.h"
16 #include "access/heapam.h"
17 #include "access/htup_details.h"
18 #include "access/skey.h"
19 #include "access/sysattr.h"
20 #include "catalog/catalog.h"
21 #include "catalog/dependency.h"
22 #include "catalog/indexing.h"
23 #include "catalog/pg_class.h"
24 #include "catalog/pg_depend.h"
25 #include "catalog/pg_namespace.h"
26 #include "catalog/pg_proc_d.h"
27 #include "catalog/pg_rewrite.h"
28 #include "catalog/pg_rewrite_d.h"
29 #include "catalog/pg_shdepend.h"
30 #include "catalog/pg_type.h"
31 #if PG_VERSION_NUM >= PG_VERSION_13
32 #include "common/hashfn.h"
33 #endif
34 #include "distributed/commands.h"
35 #include "distributed/commands/utility_hook.h"
36 #include "distributed/listutils.h"
37 #include "distributed/metadata/dependency.h"
38 #include "distributed/metadata/distobject.h"
39 #include "distributed/metadata_cache.h"
40 #include "distributed/version_compat.h"
41 #include "miscadmin.h"
42 #include "utils/fmgroids.h"
43 #include "utils/hsearch.h"
44 #include "utils/lsyscache.h"
45
46 /*
47 * ObjectAddressCollector keeps track of collected ObjectAddresses. This can be used
48 * together with RecurseObjectDependencies.
49 *
50 * We keep three different datastructures for the following reasons
51 * - A List ordered by insert/collect order
52 * - A Set to quickly O(1) check if an ObjectAddress has already been collected
53 * - A set to check which objects are already visited
54 */
55 typedef struct ObjectAddressCollector
56 {
57 List *dependencyList;
58 HTAB *dependencySet;
59
60 HTAB *visitedObjects;
61 } ObjectAddressCollector;
62
63 /*
64 * DependencyMode distinguishes the data stored in DependencyDefinition. For details see
65 * DependencyDefinition's inline comments in the data union.
66 */
67 typedef enum DependencyMode
68 {
69 DependencyObjectAddress,
70 DependencyPgDepend,
71 DependencyPgShDepend
72 } DependencyMode;
73
74 typedef struct DependencyDefinition
75 {
76 /* describe how the dependency data is stored in the data field */
77 DependencyMode mode;
78
79 /*
80 * Dependencies can be found in different ways and therefore stored differently on the
81 * definition.
82 */
83 union
84 {
85 /*
86 * pg_depend is used for dependencies found in the database local pg_depend table.
87 * The entry is copied while scanning the table. The record can be inspected
88 * during the chasing algorithm to follow dependencies of different classes, or
89 * based on dependency type.
90 */
91 FormData_pg_depend pg_depend;
92
93 /*
94 * pg_shdepend is used for dependencies found in the global pg_shdepend table.
95 * The entry is copied while scanning the table. The record can be inspected
96 * during the chasing algorithm to follow dependencies of different classes, or
97 * based on dependency type.
98 */
99 FormData_pg_shdepend pg_shdepend;
100
101 /*
102 * address is used for dependencies that are artificially added during the
103 * chasing. Since they are added by citus code we assume the dependency needs to
104 * be chased anyway, ofcourse it will only actually be chased if the object is a
105 * supported object by citus
106 */
107 ObjectAddress address;
108 } data;
109 } DependencyDefinition;
110
111 /*
112 * ViewDependencyNode represents a view (or possibly a table) in a dependency graph of
113 * views.
114 */
115 typedef struct ViewDependencyNode
116 {
117 Oid id;
118 int remainingDependencyCount;
119 List *dependingNodes;
120 }ViewDependencyNode;
121
122
123 static List * GetRelationTriggerFunctionDepencyList(Oid relationId);
124 static List * GetRelationStatsSchemaDependencyList(Oid relationId);
125 static DependencyDefinition * CreateObjectAddressDependencyDef(Oid classId, Oid objectId);
126 static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition);
127
128 /* forward declarations for functions to interact with the ObjectAddressCollector */
129 static void InitObjectAddressCollector(ObjectAddressCollector *collector);
130 static void CollectObjectAddress(ObjectAddressCollector *collector,
131 const ObjectAddress *address);
132 static bool IsObjectAddressCollected(ObjectAddress findAddress,
133 ObjectAddressCollector *collector);
134 static void MarkObjectVisited(ObjectAddressCollector *collector,
135 ObjectAddress target);
136 static bool TargetObjectVisited(ObjectAddressCollector *collector,
137 ObjectAddress target);
138
139 typedef List *(*expandFn)(ObjectAddressCollector *collector, ObjectAddress target);
140 typedef bool (*followFn)(ObjectAddressCollector *collector,
141 DependencyDefinition *definition);
142 typedef void (*applyFn)(ObjectAddressCollector *collector,
143 DependencyDefinition *definition);
144
145 /* forward declaration of functions that recurse pg_depend */
146 static void RecurseObjectDependencies(ObjectAddress target, expandFn expand,
147 followFn follow, applyFn apply,
148 ObjectAddressCollector *collector);
149 static List * DependencyDefinitionFromPgDepend(ObjectAddress target);
150 static List * DependencyDefinitionFromPgShDepend(ObjectAddress target);
151 static bool FollowAllSupportedDependencies(ObjectAddressCollector *collector,
152 DependencyDefinition *definition);
153 static bool FollowNewSupportedDependencies(ObjectAddressCollector *collector,
154 DependencyDefinition *definition);
155 static void ApplyAddToDependencyList(ObjectAddressCollector *collector,
156 DependencyDefinition *definition);
157 static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
158 ObjectAddress target);
159 static ViewDependencyNode * BuildViewDependencyGraph(Oid relationId, HTAB *nodeMap);
160 static Oid GetDependingView(Form_pg_depend pg_depend);
161
162
163 /*
164 * GetUniqueDependenciesList takes a list of object addresses and returns a new list
165 * of ObjectAddesses whose elements are unique.
166 */
167 List *
GetUniqueDependenciesList(List * objectAddressesList)168 GetUniqueDependenciesList(List *objectAddressesList)
169 {
170 ObjectAddressCollector objectAddressCollector = { 0 };
171 InitObjectAddressCollector(&objectAddressCollector);
172
173 ObjectAddress *objectAddress = NULL;
174 foreach_ptr(objectAddress, objectAddressesList)
175 {
176 if (IsObjectAddressCollected(*objectAddress, &objectAddressCollector))
177 {
178 /* skip objects that are already collected */
179 continue;
180 }
181
182 CollectObjectAddress(&objectAddressCollector, objectAddress);
183 }
184
185 return objectAddressCollector.dependencyList;
186 }
187
188
189 /*
190 * GetDependenciesForObject returns a list of ObjectAddesses to be created in order
191 * before the target object could safely be created on a worker. Some of the object might
192 * already be created on a worker. It should be created in an idempotent way.
193 */
194 List *
GetDependenciesForObject(const ObjectAddress * target)195 GetDependenciesForObject(const ObjectAddress *target)
196 {
197 ObjectAddressCollector collector = { 0 };
198 InitObjectAddressCollector(&collector);
199
200 RecurseObjectDependencies(*target,
201 &ExpandCitusSupportedTypes,
202 &FollowNewSupportedDependencies,
203 &ApplyAddToDependencyList,
204 &collector);
205
206 return collector.dependencyList;
207 }
208
209
210 /*
211 * OrderObjectAddressListInDependencyOrder given a list of ObjectAddresses return a new
212 * list of the same ObjectAddresses ordered on dependency order where dependencies
213 * precedes the corresponding object in the list.
214 *
215 * The algortihm traveses pg_depend in a depth first order starting at the first object in
216 * the provided list. By traversing depth first it will put the first dependency at the
217 * head of the list with dependencies depending on them later.
218 *
219 * If the object is already in the list it is skipped for traversal. This happens when an
220 * object was already added to the target list before it occurred in the input list.
221 */
222 List *
OrderObjectAddressListInDependencyOrder(List * objectAddressList)223 OrderObjectAddressListInDependencyOrder(List *objectAddressList)
224 {
225 ObjectAddressCollector collector = { 0 };
226 InitObjectAddressCollector(&collector);
227
228 ObjectAddress *objectAddress = NULL;
229 foreach_ptr(objectAddress, objectAddressList)
230 {
231 if (IsObjectAddressCollected(*objectAddress, &collector))
232 {
233 /* skip objects that are already ordered */
234 continue;
235 }
236
237 RecurseObjectDependencies(*objectAddress,
238 &ExpandCitusSupportedTypes,
239 &FollowAllSupportedDependencies,
240 &ApplyAddToDependencyList,
241 &collector);
242
243 CollectObjectAddress(&collector, objectAddress);
244 }
245
246 return collector.dependencyList;
247 }
248
249
250 /*
251 * RecurseObjectDependencies recursively visits all dependencies of an object. It sources
252 * the dependencies from pg_depend and pg_shdepend while 'expanding' the list via an
253 * optional `expand` function.
254 *
255 * Starting from the target ObjectAddress. For every dependency found the `follow`
256 * function will be called. When `follow` returns true it will recursively visit the
257 * dependencies for that object.
258 *
259 * Visiting will happen in depth first order, which is useful to create or sorted lists of
260 * dependencies to create.
261 *
262 * For all dependencies that should be visited the apply function will be called. This
263 * function is designed to be the mutating function for the context being passed. Although
264 * nothing prevents the follow function to also mutate the context.
265 *
266 * - follow will be called on the way down, so the invocation order is top to bottom of
267 * the dependency tree
268 * - apply is called on the way back, so the invocation order is bottom to top. Apply is
269 * not called for entries for which follow has returned false.
270 */
271 static void
RecurseObjectDependencies(ObjectAddress target,expandFn expand,followFn follow,applyFn apply,ObjectAddressCollector * collector)272 RecurseObjectDependencies(ObjectAddress target, expandFn expand, followFn follow,
273 applyFn apply, ObjectAddressCollector *collector)
274 {
275 if (TargetObjectVisited(collector, target))
276 {
277 /* prevent infinite loops due to circular dependencies */
278 return;
279 }
280
281 MarkObjectVisited(collector, target);
282
283 /* lookup both pg_depend and pg_shdepend for dependencies */
284 List *pgDependDefinitions = DependencyDefinitionFromPgDepend(target);
285 List *pgShDependDefinitions = DependencyDefinitionFromPgShDepend(target);
286 List *dependenyDefinitionList = list_concat(pgDependDefinitions,
287 pgShDependDefinitions);
288
289 /* concat expanded entries if applicable */
290 if (expand != NULL)
291 {
292 List *expandedEntries = expand(collector, target);
293 dependenyDefinitionList = list_concat(dependenyDefinitionList, expandedEntries);
294 }
295
296 /* iterate all entries and recurse depth first */
297 DependencyDefinition *dependencyDefinition = NULL;
298 foreach_ptr(dependencyDefinition, dependenyDefinitionList)
299 {
300 if (follow == NULL || !follow(collector, dependencyDefinition))
301 {
302 /* skip all pg_depend entries the user didn't want to follow */
303 continue;
304 }
305
306 /*
307 * recurse depth first, this makes sure we call apply for the deepest dependency
308 * first.
309 */
310 ObjectAddress address = DependencyDefinitionObjectAddress(dependencyDefinition);
311 RecurseObjectDependencies(address, expand, follow, apply, collector);
312
313 /* now apply changes for current entry */
314 if (apply != NULL)
315 {
316 apply(collector, dependencyDefinition);
317 }
318 }
319 }
320
321
322 /*
323 * DependencyDefinitionFromPgDepend loads all pg_depend records describing the
324 * dependencies of target.
325 */
326 static List *
DependencyDefinitionFromPgDepend(ObjectAddress target)327 DependencyDefinitionFromPgDepend(ObjectAddress target)
328 {
329 ScanKeyData key[2];
330 HeapTuple depTup = NULL;
331 List *dependenyDefinitionList = NIL;
332
333 /*
334 * iterate the actual pg_depend catalog
335 */
336 Relation depRel = table_open(DependRelationId, AccessShareLock);
337
338 /* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */
339 ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ,
340 ObjectIdGetDatum(target.classId));
341 ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ,
342 ObjectIdGetDatum(target.objectId));
343 SysScanDesc depScan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2,
344 key);
345
346 while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
347 {
348 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
349 DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition));
350
351 /* keep track of all pg_depend records as dependency definitions */
352 dependency->mode = DependencyPgDepend;
353 dependency->data.pg_depend = *pg_depend;
354 dependenyDefinitionList = lappend(dependenyDefinitionList, dependency);
355 }
356
357 systable_endscan(depScan);
358 relation_close(depRel, AccessShareLock);
359
360 return dependenyDefinitionList;
361 }
362
363
364 /*
365 * DependencyDefinitionFromPgDepend loads all pg_shdepend records describing the
366 * dependencies of target.
367 */
368 static List *
DependencyDefinitionFromPgShDepend(ObjectAddress target)369 DependencyDefinitionFromPgShDepend(ObjectAddress target)
370 {
371 ScanKeyData key[3];
372 HeapTuple depTup = NULL;
373 List *dependenyDefinitionList = NIL;
374
375 /*
376 * iterate the actual pg_shdepend catalog
377 */
378 Relation shdepRel = table_open(SharedDependRelationId, AccessShareLock);
379
380 /*
381 * Scan pg_shdepend for dbid = $1 AND classid = $2 AND objid = $3 using
382 * pg_shdepend_depender_index
383 *
384 * where $1 is decided as follows:
385 * - shared dependencies $1 = InvalidOid
386 * - other dependencies $1 = MyDatabaseId
387 * This is consistent with postgres' static classIdGetDbId function
388 */
389 Oid dbid = InvalidOid;
390 if (!IsSharedRelation(target.classId))
391 {
392 dbid = MyDatabaseId;
393 }
394 ScanKeyInit(&key[0], Anum_pg_shdepend_dbid, BTEqualStrategyNumber, F_OIDEQ,
395 ObjectIdGetDatum(dbid));
396 ScanKeyInit(&key[1], Anum_pg_shdepend_classid, BTEqualStrategyNumber, F_OIDEQ,
397 ObjectIdGetDatum(target.classId));
398 ScanKeyInit(&key[2], Anum_pg_shdepend_objid, BTEqualStrategyNumber, F_OIDEQ,
399 ObjectIdGetDatum(target.objectId));
400 SysScanDesc shdepScan = systable_beginscan(shdepRel, SharedDependDependerIndexId,
401 true, NULL, 3, key);
402
403 while (HeapTupleIsValid(depTup = systable_getnext(shdepScan)))
404 {
405 Form_pg_shdepend pg_shdepend = (Form_pg_shdepend) GETSTRUCT(depTup);
406 DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition));
407
408 /* keep track of all pg_shdepend records as dependency definitions */
409 dependency->mode = DependencyPgShDepend;
410 dependency->data.pg_shdepend = *pg_shdepend;
411 dependenyDefinitionList = lappend(dependenyDefinitionList, dependency);
412 }
413
414 systable_endscan(shdepScan);
415 relation_close(shdepRel, AccessShareLock);
416
417 return dependenyDefinitionList;
418 }
419
420
421 /*
422 * InitObjectAddressCollector takes a pointer to an already allocated (possibly stack)
423 * ObjectAddressCollector struct. It makes sure this struct is ready to be used for object
424 * collection.
425 *
426 * If an already initialized collector is passed the collector will be cleared from its
427 * contents to be reused.
428 */
429 static void
InitObjectAddressCollector(ObjectAddressCollector * collector)430 InitObjectAddressCollector(ObjectAddressCollector *collector)
431 {
432 HASHCTL info;
433
434 memset(&info, 0, sizeof(info));
435 info.keysize = sizeof(ObjectAddress);
436 info.entrysize = sizeof(ObjectAddress);
437 info.hcxt = CurrentMemoryContext;
438 int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
439
440 collector->dependencySet = hash_create("dependency set", 128, &info, hashFlags);
441 collector->dependencyList = NULL;
442
443 collector->visitedObjects = hash_create("visited object set", 128, &info, hashFlags);
444 }
445
446
447 /*
448 * TargetObjectVisited returns true if the input target has been visited while
449 * traversing pg_depend.
450 */
451 static bool
TargetObjectVisited(ObjectAddressCollector * collector,ObjectAddress target)452 TargetObjectVisited(ObjectAddressCollector *collector, ObjectAddress target)
453 {
454 bool found = false;
455
456 /* find in set */
457 hash_search(collector->visitedObjects, &target, HASH_FIND, &found);
458
459 return found;
460 }
461
462
463 /*
464 * MarkObjectVisited marks the object as visited during the traversal of
465 * pg_depend.
466 */
467 static void
MarkObjectVisited(ObjectAddressCollector * collector,ObjectAddress target)468 MarkObjectVisited(ObjectAddressCollector *collector, ObjectAddress target)
469 {
470 bool found = false;
471
472 /* add to set */
473 ObjectAddress *address = (ObjectAddress *) hash_search(collector->visitedObjects,
474 &target,
475 HASH_ENTER, &found);
476
477 if (!found)
478 {
479 /* copy object address in */
480 *address = target;
481 }
482 }
483
484
485 /*
486 * CollectObjectAddress adds an ObjectAddress to the collector.
487 */
488 static void
CollectObjectAddress(ObjectAddressCollector * collector,const ObjectAddress * collect)489 CollectObjectAddress(ObjectAddressCollector *collector, const ObjectAddress *collect)
490 {
491 bool found = false;
492
493 /* add to set */
494 ObjectAddress *address = (ObjectAddress *) hash_search(collector->dependencySet,
495 collect,
496 HASH_ENTER, &found);
497
498 if (!found)
499 {
500 /* copy object address in */
501 *address = *collect;
502 }
503
504 /* add to list*/
505 collector->dependencyList = lappend(collector->dependencyList, address);
506 }
507
508
509 /*
510 * IsObjectAddressCollected is a helper function that can check if an ObjectAddress is
511 * already in a (unsorted) list of ObjectAddresses
512 */
513 static bool
IsObjectAddressCollected(ObjectAddress findAddress,ObjectAddressCollector * collector)514 IsObjectAddressCollected(ObjectAddress findAddress,
515 ObjectAddressCollector *collector)
516 {
517 bool found = false;
518
519 /* add to set */
520 hash_search(collector->dependencySet, &findAddress, HASH_FIND, &found);
521
522 return found;
523 }
524
525
526 /*
527 * SupportedDependencyByCitus returns whether citus has support to distribute the object
528 * addressed.
529 */
530 bool
SupportedDependencyByCitus(const ObjectAddress * address)531 SupportedDependencyByCitus(const ObjectAddress *address)
532 {
533 if (!EnableDependencyCreation)
534 {
535 /*
536 * If the user has disabled object propagation we need to fall back to the legacy
537 * behaviour in which we only support schema creation
538 */
539 switch (getObjectClass(address))
540 {
541 case OCLASS_SCHEMA:
542 {
543 return true;
544 }
545
546 default:
547 {
548 return false;
549 }
550 }
551
552 /* should be unreachable */
553 Assert(false);
554 }
555
556 /*
557 * looking at the type of a object to see if we know how to create the object on the
558 * workers.
559 */
560 switch (getObjectClass(address))
561 {
562 case OCLASS_AM:
563 {
564 /*
565 * Only support access methods if they came from extensions
566 * During the dependency resolution it will cascade into the extension and
567 * distributed that one instead of the Access Method. Now access methods can
568 * be configured on tables on the workers.
569 */
570 return IsObjectAddressOwnedByExtension(address, NULL);
571 }
572
573 case OCLASS_COLLATION:
574 case OCLASS_SCHEMA:
575 {
576 return true;
577 }
578
579 case OCLASS_PROC:
580 {
581 return true;
582 }
583
584 case OCLASS_DATABASE:
585 {
586 /* only to propagate its owner */
587 return true;
588 }
589
590 case OCLASS_ROLE:
591 {
592 /*
593 * Community only supports the extension owner as a distributed object to
594 * propagate alter statements for this user
595 */
596 if (address->objectId == CitusExtensionOwner())
597 {
598 return true;
599 }
600
601 return false;
602 }
603
604 case OCLASS_EXTENSION:
605 {
606 return true;
607 }
608
609 case OCLASS_TYPE:
610 {
611 switch (get_typtype(address->objectId))
612 {
613 case TYPTYPE_ENUM:
614 case TYPTYPE_COMPOSITE:
615 {
616 return true;
617 }
618
619 case TYPTYPE_BASE:
620 {
621 /*
622 * array types should be followed but not created, as they get created
623 * by the original type.
624 */
625 return type_is_array(address->objectId);
626 }
627
628 default:
629 {
630 /* type not supported */
631 return false;
632 }
633 }
634
635 /*
636 * should be unreachable, break here is to make sure the function has a path
637 * without return, instead of falling through to the next block */
638 break;
639 }
640
641 case OCLASS_CLASS:
642 {
643 /*
644 * composite types have a reference to a relation of composite type, we need
645 * to follow those to get the dependencies of type fields.
646 */
647 if (get_rel_relkind(address->objectId) == RELKIND_COMPOSITE_TYPE)
648 {
649 return true;
650 }
651
652 return false;
653 }
654
655 default:
656 {
657 /* unsupported type */
658 return false;
659 }
660 }
661 }
662
663
664 /*
665 * IsTableOwnedByExtension returns whether the table with the given relation ID is
666 * owned by an extension.
667 */
668 bool
IsTableOwnedByExtension(Oid relationId)669 IsTableOwnedByExtension(Oid relationId)
670 {
671 ObjectAddress tableAddress = { 0 };
672 ObjectAddressSet(tableAddress, RelationRelationId, relationId);
673
674 return IsObjectAddressOwnedByExtension(&tableAddress, NULL);
675 }
676
677
678 /*
679 * IsObjectAddressOwnedByExtension returns whether or not the object is owned by an
680 * extension. It is assumed that an object having a dependency on an extension is created
681 * by that extension and therefore owned by that extension.
682 *
683 * If extensionAddress is not set to a NULL pointer the function will write the extension
684 * address this function depends on into this location.
685 */
686 bool
IsObjectAddressOwnedByExtension(const ObjectAddress * target,ObjectAddress * extensionAddress)687 IsObjectAddressOwnedByExtension(const ObjectAddress *target,
688 ObjectAddress *extensionAddress)
689 {
690 ScanKeyData key[2];
691 HeapTuple depTup = NULL;
692 bool result = false;
693
694 Relation depRel = table_open(DependRelationId, AccessShareLock);
695
696 /* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */
697 ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ,
698 ObjectIdGetDatum(target->classId));
699 ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ,
700 ObjectIdGetDatum(target->objectId));
701 SysScanDesc depScan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2,
702 key);
703
704 while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
705 {
706 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
707 if (pg_depend->deptype == DEPENDENCY_EXTENSION)
708 {
709 result = true;
710 if (extensionAddress != NULL)
711 {
712 ObjectAddressSubSet(*extensionAddress, pg_depend->refclassid,
713 pg_depend->refobjid, pg_depend->refobjsubid);
714 }
715 break;
716 }
717 }
718
719 systable_endscan(depScan);
720 table_close(depRel, AccessShareLock);
721
722 return result;
723 }
724
725
726 /*
727 * FollowNewSupportedDependencies applies filters on pg_depend entries to follow all
728 * objects which should be distributed before the root object can safely be created.
729 */
730 static bool
FollowNewSupportedDependencies(ObjectAddressCollector * collector,DependencyDefinition * definition)731 FollowNewSupportedDependencies(ObjectAddressCollector *collector,
732 DependencyDefinition *definition)
733 {
734 if (definition->mode == DependencyPgDepend)
735 {
736 /*
737 * For dependencies found in pg_depend:
738 *
739 * Follow only normal and extension dependencies. The latter is used to reach the
740 * extensions, the objects that directly depend on the extension are eliminated
741 * during the "apply" phase.
742 *
743 * Other dependencies are internal dependencies and managed by postgres.
744 */
745 if (definition->data.pg_depend.deptype != DEPENDENCY_NORMAL &&
746 definition->data.pg_depend.deptype != DEPENDENCY_EXTENSION)
747 {
748 return false;
749 }
750 }
751
752 /* rest of the tests are to see if we want to follow the actual dependency */
753 ObjectAddress address = DependencyDefinitionObjectAddress(definition);
754
755 /*
756 * If the object is already in our dependency list we do not have to follow any
757 * further
758 */
759 if (IsObjectAddressCollected(address, collector))
760 {
761 return false;
762 }
763
764 /*
765 * If the object is already distributed it is not a `new` object that needs to be
766 * distributed before we create a dependant object
767 */
768 if (IsObjectDistributed(&address))
769 {
770 return false;
771 }
772
773 /*
774 * We can only distribute dependencies that citus knows how to distribute.
775 *
776 * But we don't want to bail out if the object is owned by extension, because
777 * Citus can create the extension.
778 */
779 if (!SupportedDependencyByCitus(&address) &&
780 !IsObjectAddressOwnedByExtension(&address, NULL))
781 {
782 return false;
783 }
784
785 if (CitusExtensionObject(&address))
786 {
787 /* following citus extension could complicate role management */
788 return false;
789 }
790
791 return true;
792 }
793
794
795 /*
796 * FollowAllSupportedDependencies applies filters on pg_depend entries to follow the
797 * dependency tree of objects in depth first order. We will visit all supported objects.
798 * This is used to sort a list of dependencies in dependency order.
799 */
800 static bool
FollowAllSupportedDependencies(ObjectAddressCollector * collector,DependencyDefinition * definition)801 FollowAllSupportedDependencies(ObjectAddressCollector *collector,
802 DependencyDefinition *definition)
803 {
804 if (definition->mode == DependencyPgDepend)
805 {
806 /*
807 * For dependencies found in pg_depend:
808 *
809 * Follow only normal and extension dependencies. The latter is used to reach the
810 * extensions, the objects that directly depend on the extension are eliminated
811 * during the "apply" phase.
812 *
813 * Other dependencies are internal dependencies and managed by postgres.
814 */
815 if (definition->data.pg_depend.deptype != DEPENDENCY_NORMAL &&
816 definition->data.pg_depend.deptype != DEPENDENCY_EXTENSION)
817 {
818 return false;
819 }
820 }
821
822 /* rest of the tests are to see if we want to follow the actual dependency */
823 ObjectAddress address = DependencyDefinitionObjectAddress(definition);
824
825 /*
826 * If the object is already in our dependency list we do not have to follow any
827 * further
828 */
829 if (IsObjectAddressCollected(address, collector))
830 {
831 return false;
832 }
833
834 /*
835 * We can only distribute dependencies that citus knows how to distribute.
836 *
837 * But we don't want to bail out if the object is owned by extension, because
838 * Citus can create the extension.
839 */
840 if (!SupportedDependencyByCitus(&address) &&
841 !IsObjectAddressOwnedByExtension(&address, NULL))
842 {
843 return false;
844 }
845
846 if (CitusExtensionObject(&address))
847 {
848 /* following citus extension could complicate role management */
849 return false;
850 }
851
852 return true;
853 }
854
855
856 /*
857 * ApplyAddToDependencyList is an apply function for RecurseObjectDependencies that will collect
858 * all the ObjectAddresses for pg_depend entries to the context. The context here is
859 * assumed to be a (ObjectAddressCollector *) to the location where all ObjectAddresses
860 * will be collected.
861 */
862 static void
ApplyAddToDependencyList(ObjectAddressCollector * collector,DependencyDefinition * definition)863 ApplyAddToDependencyList(ObjectAddressCollector *collector,
864 DependencyDefinition *definition)
865 {
866 ObjectAddress address = DependencyDefinitionObjectAddress(definition);
867
868 /*
869 * Objects owned by an extension are assumed to be created on the workers by creating
870 * the extension in the cluster, we we don't want explicitly create them.
871 *
872 * Since we do need to capture the extension as a dependency we are following the
873 * object instead of breaking the traversal there.
874 */
875 if (IsObjectAddressOwnedByExtension(&address, NULL))
876 {
877 return;
878 }
879
880 CollectObjectAddress(collector, &address);
881 }
882
883
884 /*
885 * ExpandCitusSupportedTypes base on supported types by citus we might want to expand
886 * the list of objects to visit in pg_depend.
887 *
888 * An example where we want to expand is for types. Their dependencies are not captured
889 * with an entry in pg_depend from their object address, but by the object address of the
890 * relation describing the type.
891 */
892 static List *
ExpandCitusSupportedTypes(ObjectAddressCollector * collector,ObjectAddress target)893 ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress target)
894 {
895 List *result = NIL;
896
897 switch (target.classId)
898 {
899 case TypeRelationId:
900 {
901 /*
902 * types depending on other types are not captured in pg_depend, instead they
903 * are described with their dependencies by the relation that describes the
904 * composite type.
905 */
906 if (get_typtype(target.objectId) == TYPTYPE_COMPOSITE)
907 {
908 Oid typeRelationId = get_typ_typrelid(target.objectId);
909 DependencyDefinition *dependency =
910 CreateObjectAddressDependencyDef(RelationRelationId, typeRelationId);
911 result = lappend(result, dependency);
912 }
913
914 /*
915 * array types don't have a normal dependency on their element type, instead
916 * their dependency is an internal one. We can't follow interal dependencies
917 * as that would cause a cyclic dependency on others, instead we expand here
918 * to follow the dependency on the element type.
919 */
920 if (type_is_array(target.objectId))
921 {
922 Oid typeId = get_element_type(target.objectId);
923 DependencyDefinition *dependency =
924 CreateObjectAddressDependencyDef(TypeRelationId, typeId);
925 result = lappend(result, dependency);
926 }
927
928 break;
929 }
930
931 case RelationRelationId:
932 {
933 /*
934 * Triggers both depend to the relations and to the functions they
935 * execute. Also, pg_depend records dependencies from triggers to the
936 * functions but not from relations to their triggers. Given above two,
937 * we directly expand depencies for the relations to trigger functions.
938 * That way, we won't attempt to create the trigger as a dependency of
939 * the relation, which would fail as the relation itself is not created
940 * yet when ensuring dependencies.
941 */
942 Oid relationId = target.objectId;
943 List *triggerFunctionDepencyList =
944 GetRelationTriggerFunctionDepencyList(relationId);
945 result = list_concat(result, triggerFunctionDepencyList);
946
947 /*
948 * Statistics' both depend to the relations and to the schemas they belong
949 * to. Also, pg_depend records dependencies from statistics to their schemas
950 * but not from relations to their statistics' schemas. Given above two,
951 * we directly expand dependencies for the relations to schemas of
952 * statistics.
953 */
954 List *statisticsSchemaDependencyList =
955 GetRelationStatsSchemaDependencyList(relationId);
956 result = list_concat(result, statisticsSchemaDependencyList);
957 }
958
959 default:
960 {
961 /* no expansion for unsupported types */
962 break;
963 }
964 }
965 return result;
966 }
967
968
969 /*
970 * GetRelationStatsSchemaDependencyList returns a list of DependencyDefinition
971 * objects for the schemas that statistics' of the relation with relationId depends.
972 */
973 static List *
GetRelationStatsSchemaDependencyList(Oid relationId)974 GetRelationStatsSchemaDependencyList(Oid relationId)
975 {
976 List *dependencyList = NIL;
977
978 List *schemaIds = GetExplicitStatisticsSchemaIdList(relationId);
979 Oid schemaId = InvalidOid;
980 foreach_oid(schemaId, schemaIds)
981 {
982 DependencyDefinition *dependency =
983 CreateObjectAddressDependencyDef(NamespaceRelationId, schemaId);
984 dependencyList = lappend(dependencyList, dependency);
985 }
986
987 return dependencyList;
988 }
989
990
991 /*
992 * GetRelationTriggerFunctionDepencyList returns a list of DependencyDefinition
993 * objects for the functions that triggers of the relation with relationId depends.
994 */
995 static List *
GetRelationTriggerFunctionDepencyList(Oid relationId)996 GetRelationTriggerFunctionDepencyList(Oid relationId)
997 {
998 List *dependencyList = NIL;
999
1000 List *triggerIdList = GetExplicitTriggerIdList(relationId);
1001 Oid triggerId = InvalidOid;
1002 foreach_oid(triggerId, triggerIdList)
1003 {
1004 Oid functionId = GetTriggerFunctionId(triggerId);
1005 DependencyDefinition *dependency =
1006 CreateObjectAddressDependencyDef(ProcedureRelationId, functionId);
1007 dependencyList = lappend(dependencyList, dependency);
1008 }
1009
1010 return dependencyList;
1011 }
1012
1013
1014 /*
1015 * CreateObjectAddressDependencyDef returns DependencyDefinition object that
1016 * stores the ObjectAddress for the database object identified by classId and
1017 * objectId.
1018 */
1019 static DependencyDefinition *
CreateObjectAddressDependencyDef(Oid classId,Oid objectId)1020 CreateObjectAddressDependencyDef(Oid classId, Oid objectId)
1021 {
1022 DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition));
1023 dependency->mode = DependencyObjectAddress;
1024 ObjectAddressSet(dependency->data.address, classId, objectId);
1025 return dependency;
1026 }
1027
1028
1029 /*
1030 * DependencyDefinitionObjectAddress returns the object address of the dependency defined
1031 * by the dependency definition, irregardless what the source of the definition is
1032 */
1033 static ObjectAddress
DependencyDefinitionObjectAddress(DependencyDefinition * definition)1034 DependencyDefinitionObjectAddress(DependencyDefinition *definition)
1035 {
1036 switch (definition->mode)
1037 {
1038 case DependencyObjectAddress:
1039 {
1040 return definition->data.address;
1041 }
1042
1043 case DependencyPgDepend:
1044 {
1045 ObjectAddress address = { 0 };
1046 ObjectAddressSet(address,
1047 definition->data.pg_depend.refclassid,
1048 definition->data.pg_depend.refobjid);
1049 return address;
1050 }
1051
1052 case DependencyPgShDepend:
1053 {
1054 ObjectAddress address = { 0 };
1055 ObjectAddressSet(address,
1056 definition->data.pg_shdepend.refclassid,
1057 definition->data.pg_shdepend.refobjid);
1058 return address;
1059 }
1060 }
1061
1062 ereport(ERROR, (errmsg("unsupported dependency definition mode")));
1063 }
1064
1065
1066 /*
1067 * BuildViewDependencyGraph gets a relation (or a view) and builds a dependency graph for the
1068 * depending views.
1069 */
1070 static ViewDependencyNode *
BuildViewDependencyGraph(Oid relationId,HTAB * nodeMap)1071 BuildViewDependencyGraph(Oid relationId, HTAB *nodeMap)
1072 {
1073 bool found = false;
1074 ViewDependencyNode *node = (ViewDependencyNode *) hash_search(nodeMap, &relationId,
1075 HASH_ENTER, &found);
1076
1077 if (found)
1078 {
1079 return node;
1080 }
1081
1082 node->id = relationId;
1083 node->remainingDependencyCount = 0;
1084 node->dependingNodes = NIL;
1085
1086 Oid targetObjectClassId = RelationRelationId;
1087 Oid targetObjectId = relationId;
1088 List *dependencyTupleList = GetPgDependTuplesForDependingObjects(targetObjectClassId,
1089 targetObjectId);
1090
1091 HeapTuple depTup = NULL;
1092 foreach_ptr(depTup, dependencyTupleList)
1093 {
1094 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
1095
1096 Oid dependingView = GetDependingView(pg_depend);
1097 if (dependingView != InvalidOid)
1098 {
1099 ViewDependencyNode *dependingNode = BuildViewDependencyGraph(dependingView,
1100 nodeMap);
1101
1102 node->dependingNodes = lappend(node->dependingNodes, dependingNode);
1103 dependingNode->remainingDependencyCount++;
1104 }
1105 }
1106
1107 return node;
1108 }
1109
1110
1111 /*
1112 * GetPgDependTuplesForDependingObjects scans pg_depend for given object and
1113 * returns a list of heap tuples for the objects depending on it.
1114 */
1115 List *
GetPgDependTuplesForDependingObjects(Oid targetObjectClassId,Oid targetObjectId)1116 GetPgDependTuplesForDependingObjects(Oid targetObjectClassId, Oid targetObjectId)
1117 {
1118 List *dependencyTupleList = NIL;
1119
1120 Relation pgDepend = table_open(DependRelationId, AccessShareLock);
1121
1122 ScanKeyData key[2];
1123 int scanKeyCount = 2;
1124
1125 ScanKeyInit(&key[0], Anum_pg_depend_refclassid, BTEqualStrategyNumber, F_OIDEQ,
1126 ObjectIdGetDatum(targetObjectClassId));
1127 ScanKeyInit(&key[1], Anum_pg_depend_refobjid, BTEqualStrategyNumber, F_OIDEQ,
1128 ObjectIdGetDatum(targetObjectId));
1129
1130 bool useIndex = true;
1131 SysScanDesc depScan = systable_beginscan(pgDepend, DependReferenceIndexId,
1132 useIndex, NULL, scanKeyCount, key);
1133
1134 HeapTuple dependencyTuple = NULL;
1135 while (HeapTupleIsValid(dependencyTuple = systable_getnext(depScan)))
1136 {
1137 /* copy the tuple first */
1138 dependencyTuple = heap_copytuple(dependencyTuple);
1139 dependencyTupleList = lappend(dependencyTupleList, dependencyTuple);
1140 }
1141
1142 systable_endscan(depScan);
1143 relation_close(pgDepend, NoLock);
1144
1145 return dependencyTupleList;
1146 }
1147
1148
1149 /*
1150 * GetDependingViews takes a relation id, finds the views that depend on the relation
1151 * and returns list of the oids of those views. It recurses on the pg_depend table to
1152 * find the views that recursively depend on the table.
1153 *
1154 * The returned views will have the correct order for creating them, from the point of
1155 * dependencies between.
1156 */
1157 List *
GetDependingViews(Oid relationId)1158 GetDependingViews(Oid relationId)
1159 {
1160 HASHCTL info;
1161 memset(&info, 0, sizeof(info));
1162 info.keysize = sizeof(Oid);
1163 info.entrysize = sizeof(ViewDependencyNode);
1164 info.hash = oid_hash;
1165 uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION);
1166 HTAB *nodeMap = hash_create("view dependency map (oid)", 32, &info, hashFlags);
1167
1168 ViewDependencyNode *tableNode = BuildViewDependencyGraph(relationId, nodeMap);
1169
1170 List *dependingViews = NIL;
1171 List *nodeQueue = list_make1(tableNode);
1172 ViewDependencyNode *node = NULL;
1173 foreach_ptr_append(node, nodeQueue)
1174 {
1175 ViewDependencyNode *dependingNode = NULL;
1176 foreach_ptr(dependingNode, node->dependingNodes)
1177 {
1178 dependingNode->remainingDependencyCount--;
1179 if (dependingNode->remainingDependencyCount == 0)
1180 {
1181 nodeQueue = lappend(nodeQueue, dependingNode);
1182 dependingViews = lappend_oid(dependingViews, dependingNode->id);
1183 }
1184 }
1185 }
1186 return dependingViews;
1187 }
1188
1189
1190 /*
1191 * GetDependingView gets a row of pg_depend and returns the oid of the view that is depended.
1192 * If the depended object is not a rewrite object, the object to rewrite is not a view or it
1193 * is the same view with the depending one InvalidOid is returned.
1194 */
1195 Oid
GetDependingView(Form_pg_depend pg_depend)1196 GetDependingView(Form_pg_depend pg_depend)
1197 {
1198 if (pg_depend->classid != RewriteRelationId)
1199 {
1200 return InvalidOid;
1201 }
1202
1203 Relation rewriteRel = table_open(RewriteRelationId, AccessShareLock);
1204 ScanKeyData rkey[1];
1205
1206 ScanKeyInit(&rkey[0],
1207 Anum_pg_rewrite_oid,
1208 BTEqualStrategyNumber, F_OIDEQ,
1209 ObjectIdGetDatum(pg_depend->objid));
1210
1211 SysScanDesc rscan = systable_beginscan(rewriteRel, RewriteOidIndexId,
1212 true, NULL, 1, rkey);
1213
1214 HeapTuple rewriteTup = systable_getnext(rscan);
1215 if (!HeapTupleIsValid(rewriteTup))
1216 {
1217 /*
1218 * This function already verified that objid's classid is
1219 * RewriteRelationId, so it should exists. But be on the
1220 * safe side.
1221 */
1222 ereport(ERROR, (errmsg("catalog lookup failed for view %u",
1223 pg_depend->objid)));
1224 }
1225
1226 Form_pg_rewrite pg_rewrite = (Form_pg_rewrite) GETSTRUCT(rewriteTup);
1227
1228 bool isView = get_rel_relkind(pg_rewrite->ev_class) == RELKIND_VIEW;
1229 bool isMatView = get_rel_relkind(pg_rewrite->ev_class) == RELKIND_MATVIEW;
1230 bool isDifferentThanRef = pg_rewrite->ev_class != pg_depend->refobjid;
1231
1232 Oid dependingView = InvalidOid;
1233 if ((isView || isMatView) && isDifferentThanRef)
1234 {
1235 dependingView = pg_rewrite->ev_class;
1236 }
1237
1238 systable_endscan(rscan);
1239 relation_close(rewriteRel, AccessShareLock);
1240
1241 return dependingView;
1242 }
1243