1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <foreign/foreign.h>
8 #include <utils/hsearch.h>
9 #include <utils/fmgrprotos.h>
10 #include <parser/parsetree.h>
11 #include <nodes/bitmapset.h>
12
13 #include "data_node_chunk_assignment.h"
14 #include "dimension.h"
15 #include "dimension_slice.h"
16 #include "dimension_vector.h"
17 #include "hypercube.h"
18 #include "chunk.h"
19 #include "chunk_data_node.h"
20
21 static int
get_remote_chunk_id_from_relid(Oid server_oid,Oid chunk_relid)22 get_remote_chunk_id_from_relid(Oid server_oid, Oid chunk_relid)
23 {
24 Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
25 ForeignServer *fs = GetForeignServer(server_oid);
26 ChunkDataNode *cdn;
27
28 Assert(chunk != NULL);
29 cdn = ts_chunk_data_node_scan_by_chunk_id_and_node_name(chunk->fd.id,
30 fs->servername,
31 CurrentMemoryContext);
32 Assert(cdn != NULL);
33
34 return cdn->fd.node_chunk_id;
35 }
36
37 /*
38 * Find an existing data node chunk assignment or initialize a new one.
39 */
40 static DataNodeChunkAssignment *
get_or_create_sca(DataNodeChunkAssignments * scas,Oid serverid,RelOptInfo * rel)41 get_or_create_sca(DataNodeChunkAssignments *scas, Oid serverid, RelOptInfo *rel)
42 {
43 DataNodeChunkAssignment *sca;
44 bool found;
45
46 Assert(rel == NULL || rel->serverid == serverid);
47
48 sca = hash_search(scas->assignments, &serverid, HASH_ENTER, &found);
49
50 if (!found)
51 {
52 /* New entry */
53 memset(sca, 0, sizeof(*sca));
54 sca->node_server_oid = serverid;
55 }
56
57 return sca;
58 }
59
60 static const DimensionSlice *
get_slice_for_dimension(Oid chunk_relid,int32 dimension_id)61 get_slice_for_dimension(Oid chunk_relid, int32 dimension_id)
62 {
63 Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
64
65 return ts_hypercube_get_slice_by_dimension_id(chunk->cube, dimension_id);
66 }
67
68 /*
69 * Assign the given chunk relation to a data node.
70 *
71 * The chunk is assigned according to the strategy set in the
72 * DataNodeChunkAssignments state.
73 */
74 DataNodeChunkAssignment *
data_node_chunk_assignment_assign_chunk(DataNodeChunkAssignments * scas,RelOptInfo * chunkrel)75 data_node_chunk_assignment_assign_chunk(DataNodeChunkAssignments *scas, RelOptInfo *chunkrel)
76 {
77 DataNodeChunkAssignment *sca = get_or_create_sca(scas, chunkrel->serverid, NULL);
78 RangeTblEntry *rte = planner_rt_fetch(chunkrel->relid, scas->root);
79 MemoryContext old;
80
81 /* Should never assign the same chunk twice */
82 Assert(!bms_is_member(chunkrel->relid, sca->chunk_relids));
83
84 old = MemoryContextSwitchTo(scas->mctx);
85
86 /* If this is the first chunk we assign to this data node, increment the
87 * number of data nodes with one or more chunks on them */
88 if (list_length(sca->chunk_oids) == 0)
89 scas->num_nodes_with_chunks++;
90
91 sca->chunk_relids = bms_add_member(sca->chunk_relids, chunkrel->relid);
92 sca->chunk_oids = lappend_oid(sca->chunk_oids, rte->relid);
93 sca->remote_chunk_ids =
94 lappend_int(sca->remote_chunk_ids,
95 get_remote_chunk_id_from_relid(chunkrel->serverid, rte->relid));
96 sca->pages += chunkrel->pages;
97 sca->rows += chunkrel->rows;
98 sca->tuples += chunkrel->tuples;
99
100 MemoryContextSwitchTo(old);
101
102 scas->total_num_chunks++;
103
104 return sca;
105 }
106
107 /*
108 * Initialize a new chunk assignment state with a specific assignment strategy.
109 */
110 void
data_node_chunk_assignments_init(DataNodeChunkAssignments * scas,DataNodeChunkAssignmentStrategy strategy,PlannerInfo * root,unsigned int nrels_hint)111 data_node_chunk_assignments_init(DataNodeChunkAssignments *scas,
112 DataNodeChunkAssignmentStrategy strategy, PlannerInfo *root,
113 unsigned int nrels_hint)
114 {
115 HASHCTL hctl = {
116 .keysize = sizeof(Oid),
117 .entrysize = sizeof(DataNodeChunkAssignment),
118 .hcxt = CurrentMemoryContext,
119 };
120
121 scas->strategy = strategy;
122 scas->root = root;
123 scas->mctx = hctl.hcxt;
124 scas->total_num_chunks = 0;
125 scas->num_nodes_with_chunks = 0;
126 scas->assignments = hash_create("data node chunk assignments",
127 nrels_hint,
128 &hctl,
129 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
130 }
131
132 /*
133 * Assign chunks to data nodes.
134 *
135 * Each chunk in the chunkrels array is a assigned a data node using the strategy
136 * set in the DataNodeChunkAssignments state.
137 */
138 DataNodeChunkAssignments *
data_node_chunk_assignment_assign_chunks(DataNodeChunkAssignments * scas,RelOptInfo ** chunkrels,unsigned int nrels)139 data_node_chunk_assignment_assign_chunks(DataNodeChunkAssignments *scas, RelOptInfo **chunkrels,
140 unsigned int nrels)
141 {
142 unsigned int i;
143
144 Assert(scas->assignments != NULL && scas->root != NULL);
145
146 for (i = 0; i < nrels; i++)
147 {
148 RelOptInfo *chunkrel = chunkrels[i];
149
150 Assert(IS_SIMPLE_REL(chunkrel) && chunkrel->fdw_private != NULL);
151 data_node_chunk_assignment_assign_chunk(scas, chunkrel);
152 }
153
154 return scas;
155 }
156
157 /*
158 * Get the data node assignment for the given relation (chunk).
159 */
160 DataNodeChunkAssignment *
data_node_chunk_assignment_get_or_create(DataNodeChunkAssignments * scas,RelOptInfo * rel)161 data_node_chunk_assignment_get_or_create(DataNodeChunkAssignments *scas, RelOptInfo *rel)
162 {
163 return get_or_create_sca(scas, rel->serverid, rel);
164 }
165
166 /*
167 * Check if a dimension slice overlaps with other slices.
168 *
169 * This is a naive implementation that runs in linear time. A more efficient
170 * approach would be to use, e.g., an interval tree.
171 */
172 static bool
dimension_slice_overlaps_with_others(const DimensionSlice * slice,const List * other_slices)173 dimension_slice_overlaps_with_others(const DimensionSlice *slice, const List *other_slices)
174 {
175 ListCell *lc;
176
177 foreach (lc, other_slices)
178 {
179 const DimensionSlice *other_slice = lfirst(lc);
180
181 if (ts_dimension_slices_collide(slice, other_slice))
182 return true;
183 }
184
185 return false;
186 }
187
/*
 * DataNodeSlice: a hash table entry to track the data node a chunk slice is placed
 * on.
 *
 * data_node_chunk_assignments_are_overlapping() keys its hash table on the
 * dimension slice ID (keysize = sizeof(int32)). sliceid must therefore stay
 * the first field, because dynahash expects the key at the start of each
 * entry.
 */
typedef struct DataNodeSlice
{
	int32 sliceid;		/* hash key: the dimension slice ID */
	Oid node_serverid;	/* server OID of the data node the slice was first recorded for */
} DataNodeSlice;
197
198 /*
199 * Check whether chunks are assigned in an overlapping way.
200 *
201 * Assignments are overlapping if any data node has a chunk that overlaps (in the
202 * given paritioning dimension) with a chunk on another data node. There are two
203 * cases when this can happen:
204 *
205 * 1. The same slice exists on multiple data nodes (we optimize for detecting
206 * this).
207 *
208 * 2. Two different slices overlap while existing on different data nodes (this
209 * case is more costly to detect).
210 */
211 bool
data_node_chunk_assignments_are_overlapping(DataNodeChunkAssignments * scas,int32 partitioning_dimension_id)212 data_node_chunk_assignments_are_overlapping(DataNodeChunkAssignments *scas,
213 int32 partitioning_dimension_id)
214 {
215 HASH_SEQ_STATUS status;
216 HASHCTL hashctl = {
217 .keysize = sizeof(int32),
218 .entrysize = sizeof(DataNodeSlice),
219 .hcxt = CurrentMemoryContext,
220 };
221 HTAB *all_data_node_slice_htab;
222 DataNodeChunkAssignment *sca;
223 List *all_data_node_slices = NIL;
224
225 /* No overlapping can occur if there are chunks on only one data node (this
226 * covers also the case of a single chunk) */
227 if (scas->num_nodes_with_chunks <= 1)
228 return false;
229
230 /* If there are multiple data nodes with chunks and they are not placed along
231 * a closed "space" dimension, we assume overlapping */
232 if (partitioning_dimension_id <= 0)
233 return true;
234
235 /* Use a hash table to track slice data node mappings by slice ID. The same
236 * slice can exist on multiple data nodes, causing an overlap across data nodes
237 * in the slice dimension. This hash table is used to quickly detect such
238 * "same-slice overlaps" and avoids having to do a more expensive range
239 * overlap check.
240 */
241 all_data_node_slice_htab = hash_create("all_data_node_slices",
242 scas->total_num_chunks,
243 &hashctl,
244 HASH_ELEM | HASH_BLOBS);
245
246 hash_seq_init(&status, scas->assignments);
247
248 while ((sca = hash_seq_search(&status)))
249 {
250 List *data_node_slices = NIL;
251 ListCell *lc;
252
253 /* Check each slice on the data node against the slices on other
254 * data nodes */
255 foreach (lc, sca->chunk_oids)
256 {
257 Oid chunk_oid = lfirst_oid(lc);
258 const DimensionSlice *slice;
259 DataNodeSlice *ss;
260 bool found;
261
262 slice = get_slice_for_dimension(chunk_oid, partitioning_dimension_id);
263
264 Assert(NULL != slice);
265
266 /* Get or create a new entry in the global slice set */
267 ss = hash_search(all_data_node_slice_htab, &slice->fd.id, HASH_ENTER, &found);
268
269 if (!found)
270 {
271 ss->sliceid = slice->fd.id;
272 ss->node_serverid = sca->node_server_oid;
273 data_node_slices = lappend(data_node_slices, ts_dimension_slice_copy(slice));
274 }
275
276 /* First detect "same-slice overlap", and then do a more expensive
277 * range overlap check */
278 if (ss->node_serverid != sca->node_server_oid ||
279 /* Check if the slice overlaps with the accumulated slices of
280 * other data nodes. This can be made more efficient by using an
281 * interval tree. */
282 dimension_slice_overlaps_with_others(slice, all_data_node_slices))
283 {
284 /* The same slice exists on (at least) two data nodes, or it
285 * overlaps with a different slice on another data node */
286 hash_seq_term(&status);
287 hash_destroy(all_data_node_slice_htab);
288 return true;
289 }
290 }
291
292 /* Add the data node's slice set to the set of all data nodes checked so
293 * far */
294 all_data_node_slices = list_concat(all_data_node_slices, data_node_slices);
295 }
296
297 hash_destroy(all_data_node_slice_htab);
298
299 return false;
300 }
301