1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <foreign/foreign.h>
8 #include <utils/hsearch.h>
9 #include <utils/fmgrprotos.h>
10 #include <parser/parsetree.h>
11 #include <nodes/bitmapset.h>
12 
13 #include "data_node_chunk_assignment.h"
14 #include "dimension.h"
15 #include "dimension_slice.h"
16 #include "dimension_vector.h"
17 #include "hypercube.h"
18 #include "chunk.h"
19 #include "chunk_data_node.h"
20 
21 static int
get_remote_chunk_id_from_relid(Oid server_oid,Oid chunk_relid)22 get_remote_chunk_id_from_relid(Oid server_oid, Oid chunk_relid)
23 {
24 	Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
25 	ForeignServer *fs = GetForeignServer(server_oid);
26 	ChunkDataNode *cdn;
27 
28 	Assert(chunk != NULL);
29 	cdn = ts_chunk_data_node_scan_by_chunk_id_and_node_name(chunk->fd.id,
30 															fs->servername,
31 															CurrentMemoryContext);
32 	Assert(cdn != NULL);
33 
34 	return cdn->fd.node_chunk_id;
35 }
36 
37 /*
38  * Find an existing data node chunk assignment or initialize a new one.
39  */
40 static DataNodeChunkAssignment *
get_or_create_sca(DataNodeChunkAssignments * scas,Oid serverid,RelOptInfo * rel)41 get_or_create_sca(DataNodeChunkAssignments *scas, Oid serverid, RelOptInfo *rel)
42 {
43 	DataNodeChunkAssignment *sca;
44 	bool found;
45 
46 	Assert(rel == NULL || rel->serverid == serverid);
47 
48 	sca = hash_search(scas->assignments, &serverid, HASH_ENTER, &found);
49 
50 	if (!found)
51 	{
52 		/* New entry */
53 		memset(sca, 0, sizeof(*sca));
54 		sca->node_server_oid = serverid;
55 	}
56 
57 	return sca;
58 }
59 
60 static const DimensionSlice *
get_slice_for_dimension(Oid chunk_relid,int32 dimension_id)61 get_slice_for_dimension(Oid chunk_relid, int32 dimension_id)
62 {
63 	Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
64 
65 	return ts_hypercube_get_slice_by_dimension_id(chunk->cube, dimension_id);
66 }
67 
68 /*
69  * Assign the given chunk relation to a data node.
70  *
71  * The chunk is assigned according to the strategy set in the
72  * DataNodeChunkAssignments state.
73  */
74 DataNodeChunkAssignment *
data_node_chunk_assignment_assign_chunk(DataNodeChunkAssignments * scas,RelOptInfo * chunkrel)75 data_node_chunk_assignment_assign_chunk(DataNodeChunkAssignments *scas, RelOptInfo *chunkrel)
76 {
77 	DataNodeChunkAssignment *sca = get_or_create_sca(scas, chunkrel->serverid, NULL);
78 	RangeTblEntry *rte = planner_rt_fetch(chunkrel->relid, scas->root);
79 	MemoryContext old;
80 
81 	/* Should never assign the same chunk twice */
82 	Assert(!bms_is_member(chunkrel->relid, sca->chunk_relids));
83 
84 	old = MemoryContextSwitchTo(scas->mctx);
85 
86 	/* If this is the first chunk we assign to this data node, increment the
87 	 * number of data nodes with one or more chunks on them */
88 	if (list_length(sca->chunk_oids) == 0)
89 		scas->num_nodes_with_chunks++;
90 
91 	sca->chunk_relids = bms_add_member(sca->chunk_relids, chunkrel->relid);
92 	sca->chunk_oids = lappend_oid(sca->chunk_oids, rte->relid);
93 	sca->remote_chunk_ids =
94 		lappend_int(sca->remote_chunk_ids,
95 					get_remote_chunk_id_from_relid(chunkrel->serverid, rte->relid));
96 	sca->pages += chunkrel->pages;
97 	sca->rows += chunkrel->rows;
98 	sca->tuples += chunkrel->tuples;
99 
100 	MemoryContextSwitchTo(old);
101 
102 	scas->total_num_chunks++;
103 
104 	return sca;
105 }
106 
107 /*
108  * Initialize a new chunk assignment state with a specific assignment strategy.
109  */
110 void
data_node_chunk_assignments_init(DataNodeChunkAssignments * scas,DataNodeChunkAssignmentStrategy strategy,PlannerInfo * root,unsigned int nrels_hint)111 data_node_chunk_assignments_init(DataNodeChunkAssignments *scas,
112 								 DataNodeChunkAssignmentStrategy strategy, PlannerInfo *root,
113 								 unsigned int nrels_hint)
114 {
115 	HASHCTL hctl = {
116 		.keysize = sizeof(Oid),
117 		.entrysize = sizeof(DataNodeChunkAssignment),
118 		.hcxt = CurrentMemoryContext,
119 	};
120 
121 	scas->strategy = strategy;
122 	scas->root = root;
123 	scas->mctx = hctl.hcxt;
124 	scas->total_num_chunks = 0;
125 	scas->num_nodes_with_chunks = 0;
126 	scas->assignments = hash_create("data node chunk assignments",
127 									nrels_hint,
128 									&hctl,
129 									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
130 }
131 
132 /*
133  * Assign chunks to data nodes.
134  *
135  * Each chunk in the chunkrels array is a assigned a data node using the strategy
136  * set in the DataNodeChunkAssignments state.
137  */
138 DataNodeChunkAssignments *
data_node_chunk_assignment_assign_chunks(DataNodeChunkAssignments * scas,RelOptInfo ** chunkrels,unsigned int nrels)139 data_node_chunk_assignment_assign_chunks(DataNodeChunkAssignments *scas, RelOptInfo **chunkrels,
140 										 unsigned int nrels)
141 {
142 	unsigned int i;
143 
144 	Assert(scas->assignments != NULL && scas->root != NULL);
145 
146 	for (i = 0; i < nrels; i++)
147 	{
148 		RelOptInfo *chunkrel = chunkrels[i];
149 
150 		Assert(IS_SIMPLE_REL(chunkrel) && chunkrel->fdw_private != NULL);
151 		data_node_chunk_assignment_assign_chunk(scas, chunkrel);
152 	}
153 
154 	return scas;
155 }
156 
157 /*
158  * Get the data node assignment for the given relation (chunk).
159  */
160 DataNodeChunkAssignment *
data_node_chunk_assignment_get_or_create(DataNodeChunkAssignments * scas,RelOptInfo * rel)161 data_node_chunk_assignment_get_or_create(DataNodeChunkAssignments *scas, RelOptInfo *rel)
162 {
163 	return get_or_create_sca(scas, rel->serverid, rel);
164 }
165 
166 /*
167  * Check if a dimension slice overlaps with other slices.
168  *
169  * This is a naive implementation that runs in linear time. A more efficient
170  * approach would be to use, e.g., an interval tree.
171  */
172 static bool
dimension_slice_overlaps_with_others(const DimensionSlice * slice,const List * other_slices)173 dimension_slice_overlaps_with_others(const DimensionSlice *slice, const List *other_slices)
174 {
175 	ListCell *lc;
176 
177 	foreach (lc, other_slices)
178 	{
179 		const DimensionSlice *other_slice = lfirst(lc);
180 
181 		if (ts_dimension_slices_collide(slice, other_slice))
182 			return true;
183 	}
184 
185 	return false;
186 }
187 
188 /*
189  * DataNodeSlice: a hash table entry to track the data node a chunk slice is placed
190  * on.
191  */
192 typedef struct DataNodeSlice
193 {
194 	int32 sliceid;
195 	Oid node_serverid;
196 } DataNodeSlice;
197 
198 /*
199  * Check whether chunks are assigned in an overlapping way.
200  *
201  * Assignments are overlapping if any data node has a chunk that overlaps (in the
202  * given paritioning dimension) with a chunk on another data node. There are two
203  * cases when this can happen:
204  *
205  * 1. The same slice exists on multiple data nodes (we optimize for detecting
206  * this).
207  *
208  * 2. Two different slices overlap while existing on different data nodes (this
209  * case is more costly to detect).
210  */
211 bool
data_node_chunk_assignments_are_overlapping(DataNodeChunkAssignments * scas,int32 partitioning_dimension_id)212 data_node_chunk_assignments_are_overlapping(DataNodeChunkAssignments *scas,
213 											int32 partitioning_dimension_id)
214 {
215 	HASH_SEQ_STATUS status;
216 	HASHCTL hashctl = {
217 		.keysize = sizeof(int32),
218 		.entrysize = sizeof(DataNodeSlice),
219 		.hcxt = CurrentMemoryContext,
220 	};
221 	HTAB *all_data_node_slice_htab;
222 	DataNodeChunkAssignment *sca;
223 	List *all_data_node_slices = NIL;
224 
225 	/* No overlapping can occur if there are chunks on only one data node (this
226 	 * covers also the case of a single chunk) */
227 	if (scas->num_nodes_with_chunks <= 1)
228 		return false;
229 
230 	/* If there are multiple data nodes with chunks and they are not placed along
231 	 * a closed "space" dimension, we assume overlapping */
232 	if (partitioning_dimension_id <= 0)
233 		return true;
234 
235 	/* Use a hash table to track slice data node mappings by slice ID. The same
236 	 * slice can exist on multiple data nodes, causing an overlap across data nodes
237 	 * in the slice dimension. This hash table is used to quickly detect such
238 	 * "same-slice overlaps" and avoids having to do a more expensive range
239 	 * overlap check.
240 	 */
241 	all_data_node_slice_htab = hash_create("all_data_node_slices",
242 										   scas->total_num_chunks,
243 										   &hashctl,
244 										   HASH_ELEM | HASH_BLOBS);
245 
246 	hash_seq_init(&status, scas->assignments);
247 
248 	while ((sca = hash_seq_search(&status)))
249 	{
250 		List *data_node_slices = NIL;
251 		ListCell *lc;
252 
253 		/* Check each slice on the data node against the slices on other
254 		 * data nodes */
255 		foreach (lc, sca->chunk_oids)
256 		{
257 			Oid chunk_oid = lfirst_oid(lc);
258 			const DimensionSlice *slice;
259 			DataNodeSlice *ss;
260 			bool found;
261 
262 			slice = get_slice_for_dimension(chunk_oid, partitioning_dimension_id);
263 
264 			Assert(NULL != slice);
265 
266 			/* Get or create a new entry in the global slice set */
267 			ss = hash_search(all_data_node_slice_htab, &slice->fd.id, HASH_ENTER, &found);
268 
269 			if (!found)
270 			{
271 				ss->sliceid = slice->fd.id;
272 				ss->node_serverid = sca->node_server_oid;
273 				data_node_slices = lappend(data_node_slices, ts_dimension_slice_copy(slice));
274 			}
275 
276 			/* First detect "same-slice overlap", and then do a more expensive
277 			 * range overlap check */
278 			if (ss->node_serverid != sca->node_server_oid ||
279 				/* Check if the slice overlaps with the accumulated slices of
280 				 * other data nodes. This can be made more efficient by using an
281 				 * interval tree. */
282 				dimension_slice_overlaps_with_others(slice, all_data_node_slices))
283 			{
284 				/* The same slice exists on (at least) two data nodes, or it
285 				 * overlaps with a different slice on another data node */
286 				hash_seq_term(&status);
287 				hash_destroy(all_data_node_slice_htab);
288 				return true;
289 			}
290 		}
291 
292 		/* Add the data node's slice set to the set of all data nodes checked so
293 		 * far */
294 		all_data_node_slices = list_concat(all_data_node_slices, data_node_slices);
295 	}
296 
297 	hash_destroy(all_data_node_slice_htab);
298 
299 	return false;
300 }
301