1 /*------------------------------------------------------------------------- 2 * 3 * metadata_utility.h 4 * Type and function declarations used for reading and modifying 5 * coordinator node's metadata. 6 * 7 * Copyright (c) Citus Data, Inc. 8 * 9 * $Id$ 10 * 11 *------------------------------------------------------------------------- 12 */ 13 14 #ifndef METADATA_UTILITY_H 15 #define METADATA_UTILITY_H 16 17 #include "postgres.h" 18 19 #include "access/heapam.h" 20 #include "access/htup.h" 21 #include "access/tupdesc.h" 22 #include "catalog/indexing.h" 23 #include "catalog/objectaddress.h" 24 #include "distributed/citus_nodes.h" 25 #include "distributed/connection_management.h" 26 #include "distributed/relay_utility.h" 27 #include "utils/acl.h" 28 #include "utils/relcache.h" 29 30 31 /* total number of hash tokens (2^32) */ 32 #define HASH_TOKEN_COUNT INT64CONST(4294967296) 33 #define SELECT_TRUE_QUERY "SELECT TRUE FROM %s LIMIT 1" 34 #define PG_TABLE_SIZE_FUNCTION "pg_table_size(%s)" 35 #define PG_RELATION_SIZE_FUNCTION "pg_relation_size(%s)" 36 #define PG_TOTAL_RELATION_SIZE_FUNCTION "pg_total_relation_size(%s)" 37 #define CSTORE_TABLE_SIZE_FUNCTION "cstore_table_size(%s)" 38 #define WORKER_PARTITIONED_TABLE_SIZE_FUNCTION "worker_partitioned_table_size(%s)" 39 #define WORKER_PARTITIONED_RELATION_SIZE_FUNCTION "worker_partitioned_relation_size(%s)" 40 #define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \ 41 "worker_partitioned_relation_total_size(%s)" 42 43 #define SHARD_SIZES_COLUMN_COUNT 2 44 #define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4 45 46 /* In-memory representation of a typed tuple in pg_dist_shard. */ 47 typedef struct ShardInterval 48 { 49 CitusNode type; 50 Oid relationId; 51 char storageType; 52 Oid valueTypeId; /* min/max value datum's typeId */ 53 int valueTypeLen; /* min/max value datum's typelen */ 54 bool valueByVal; /* min/max value datum's byval */ 55 bool minValueExists; 56 bool maxValueExists; 57 Datum minValue; /* a shard's typed min value datum */ 58 Datum maxValue; /* a shard's typed max value datum */ 59 uint64 shardId; 60 int shardIndex; 61 } ShardInterval; 62 63 64 /* In-memory representation of a tuple in pg_dist_placement. */ 65 typedef struct GroupShardPlacement 66 { 67 CitusNode type; 68 uint64 placementId; /* sequence that implies this placement creation order */ 69 uint64 shardId; 70 uint64 shardLength; 71 ShardState shardState; 72 int32 groupId; 73 } GroupShardPlacement; 74 75 76 /* A GroupShardPlacement which has had some extra data resolved */ 77 typedef struct ShardPlacement 78 { 79 /* 80 * careful, the rest of the code assumes this exactly matches GroupShardPlacement 81 */ 82 CitusNode type; 83 uint64 placementId; 84 uint64 shardId; 85 uint64 shardLength; 86 ShardState shardState; 87 int32 groupId; 88 89 /* the rest of the fields aren't from pg_dist_placement */ 90 char *nodeName; 91 uint32 nodePort; 92 uint32 nodeId; 93 char partitionMethod; 94 uint32 colocationGroupId; 95 uint32 representativeValue; 96 } ShardPlacement; 97 98 99 typedef enum CascadeToColocatedOption 100 { 101 CASCADE_TO_COLOCATED_UNSPECIFIED, 102 CASCADE_TO_COLOCATED_YES, 103 CASCADE_TO_COLOCATED_NO, 104 CASCADE_TO_COLOCATED_NO_ALREADY_CASCADED 105 }CascadeToColocatedOption; 106 107 /* 108 * TableConversionParameters are the parameters that are given to 109 * table conversion UDFs: undistribute_table, alter_distributed_table, 110 * alter_table_set_access_method. 111 * 112 * When passing a TableConversionParameters object to one of the table 113 * conversion functions some of the parameters needs to be set: 114 * UndistributeTable: relationId 115 * AlterDistributedTable: relationId, distributionColumn, shardCountIsNull, 116 * shardCount, colocateWith, cascadeToColocated 117 * AlterTableSetAccessMethod: relationId, accessMethod 118 * 119 * conversionType parameter will be automatically set by the function. 120 * 121 * TableConversionState objects can be created using TableConversionParameters 122 * objects with CreateTableConversion function. 123 */ 124 typedef struct TableConversionParameters 125 { 126 /* 127 * Determines type of conversion: UNDISTRIBUTE_TABLE, 128 * ALTER_DISTRIBUTED_TABLE, ALTER_TABLE_SET_ACCESS_METHOD. 129 */ 130 char conversionType; 131 132 /* Oid of the table to do conversion on */ 133 Oid relationId; 134 135 /* 136 * Options to do conversions on the table 137 * distributionColumn is the name of the new distribution column, 138 * shardCountIsNull is if the shardCount variable is not given 139 * shardCount is the new shard count, 140 * colocateWith is the name of the table to colocate with, 'none', or 141 * 'default' 142 * accessMethod is the name of the new accessMethod for the table 143 */ 144 char *distributionColumn; 145 bool shardCountIsNull; 146 int shardCount; 147 char *colocateWith; 148 char *accessMethod; 149 150 /* 151 * cascadeToColocated determines whether the shardCount and 152 * colocateWith will be cascaded to the currently colocated tables 153 */ 154 CascadeToColocatedOption cascadeToColocated; 155 156 /* 157 * cascadeViaForeignKeys determines if the conversion operation 158 * will be cascaded to the graph connected with foreign keys 159 * to the table 160 */ 161 bool cascadeViaForeignKeys; 162 163 /* 164 * suppressNoticeMessages determines if we want to suppress NOTICE 165 * messages that we explicitly issue 166 */ 167 bool suppressNoticeMessages; 168 } TableConversionParameters; 169 170 typedef struct TableConversionReturn 171 { 172 /* 173 * commands to create foreign keys for the table 174 * 175 * When the table conversion is cascaded we can recreate 176 * some of the foreign keys of the cascaded tables. So with this 177 * list we can return it to the initial conversion operation so 178 * foreign keys can be created after every colocated table is 179 * converted. 180 */ 181 List *foreignKeyCommands; 182 }TableConversionReturn; 183 184 185 /* 186 * Size query types for PG and Citus 187 * For difference details, please see: 188 * https://www.postgresql.org/docs/13/functions-admin.html#FUNCTIONS-ADMIN-DBSIZE 189 */ 190 typedef enum SizeQueryType 191 { 192 RELATION_SIZE, /* pg_relation_size() */ 193 TOTAL_RELATION_SIZE, /* pg_total_relation_size() */ 194 TABLE_SIZE, /* pg_table_size() */ 195 CSTORE_TABLE_SIZE /* cstore_table_size() */ 196 } SizeQueryType; 197 198 199 /* Size functions */ 200 extern Datum citus_table_size(PG_FUNCTION_ARGS); 201 extern Datum citus_total_relation_size(PG_FUNCTION_ARGS); 202 extern Datum citus_relation_size(PG_FUNCTION_ARGS); 203 204 /* Function declarations to read shard and shard placement data */ 205 extern uint32 TableShardReplicationFactor(Oid relationId); 206 extern List * LoadShardIntervalList(Oid relationId); 207 extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId); 208 extern int ShardIntervalCount(Oid relationId); 209 extern List * LoadShardList(Oid relationId); 210 extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval); 211 extern uint64 ShardLength(uint64 shardId); 212 extern bool NodeGroupHasLivePlacements(int32 groupId); 213 extern bool NodeGroupHasShardPlacements(int32 groupId, 214 bool onlyConsiderActivePlacements); 215 extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId); 216 extern List * ActiveShardPlacementList(uint64 shardId); 217 extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId); 218 extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk); 219 extern List * BuildShardPlacementList(int64 shardId); 220 extern List * AllShardPlacementsOnNodeGroup(int32 groupId); 221 extern List * AllShardPlacementsWithShardPlacementState(ShardState shardState); 222 extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); 223 extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, 224 SizeQueryType sizeQueryType, 225 bool optimizePartitionCalculations); 226 extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); 227 228 /* Function declarations to modify shard and shard placement data */ 229 extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, 230 text *shardMinValue, text *shardMaxValue); 231 extern void DeleteShardRow(uint64 shardId); 232 extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, 233 char shardState, uint64 shardLength, 234 int32 groupId); 235 extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, 236 Var *distributionColumn, uint32 colocationId, 237 char replicationModel); 238 extern void DeletePartitionRow(Oid distributedRelationId); 239 extern void DeleteShardRow(uint64 shardId); 240 extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, 241 char shardState); 242 extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement); 243 extern void UpdateShardPlacementState(uint64 placementId, char shardState); 244 extern void UpdatePlacementGroupId(uint64 placementId, int groupId); 245 extern void DeleteShardPlacementRow(uint64 placementId); 246 extern void CreateDistributedTable(Oid relationId, Var *distributionColumn, 247 char distributionMethod, int shardCount, 248 bool shardCountIsStrict, char *colocateWithTableName, 249 bool viaDeprecatedAPI); 250 extern void CreateTruncateTrigger(Oid relationId); 251 extern TableConversionReturn * UndistributeTable(TableConversionParameters *params); 252 253 extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); 254 extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); 255 extern bool ShouldPropagate(void); 256 extern bool ShouldPropagateObject(const ObjectAddress *address); 257 extern void ReplicateAllDependenciesToNode(const char *nodeName, int nodePort); 258 259 /* Remaining metadata utility functions */ 260 extern char * TableOwner(Oid relationId); 261 extern void EnsureTablePermissions(Oid relationId, AclMode mode); 262 extern void EnsureTableOwner(Oid relationId); 263 extern void EnsureSchemaOwner(Oid schemaId); 264 extern void EnsureHashDistributedTable(Oid relationId); 265 extern void EnsureFunctionOwner(Oid functionId); 266 extern void EnsureSuperUser(void); 267 extern void ErrorIfTableIsACatalogTable(Relation relation); 268 extern void EnsureTableNotDistributed(Oid relationId); 269 extern void EnsureRelationExists(Oid relationId); 270 extern bool RegularTable(Oid relationId); 271 extern bool TableEmpty(Oid tableId); 272 extern bool RelationUsesIdentityColumns(TupleDesc relationDesc); 273 extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); 274 extern uint64 GetFirstShardId(Oid relationId); 275 extern Datum StringToDatum(char *inputString, Oid dataType); 276 extern char * DatumToString(Datum datum, Oid dataType); 277 extern int CompareShardPlacementsByWorker(const void *leftElement, 278 const void *rightElement); 279 extern int CompareShardPlacementsByGroupId(const void *leftElement, 280 const void *rightElement); 281 extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray, 282 bool *isNullArray, 283 Oid intervalTypeId, 284 int32 intervalTypeMod); 285 extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn, 286 Oid *intervalTypeId, int32 *intervalTypeMod); 287 extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool 288 useDistributedTransaction, bool 289 useShardMinMaxQuery); 290 extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection, 291 uint64 *availableBytes, 292 uint64 *totalBytes); 293 extern void ExecuteQueryViaSPI(char *query, int SPIOK); 294 extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId); 295 extern void AlterSequenceType(Oid seqOid, Oid typeOid); 296 extern void MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList); 297 extern void MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid); 298 extern void EnsureDistributedSequencesHaveOneType(Oid relationId, 299 List *dependentSequenceList, 300 List *attnumList); 301 #endif /* METADATA_UTILITY_H */ 302