1 /*
2  * This file and its contents are licensed under the Apache License 2.0.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-APACHE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <catalog/pg_class.h>
8 #include <catalog/namespace.h>
9 #include <catalog/pg_trigger.h>
10 #include <catalog/indexing.h>
11 #include <catalog/pg_inherits.h>
12 #include <catalog/toasting.h>
13 #include <commands/trigger.h>
14 #include <commands/tablecmds.h>
15 #include <commands/defrem.h>
16 #include <tcop/tcopprot.h>
17 #include <access/htup.h>
18 #include <access/htup_details.h>
19 #include <access/xact.h>
20 #include <access/reloptions.h>
21 #include <nodes/makefuncs.h>
22 #include <utils/builtins.h>
23 #include <utils/lsyscache.h>
24 #include <utils/syscache.h>
25 #include <utils/hsearch.h>
26 #include <storage/lmgr.h>
27 #include <miscadmin.h>
28 #include <funcapi.h>
29 #include <fmgr.h>
30 #include <utils/datum.h>
31 #include <catalog/pg_type.h>
32 #include <utils/acl.h>
33 #include <utils/timestamp.h>
34 #include <nodes/execnodes.h>
35 #include <executor/executor.h>
36 #include <access/tupdesc.h>
37 
38 #include "export.h"
39 #include "debug_point.h"
40 #include "chunk.h"
41 #include "chunk_index.h"
42 #include "chunk_data_node.h"
43 #include "cross_module_fn.h"
44 #include "catalog.h"
45 #include "continuous_agg.h"
46 #include "cross_module_fn.h"
47 #include "dimension.h"
48 #include "dimension_slice.h"
49 #include "dimension_vector.h"
50 #include "errors.h"
51 #include "partitioning.h"
52 #include "hypertable.h"
53 #include "hypertable_data_node.h"
54 #include "hypercube.h"
55 #include "scanner.h"
56 #include "process_utility.h"
57 #include "time_utils.h"
58 #include "trigger.h"
59 #include "compat/compat.h"
60 #include "utils.h"
61 #include "hypertable_cache.h"
62 #include "cache.h"
63 #include "bgw_policy/chunk_stats.h"
64 #include "scan_iterator.h"
65 #include "compression_chunk_size.h"
66 #include "extension.h"
67 
/* SQL-callable entry points exported by this file (PG V1 calling convention) */
TS_FUNCTION_INFO_V1(ts_chunk_show_chunks);
TS_FUNCTION_INFO_V1(ts_chunk_drop_chunks);
TS_FUNCTION_INFO_V1(ts_chunks_in);
TS_FUNCTION_INFO_V1(ts_chunk_id_from_relid);
TS_FUNCTION_INFO_V1(ts_chunk_show);
TS_FUNCTION_INFO_V1(ts_chunk_create);
74 
/*
 * Convert a NAME datum to a palloc'd C string.
 *
 * The string is copied with pstrdup() so the result is independent of the
 * memory holding the original Name.
 */
static const char *
DatumGetNameString(Datum datum)
{
	Name name = DatumGetName(datum);
	return pstrdup(NameStr(*name));
}
81 
/* Result codes returned by chunk-scan callbacks (see on_chunk_stub_func).
 * Used when processing scanned chunks. */
typedef enum ChunkResult
{
	CHUNK_DONE,		/* Scan can stop; e.g., the sought chunk was found */
	CHUNK_IGNORED,	/* Chunk was not of interest to the callback */
	CHUNK_PROCESSED /* Chunk was acted upon by the callback */
} ChunkResult;
89 
90 /*
91  * Context for scanning and building a chunk from a stub.
92  *
93  * If found, the chunk will be created and the chunk pointer member is set in
94  * the result. Optionally, a caller can pre-allocate the chunk member's memory,
95  * which is useful if one, e.g., wants to fill in an memory-aligned array of
96  * chunks.
97  *
98  * If the chunk is a tombstone (dropped flag set), then the Chunk will not be
99  * created and instead is_dropped will be TRUE.
100  */
typedef struct ChunkStubScanCtx
{
	ChunkStub *stub; /* Input: stub to create a full chunk from */
	Chunk *chunk;	 /* Output: the created chunk (may be pre-allocated by caller) */
	bool is_dropped; /* Output: true if the chunk row is a tombstone (dropped) */
} ChunkStubScanCtx;
107 
108 static bool
chunk_stub_is_valid(const ChunkStub * stub,unsigned int expected_slices)109 chunk_stub_is_valid(const ChunkStub *stub, unsigned int expected_slices)
110 {
111 	return stub && stub->id > 0 && stub->constraints && expected_slices == stub->cube->num_slices &&
112 		   stub->cube->num_slices == stub->constraints->num_dimension_constraints;
113 }
114 
/* Callback invoked for each chunk stub found during a chunk scan */
typedef ChunkResult (*on_chunk_stub_func)(ChunkScanCtx *ctx, ChunkStub *stub);

/* Forward declarations for static helpers defined later in this file */
static void chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hyperspace *hs, const Point *p);
static void chunk_scan_ctx_destroy(ChunkScanCtx *ctx);
static void chunk_collision_scan(ChunkScanCtx *scanctx, const Hypercube *cube);
static int chunk_scan_ctx_foreach_chunk_stub(ChunkScanCtx *ctx, on_chunk_stub_func on_chunk,
											 uint16 limit);
static Datum chunks_return_srf(FunctionCallInfo fcinfo);
static int chunk_cmp(const void *ch1, const void *ch2);
static Chunk *chunk_find(const Hypertable *ht, const Point *p, bool resurrect, bool lock_slices);
static void init_scan_by_qualified_table_name(ScanIterator *iterator, const char *schema_name,
											  const char *table_name);
static Hypertable *find_hypertable_from_table_or_cagg(Cache *hcache, Oid relid);
static Chunk *get_chunks_in_time_range(Hypertable *ht, int64 older_than, int64 newer_than,
									   const char *caller_name, MemoryContext mctx,
									   uint64 *num_chunks_returned, ScanTupLock *tuplock);
130 
/*
 * A chunk is valid if it is not a tombstone, has sane catalog IDs and
 * relation OIDs, its constraints match its hypercube's dimensionality, and
 * it is either a local table or a foreign table with at least one data node.
 */
#define IS_VALID_CHUNK(chunk)                                                                      \
	((chunk) && !(chunk)->fd.dropped && (chunk)->fd.id > 0 && (chunk)->fd.hypertable_id > 0 &&     \
	 OidIsValid((chunk)->table_id) && OidIsValid((chunk)->hypertable_relid) &&                     \
	 (chunk)->constraints && (chunk)->cube &&                                                      \
	 (chunk)->cube->num_slices == (chunk)->constraints->num_dimension_constraints &&               \
	 ((chunk)->relkind == RELKIND_RELATION ||                                                      \
	  ((chunk)->relkind == RELKIND_FOREIGN_TABLE && (chunk)->data_nodes != NIL)))

#define ASSERT_IS_VALID_CHUNK(chunk) Assert(IS_VALID_CHUNK(chunk))

#define ASSERT_IS_NULL_OR_VALID_CHUNK(chunk) Assert(chunk == NULL || IS_VALID_CHUNK(chunk))
142 
/*
 * The chunk status field values are persisted in the database and must never be changed.
 * Those values are used as flags and must always be powers of 2 to allow bitwise operations.
 */
#define CHUNK_STATUS_DEFAULT 0 /* No status flags set */
/*
 * Setting a Data-Node chunk as CHUNK_STATUS_COMPRESSED means that the corresponding
 * compressed_chunk_id field points to a chunk that holds the compressed data. Otherwise,
 * the corresponding compressed_chunk_id is NULL.
 *
 * However, for Access-Nodes compressed_chunk_id is always NULL. CHUNK_STATUS_COMPRESSED being set
 * means that a remote compress_chunk() operation has taken place for this distributed
 * meta-chunk. On the other hand, if CHUNK_STATUS_COMPRESSED is cleared, then it is probable
 * that a remote compress_chunk() has not taken place, but not certain.
 *
 * For the above reason, this flag should not be assumed to be consistent (when it is cleared)
 * for Access-Nodes. When used in distributed hypertables one should take advantage of the
 * idempotent properties of remote compress_chunk() and distributed compression policy to
 * make progress.
 */
#define CHUNK_STATUS_COMPRESSED 1
/*
 * When inserting into a compressed chunk the configured compress_orderby is not retained.
 * Any such chunks need an explicit Sort step to produce ordered output until the chunk
 * ordering has been restored by recompress_chunk. This flag can only exist on compressed
 * chunks.
 */
#define CHUNK_STATUS_COMPRESSED_UNORDERED 2
171 
172 static HeapTuple
chunk_formdata_make_tuple(const FormData_chunk * fd,TupleDesc desc)173 chunk_formdata_make_tuple(const FormData_chunk *fd, TupleDesc desc)
174 {
175 	Datum values[Natts_chunk];
176 	bool nulls[Natts_chunk] = { false };
177 
178 	memset(values, 0, sizeof(Datum) * Natts_chunk);
179 
180 	values[AttrNumberGetAttrOffset(Anum_chunk_id)] = Int32GetDatum(fd->id);
181 	values[AttrNumberGetAttrOffset(Anum_chunk_hypertable_id)] = Int32GetDatum(fd->hypertable_id);
182 	values[AttrNumberGetAttrOffset(Anum_chunk_schema_name)] = NameGetDatum(&fd->schema_name);
183 	values[AttrNumberGetAttrOffset(Anum_chunk_table_name)] = NameGetDatum(&fd->table_name);
184 	/*when we insert a chunk the compressed chunk id is always NULL */
185 	if (fd->compressed_chunk_id == INVALID_CHUNK_ID)
186 		nulls[AttrNumberGetAttrOffset(Anum_chunk_compressed_chunk_id)] = true;
187 	else
188 	{
189 		values[AttrNumberGetAttrOffset(Anum_chunk_compressed_chunk_id)] =
190 			Int32GetDatum(fd->compressed_chunk_id);
191 	}
192 	values[AttrNumberGetAttrOffset(Anum_chunk_dropped)] = BoolGetDatum(fd->dropped);
193 	values[AttrNumberGetAttrOffset(Anum_chunk_status)] = Int32GetDatum(fd->status);
194 
195 	return heap_form_tuple(desc, values, nulls);
196 }
197 
198 static void
chunk_formdata_fill(FormData_chunk * fd,const TupleInfo * ti)199 chunk_formdata_fill(FormData_chunk *fd, const TupleInfo *ti)
200 {
201 	bool should_free;
202 	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
203 	bool nulls[Natts_chunk];
204 	Datum values[Natts_chunk];
205 
206 	memset(fd, 0, sizeof(FormData_chunk));
207 	heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);
208 
209 	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_id)]);
210 	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_hypertable_id)]);
211 	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_schema_name)]);
212 	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_table_name)]);
213 	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_dropped)]);
214 	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_status)]);
215 
216 	fd->id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_id)]);
217 	fd->hypertable_id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_hypertable_id)]);
218 	memcpy(&fd->schema_name,
219 		   DatumGetName(values[AttrNumberGetAttrOffset(Anum_chunk_schema_name)]),
220 		   NAMEDATALEN);
221 	memcpy(&fd->table_name,
222 		   DatumGetName(values[AttrNumberGetAttrOffset(Anum_chunk_table_name)]),
223 		   NAMEDATALEN);
224 
225 	if (nulls[AttrNumberGetAttrOffset(Anum_chunk_compressed_chunk_id)])
226 		fd->compressed_chunk_id = INVALID_CHUNK_ID;
227 	else
228 		fd->compressed_chunk_id =
229 			DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_compressed_chunk_id)]);
230 
231 	fd->dropped = DatumGetBool(values[AttrNumberGetAttrOffset(Anum_chunk_dropped)]);
232 	fd->status = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_status)]);
233 
234 	if (should_free)
235 		heap_freetuple(tuple);
236 }
/* Start of the chunk's range in the first (primary) dimension */
int64
ts_chunk_primary_dimension_start(const Chunk *chunk)
{
	return chunk->cube->slices[0]->fd.range_start;
}
242 
/* End of the chunk's range in the first (primary) dimension */
int64
ts_chunk_primary_dimension_end(const Chunk *chunk)
{
	return chunk->cube->slices[0]->fd.range_end;
}
248 
249 static void
chunk_insert_relation(Relation rel,const Chunk * chunk)250 chunk_insert_relation(Relation rel, const Chunk *chunk)
251 {
252 	HeapTuple new_tuple;
253 	CatalogSecurityContext sec_ctx;
254 
255 	new_tuple = chunk_formdata_make_tuple(&chunk->fd, RelationGetDescr(rel));
256 
257 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
258 	ts_catalog_insert(rel, new_tuple);
259 	ts_catalog_restore_user(&sec_ctx);
260 
261 	heap_freetuple(new_tuple);
262 }
263 
264 void
ts_chunk_insert_lock(const Chunk * chunk,LOCKMODE lock)265 ts_chunk_insert_lock(const Chunk *chunk, LOCKMODE lock)
266 {
267 	Catalog *catalog = ts_catalog_get();
268 	Relation rel;
269 
270 	rel = table_open(catalog_get_table_id(catalog, CHUNK), lock);
271 	chunk_insert_relation(rel, chunk);
272 	table_close(rel, lock);
273 }
274 
/* State shared by the collision-scan callbacks below */
typedef struct CollisionInfo
{
	Hypercube *cube;			/* Tentative hypercube of the new chunk */
	ChunkStub *colliding_chunk; /* First fully colliding chunk found, or NULL */
} CollisionInfo;
280 
281 /*-
282  * Align a chunk's hypercube in 'aligned' dimensions.
283  *
284  * Alignment ensures that chunks line up in a particular dimension, i.e., their
285  * ranges should either be identical or not overlap at all.
286  *
287  * Non-aligned:
288  *
289  * ' [---------]      <- existing slice
290  * '      [---------] <- calculated (new) slice
291  *
292  * To align the slices above there are two cases depending on where the
293  * insertion point happens:
294  *
295  * Case 1 (reuse slice):
296  *
297  * ' [---------]
298  * '      [--x------]
299  *
300  * The insertion point x falls within the range of the existing slice. We should
301  * reuse the existing slice rather than creating a new one.
302  *
303  * Case 2 (cut to align):
304  *
305  * ' [---------]
306  * '      [-------x-]
307  *
308  * The insertion point falls outside the range of the existing slice and we need
309  * to cut the new slice to line up.
310  *
311  * ' [---------]
312  * '        cut [---]
313  * '
314  *
315  * Note that slice reuse (case 1) happens already when calculating the tentative
316  * hypercube for the chunk, and is thus already performed once reaching this
317  * function. Thus, we deal only with case 2 here. Also note that a new slice
318  * might overlap in complicated ways, requiring multiple cuts. For instance,
319  * consider the following situation:
320  *
321  * ' [------]   [-] [---]
322  * '      [---x-------]  <- calculated slice
323  *
324  * This should but cut-to-align as follows:
325  *
326  * ' [------]   [-] [---]
327  * '         [x]
328  *
329  * After a chunk collision scan, this function is called for each chunk in the
330  * chunk scan context. Chunks in the scan context may have only a partial set of
331  * slices if they only overlap in some, but not all, dimensions (see
332  * illustrations below). Still, partial chunks may still be of interest for
333  * alignment in a particular dimension. Thus, if a chunk has an overlapping
334  * slice in an aligned dimension, we cut to not overlap with that slice.
335  */
336 static ChunkResult
do_dimension_alignment(ChunkScanCtx * scanctx,ChunkStub * stub)337 do_dimension_alignment(ChunkScanCtx *scanctx, ChunkStub *stub)
338 {
339 	CollisionInfo *info = scanctx->data;
340 	Hypercube *cube = info->cube;
341 	const Hyperspace *space = scanctx->space;
342 	ChunkResult res = CHUNK_IGNORED;
343 	int i;
344 
345 	for (i = 0; i < space->num_dimensions; i++)
346 	{
347 		const Dimension *dim = &space->dimensions[i];
348 		const DimensionSlice *chunk_slice;
349 		DimensionSlice *cube_slice;
350 		int64 coord = scanctx->point->coordinates[i];
351 
352 		if (!dim->fd.aligned)
353 			continue;
354 
355 		/*
356 		 * The stub might not have a slice for each dimension, so we cannot
357 		 * use array indexing. Fetch slice by dimension ID instead.
358 		 */
359 		chunk_slice = ts_hypercube_get_slice_by_dimension_id(stub->cube, dim->fd.id);
360 
361 		if (NULL == chunk_slice)
362 			continue;
363 
364 		cube_slice = cube->slices[i];
365 
366 		/*
367 		 * Only cut-to-align if the slices collide and are not identical
368 		 * (i.e., if we are reusing an existing slice we should not cut it)
369 		 */
370 		if (!ts_dimension_slices_equal(cube_slice, chunk_slice) &&
371 			ts_dimension_slices_collide(cube_slice, chunk_slice))
372 		{
373 			ts_dimension_slice_cut(cube_slice, chunk_slice, coord);
374 			res = CHUNK_PROCESSED;
375 		}
376 	}
377 
378 	return res;
379 }
380 
/*
 * Calculate, and potentially set, a new chunk interval for an open dimension.
 *
 * Used for adaptive chunking: calls the hypertable's chunk sizing function
 * with the insertion point's coordinate and the configured target chunk
 * size, and updates the first open (time) dimension's interval if the
 * function computed a new, different interval.
 *
 * Returns true if the interval was updated, otherwise false.
 */
static bool
calculate_and_set_new_chunk_interval(const Hypertable *ht, const Point *p)
{
	Hyperspace *hs = ht->space;
	Dimension *dim = NULL;
	Datum datum;
	int64 chunk_interval, coord;
	int i;

	/* Adaptive chunking requires a sizing function and a positive target size */
	if (!OidIsValid(ht->chunk_sizing_func) || ht->fd.chunk_target_size <= 0)
		return false;

	/* Find first open dimension */
	for (i = 0; i < hs->num_dimensions; i++)
	{
		dim = &hs->dimensions[i];

		if (IS_OPEN_DIMENSION(dim))
			break;

		dim = NULL;
	}

	/* Nothing to do if no open dimension */
	if (NULL == dim)
	{
		elog(WARNING,
			 "adaptive chunking enabled on hypertable \"%s\" without an open (time) dimension",
			 get_rel_name(ht->main_table_relid));

		return false;
	}

	/* 'i' still indexes the open dimension found above */
	coord = p->coordinates[i];
	datum = OidFunctionCall3(ht->chunk_sizing_func,
							 Int32GetDatum(dim->fd.id),
							 Int64GetDatum(coord),
							 Int64GetDatum(ht->fd.chunk_target_size));
	chunk_interval = DatumGetInt64(datum);

	/* Check if the function didn't set an interval or nothing changed */
	if (chunk_interval <= 0 || chunk_interval == dim->fd.interval_length)
		return false;

	/* Update the dimension */
	ts_dimension_set_chunk_interval(dim, chunk_interval);

	return true;
}
433 
434 /*
435  * Resolve chunk collisions.
436  *
437  * After a chunk collision scan, this function is called for each chunk in the
438  * chunk scan context. We only care about chunks that have a full set of
439  * slices/constraints that overlap with our tentative hypercube, i.e., they
440  * fully collide. We resolve those collisions by cutting the hypercube.
441  */
442 static ChunkResult
do_collision_resolution(ChunkScanCtx * scanctx,ChunkStub * stub)443 do_collision_resolution(ChunkScanCtx *scanctx, ChunkStub *stub)
444 {
445 	CollisionInfo *info = scanctx->data;
446 	Hypercube *cube = info->cube;
447 	const Hyperspace *space = scanctx->space;
448 	ChunkResult res = CHUNK_IGNORED;
449 	int i;
450 
451 	if (stub->cube->num_slices != space->num_dimensions || !ts_hypercubes_collide(cube, stub->cube))
452 		return CHUNK_IGNORED;
453 
454 	for (i = 0; i < space->num_dimensions; i++)
455 	{
456 		DimensionSlice *cube_slice = cube->slices[i];
457 		DimensionSlice *chunk_slice = stub->cube->slices[i];
458 		int64 coord = scanctx->point->coordinates[i];
459 
460 		/*
461 		 * Only cut if we aren't reusing an existing slice and there is a
462 		 * collision
463 		 */
464 		if (!ts_dimension_slices_equal(cube_slice, chunk_slice) &&
465 			ts_dimension_slices_collide(cube_slice, chunk_slice))
466 		{
467 			ts_dimension_slice_cut(cube_slice, chunk_slice, coord);
468 			res = CHUNK_PROCESSED;
469 
470 			/*
471 			 * Redo the collision check after each cut since cutting in one
472 			 * dimension might have resolved the collision in another
473 			 */
474 			if (!ts_hypercubes_collide(cube, stub->cube))
475 				return res;
476 		}
477 	}
478 
479 	Assert(!ts_hypercubes_collide(cube, stub->cube));
480 
481 	return res;
482 }
483 
484 static ChunkResult
check_for_collisions(ChunkScanCtx * scanctx,ChunkStub * stub)485 check_for_collisions(ChunkScanCtx *scanctx, ChunkStub *stub)
486 {
487 	CollisionInfo *info = scanctx->data;
488 	Hypercube *cube = info->cube;
489 	const Hyperspace *space = scanctx->space;
490 
491 	/* Check if this chunk collides with our hypercube */
492 	if (stub->cube->num_slices == space->num_dimensions && ts_hypercubes_collide(cube, stub->cube))
493 	{
494 		info->colliding_chunk = stub;
495 		return CHUNK_DONE;
496 	}
497 
498 	return CHUNK_IGNORED;
499 }
500 
501 /*
502  * Check if a (tentative) chunk collides with existing chunks.
503  *
504  * Return the colliding chunk. Note that the chunk is a stub and not a full
505  * chunk.
506  */
507 static ChunkStub *
chunk_collides(const Hypertable * ht,const Hypercube * hc)508 chunk_collides(const Hypertable *ht, const Hypercube *hc)
509 {
510 	ChunkScanCtx scanctx;
511 	CollisionInfo info = {
512 		.cube = (Hypercube *) hc,
513 		.colliding_chunk = NULL,
514 	};
515 
516 	chunk_scan_ctx_init(&scanctx, ht->space, NULL);
517 
518 	/* Scan for all chunks that collide with the hypercube of the new chunk */
519 	chunk_collision_scan(&scanctx, hc);
520 	scanctx.data = &info;
521 
522 	/* Find chunks that collide */
523 	chunk_scan_ctx_foreach_chunk_stub(&scanctx, check_for_collisions, 0);
524 
525 	chunk_scan_ctx_destroy(&scanctx);
526 
527 	return info.colliding_chunk;
528 }
529 
530 /*-
 * Resolve collisions and perform alignment.
532  *
533  * Chunks collide only if their hypercubes overlap in all dimensions. For
534  * instance, the 2D chunks below collide because they overlap in both the X and
535  * Y dimensions:
536  *
537  * ' _____
538  * ' |    |
539  * ' | ___|__
540  * ' |_|__|  |
541  * '   |     |
542  * '   |_____|
543  *
544  * While the following chunks do not collide, although they still overlap in the
545  * X dimension:
546  *
547  * ' _____
548  * ' |    |
549  * ' |    |
550  * ' |____|
551  * '   ______
552  * '   |     |
553  * '   |    *|
554  * '   |_____|
555  *
556  * For the collision case above we obviously want to cut our hypercube to no
557  * longer collide with existing chunks. However, the second case might still be
558  * of interest for alignment in case X is an 'aligned' dimension. If '*' is the
559  * insertion point, then we still want to cut the hypercube to ensure that the
560  * dimension remains aligned, like so:
561  *
562  * ' _____
563  * ' |    |
564  * ' |    |
565  * ' |____|
566  * '       ___
567  * '       | |
568  * '       |*|
569  * '       |_|
570  *
571  *
572  * We perform alignment first as that might actually resolve chunk
573  * collisions. After alignment we check for any remaining collisions.
574  */
575 static void
chunk_collision_resolve(const Hypertable * ht,Hypercube * cube,const Point * p)576 chunk_collision_resolve(const Hypertable *ht, Hypercube *cube, const Point *p)
577 {
578 	ChunkScanCtx scanctx;
579 	CollisionInfo info = {
580 		.cube = cube,
581 		.colliding_chunk = NULL,
582 	};
583 
584 	chunk_scan_ctx_init(&scanctx, ht->space, p);
585 
586 	/* Scan for all chunks that collide with the hypercube of the new chunk */
587 	chunk_collision_scan(&scanctx, cube);
588 	scanctx.data = &info;
589 
590 	/* Cut the hypercube in any aligned dimensions */
591 	chunk_scan_ctx_foreach_chunk_stub(&scanctx, do_dimension_alignment, 0);
592 
593 	/*
594 	 * If there are any remaining collisions with chunks, then cut-to-fit to
595 	 * resolve those collisions
596 	 */
597 	chunk_scan_ctx_foreach_chunk_stub(&scanctx, do_collision_resolution, 0);
598 
599 	chunk_scan_ctx_destroy(&scanctx);
600 }
601 
602 static int
chunk_add_constraints(const Chunk * chunk)603 chunk_add_constraints(const Chunk *chunk)
604 {
605 	int num_added;
606 
607 	num_added = ts_chunk_constraints_add_dimension_constraints(chunk->constraints,
608 															   chunk->fd.id,
609 															   chunk->cube);
610 	num_added += ts_chunk_constraints_add_inheritable_constraints(chunk->constraints,
611 																  chunk->fd.id,
612 																  chunk->relkind,
613 																  chunk->hypertable_relid);
614 
615 	return num_added;
616 }
617 
/*
 * Apply per-column attribute options and statistics targets from the
 * hypertable's columns to the corresponding columns on a chunk.
 *
 * This propagates both ALTER TABLE ... ALTER COLUMN ... SET (attribute_option)
 * and ALTER TABLE ... ALTER COLUMN ... SET STATISTICS settings.
 */
static void
set_attoptions(Relation ht_rel, Oid chunk_oid)
{
	TupleDesc tupleDesc = RelationGetDescr(ht_rel);
	int natts = tupleDesc->natts;
	int attno;

	/* Attribute numbers are 1-based */
	for (attno = 1; attno <= natts; attno++)
	{
		Form_pg_attribute attribute = TupleDescAttr(tupleDesc, attno - 1);
		char *attributeName = NameStr(attribute->attname);
		HeapTuple tuple;
		Datum options;
		bool isnull;

		/* Ignore dropped columns */
		if (attribute->attisdropped)
			continue;

		tuple = SearchSysCacheAttName(RelationGetRelid(ht_rel), attributeName);

		Assert(tuple != NULL);

		/*
		 * Pass down the attribute options (ALTER TABLE ALTER COLUMN SET
		 * attribute_option)
		 */
		options = SysCacheGetAttr(ATTNAME, tuple, Anum_pg_attribute_attoptions, &isnull);

		if (!isnull)
		{
			AlterTableCmd *cmd = makeNode(AlterTableCmd);

			cmd->subtype = AT_SetOptions;
			cmd->name = attributeName;
			cmd->def = (Node *) untransformRelOptions(options);
			AlterTableInternal(chunk_oid, list_make1(cmd), false);
		}

		/*
		 * Pass down the attribute options (ALTER TABLE ALTER COLUMN SET
		 * STATISTICS)
		 */
		options = SysCacheGetAttr(ATTNAME, tuple, Anum_pg_attribute_attstattarget, &isnull);
		if (!isnull)
		{
			int32 target = DatumGetInt32(options);

			/* Don't do anything if it's set to the default (-1) */
			if (target != -1)
			{
				AlterTableCmd *cmd = makeNode(AlterTableCmd);

				cmd->subtype = AT_SetStatistics;
				cmd->name = attributeName;
				cmd->def = (Node *) makeInteger(target);
				AlterTableInternal(chunk_oid, list_make1(cmd), false);
			}
		}

		ReleaseSysCache(tuple);
	}
}
683 
684 static void
create_toast_table(CreateStmt * stmt,Oid chunk_oid)685 create_toast_table(CreateStmt *stmt, Oid chunk_oid)
686 {
687 	/* similar to tcop/utility.c */
688 	static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
689 	Datum toast_options =
690 		transformRelOptions((Datum) 0, stmt->options, "toast", validnsps, true, false);
691 
692 	(void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true);
693 
694 	NewRelationCreateToastTable(chunk_oid, toast_options);
695 }
696 
697 /*
698  * Get the access method name for a relation.
699  */
700 static char *
get_am_name_for_rel(Oid relid)701 get_am_name_for_rel(Oid relid)
702 {
703 	HeapTuple tuple;
704 	Form_pg_class cform;
705 	Oid amoid;
706 
707 	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
708 
709 	if (!HeapTupleIsValid(tuple))
710 		elog(ERROR, "cache lookup failed for relation %u", relid);
711 
712 	cform = (Form_pg_class) GETSTRUCT(tuple);
713 	amoid = cform->relam;
714 	ReleaseSysCache(tuple);
715 
716 	return get_am_name(amoid);
717 }
718 
/*
 * Copy the hypertable's ACL (pg_class.relacl) to a chunk relation.
 *
 * Also updates the shared dependencies (via updateAclDependencies) so that
 * the chunk depends on the same roles as the hypertable. Does nothing if
 * the hypertable's ACL is NULL.
 */
static void
copy_hypertable_acl_to_relid(const Hypertable *ht, Oid owner_id, Oid relid)
{
	HeapTuple ht_tuple;
	bool is_null;
	Datum acl_datum;
	Relation class_rel;

	/* We open it here since there is no point in trying to update the tuples
	 * if we cannot open the Relation catalog table */
	class_rel = table_open(RelationRelationId, RowExclusiveLock);

	ht_tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(ht->main_table_relid));
	Assert(HeapTupleIsValid(ht_tuple));

	/* We only bother about setting the chunk ACL if the hypertable ACL is
	 * non-null */
	acl_datum = SysCacheGetAttr(RELOID, ht_tuple, Anum_pg_class_relacl, &is_null);
	if (!is_null)
	{
		HeapTuple chunk_tuple, newtuple;
		Datum new_val[Natts_pg_class] = { 0 };
		bool new_null[Natts_pg_class] = { false };
		bool new_repl[Natts_pg_class] = { false };
		Acl *acl = DatumGetAclP(acl_datum);

		/* Replace only the relacl column */
		new_repl[Anum_pg_class_relacl - 1] = true;
		new_val[Anum_pg_class_relacl - 1] = PointerGetDatum(acl);

		/* Find the tuple for the chunk in `pg_class` */
		chunk_tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
		Assert(HeapTupleIsValid(chunk_tuple));

		/* Update the relacl for the chunk tuple to use the acl from the hypertable */
		newtuple = heap_modify_tuple(chunk_tuple,
									 RelationGetDescr(class_rel),
									 new_val,
									 new_null,
									 new_repl);
		CatalogTupleUpdate(class_rel, &newtuple->t_self, newtuple);

		/* We need to update the shared dependencies as well to indicate that
		 * the chunk is dependent on any roles that the hypertable is
		 * dependent on. */
		Oid *newmembers;
		int nnewmembers = aclmembers(acl, &newmembers);

		/* The list of old members is intentionally empty since we are using
		 * updateAclDependencies to set the ACL for the chunk. We can use NULL
		 * because getOidListDiff, which is called from updateAclDependencies,
		 * can handle that. */
		updateAclDependencies(RelationRelationId,
							  relid,
							  0,
							  owner_id,
							  0,
							  NULL,
							  nnewmembers,
							  newmembers);

		heap_freetuple(newtuple);
		ReleaseSysCache(chunk_tuple);
	}

	ReleaseSysCache(ht_tuple);
	table_close(class_rel, RowExclusiveLock);
}
786 
787 /*
788  * Create a chunk's table.
789  *
790  * A chunk inherits from the main hypertable and will have the same owner. Since
791  * chunks can be created either in the TimescaleDB internal schema or in a
792  * user-specified schema, some care has to be taken to use the right
793  * permissions, depending on the case:
794  *
795  * 1. if the chunk is created in the internal schema, we create it as the
796  * catalog/schema owner (i.e., anyone can create chunks there via inserting into
797  * a hypertable, but can not do it via CREATE TABLE).
798  *
799  * 2. if the chunk is created in a user-specified "associated schema", then we
800  * shouldn't use the catalog owner to create the table since that typically
801  * implies super-user permissions. If we would allow that, anyone can specify
802  * someone else's schema in create_hypertable() and create chunks in it without
803  * having the proper permissions to do so. With this logic, the hypertable owner
804  * must have permissions to create tables in the associated schema, or else
805  * table creation will fail. If the schema doesn't yet exist, the table owner
806  * instead needs the proper permissions on the database to create the schema.
807  */
Oid
ts_chunk_create_table(const Chunk *chunk, const Hypertable *ht, const char *tablespacename)
{
	Relation rel;
	ObjectAddress objaddr;
	int sec_ctx;

	/*
	 * The CreateForeignTableStmt embeds a regular CreateStmt, so we can use
	 * it to create both regular and foreign tables
	 */
	CreateForeignTableStmt stmt = {
		.base.type = T_CreateStmt,
		.base.relation = makeRangeVar((char *) NameStr(chunk->fd.schema_name),
									  (char *) NameStr(chunk->fd.table_name),
									  0),
		.base.inhRelations = list_make1(makeRangeVar((char *) NameStr(ht->fd.schema_name),
													 (char *) NameStr(ht->fd.table_name),
													 0)),
		.base.tablespacename = tablespacename ? (char *) tablespacename : NULL,
		/* Propagate storage options of the main table to a regular chunk
		 * table, but avoid using it for a foreign chunk table. */
		.base.options =
			(chunk->relkind == RELKIND_RELATION) ? ts_get_reloptions(ht->main_table_relid) : NIL,
		.base.accessMethod = (chunk->relkind == RELKIND_RELATION) ?
								 get_am_name_for_rel(chunk->hypertable_relid) :
								 NULL,
	};
	Oid uid, saved_uid;

	Assert(chunk->hypertable_relid == ht->main_table_relid);

	rel = table_open(ht->main_table_relid, AccessShareLock);

	/*
	 * If the chunk is created in the internal schema, become the catalog
	 * owner, otherwise become the hypertable owner
	 */
	if (namestrcmp((Name) &chunk->fd.schema_name, INTERNAL_SCHEMA_NAME) == 0)
		uid = ts_catalog_database_info_get()->owner_uid;
	else
		uid = rel->rd_rel->relowner;

	GetUserIdAndSecContext(&saved_uid, &sec_ctx);

	if (uid != saved_uid)
		SetUserIdAndSecContext(uid, sec_ctx | SECURITY_LOCAL_USERID_CHANGE);

	/* The chunk is always owned by the hypertable owner, even when defined
	 * by the catalog owner (see function header comment) */
	objaddr = DefineRelation(&stmt.base, chunk->relkind, rel->rd_rel->relowner, NULL, NULL);

	/* Make the newly defined relation visible so that we can update the
	 * ACL. */
	CommandCounterIncrement();

	/* Copy acl from hypertable to chunk relation record */
	copy_hypertable_acl_to_relid(ht, rel->rd_rel->relowner, objaddr.objectId);

	if (chunk->relkind == RELKIND_RELATION)
	{
		/*
		 * need to create a toast table explicitly for some of the option
		 * setting to work
		 */
		create_toast_table(&stmt.base, objaddr.objectId);

		/* Restore the caller's security context before returning */
		if (uid != saved_uid)
			SetUserIdAndSecContext(saved_uid, sec_ctx);
	}
	else if (chunk->relkind == RELKIND_FOREIGN_TABLE)
	{
		ChunkDataNode *cdn;

		if (list_length(chunk->data_nodes) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
					 (errmsg("no data nodes associated with chunk \"%s\"",
							 get_rel_name(chunk->table_id)))));

		/*
		 * Use the first chunk data node as the "primary" to put in the foreign
		 * table
		 */
		cdn = linitial(chunk->data_nodes);
		stmt.base.type = T_CreateForeignServerStmt;
		stmt.servername = NameStr(cdn->fd.node_name);

		/* Create the foreign table catalog information */
		CreateForeignTable(&stmt, objaddr.objectId);

		/*
		 * Need to restore security context to execute remote commands as the
		 * original user
		 */
		if (uid != saved_uid)
			SetUserIdAndSecContext(saved_uid, sec_ctx);

		/* Create the corresponding chunk replicas on the remote data nodes */
		ts_cm_functions->create_chunk_on_data_nodes(chunk, ht, NULL, NIL);

		/* Record the remote data node chunk ID mappings */
		ts_chunk_data_node_insert_multi(chunk->data_nodes);
	}
	else
		elog(ERROR, "invalid relkind \"%c\" when creating chunk", chunk->relkind);

	/* Propagate per-column options/statistics targets from the hypertable */
	set_attoptions(rel, objaddr.objectId);

	table_close(rel, AccessShareLock);

	return objaddr.objectId;
}
919 
920 static List *
chunk_assign_data_nodes(const Chunk * chunk,const Hypertable * ht)921 chunk_assign_data_nodes(const Chunk *chunk, const Hypertable *ht)
922 {
923 	List *htnodes;
924 	List *chunk_data_nodes = NIL;
925 	ListCell *lc;
926 
927 	if (chunk->relkind != RELKIND_FOREIGN_TABLE)
928 		return NIL;
929 
930 	if (ht->data_nodes == NIL)
931 		ereport(ERROR,
932 				(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
933 				 (errmsg("no data nodes associated with hypertable \"%s\"",
934 						 get_rel_name(ht->main_table_relid)))));
935 
936 	Assert(chunk->cube != NULL);
937 
938 	htnodes = ts_hypertable_assign_chunk_data_nodes(ht, chunk->cube);
939 	Assert(htnodes != NIL);
940 
941 	foreach (lc, htnodes)
942 	{
943 		HypertableDataNode *htnode = lfirst(lc);
944 		ForeignServer *foreign_server =
945 			GetForeignServerByName(NameStr(htnode->fd.node_name), false);
946 		ChunkDataNode *chunk_data_node = palloc0(sizeof(ChunkDataNode));
947 
948 		/*
949 		 * Create a stub data node (partially filled in entry). This will be
950 		 * fully filled in and persisted to metadata tables once we create the
951 		 * remote tables during insert
952 		 */
953 		chunk_data_node->fd.chunk_id = chunk->fd.id;
954 		chunk_data_node->fd.node_chunk_id = -1;
955 		namestrcpy(&chunk_data_node->fd.node_name, foreign_server->servername);
956 		chunk_data_node->foreign_server_oid = foreign_server->serverid;
957 		chunk_data_nodes = lappend(chunk_data_nodes, chunk_data_node);
958 	}
959 
960 	return chunk_data_nodes;
961 }
962 
963 List *
ts_chunk_get_data_node_name_list(const Chunk * chunk)964 ts_chunk_get_data_node_name_list(const Chunk *chunk)
965 {
966 	List *datanodes = NULL;
967 	ListCell *lc;
968 
969 	foreach (lc, chunk->data_nodes)
970 	{
971 		ChunkDataNode *cdn = lfirst(lc);
972 
973 		datanodes = lappend(datanodes, NameStr(cdn->fd.node_name));
974 	}
975 
976 	return datanodes;
977 }
978 
979 bool
ts_chunk_has_data_node(const Chunk * chunk,const char * node_name)980 ts_chunk_has_data_node(const Chunk *chunk, const char *node_name)
981 {
982 	ListCell *lc;
983 	ChunkDataNode *cdn;
984 	bool found = false;
985 
986 	if (chunk == NULL || node_name == NULL)
987 		return false;
988 
989 	/* check that the chunk is indeed present on the specified data node */
990 	foreach (lc, chunk->data_nodes)
991 	{
992 		cdn = lfirst(lc);
993 		if (namestrcmp(&cdn->fd.node_name, node_name) == 0)
994 		{
995 			found = true;
996 			break;
997 		}
998 	}
999 
1000 	return found;
1001 }
1002 
1003 static int32
get_next_chunk_id()1004 get_next_chunk_id()
1005 {
1006 	int32 chunk_id;
1007 	CatalogSecurityContext sec_ctx;
1008 	const Catalog *catalog = ts_catalog_get();
1009 
1010 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
1011 	chunk_id = ts_catalog_table_next_seq_id(catalog, CHUNK);
1012 	ts_catalog_restore_user(&sec_ctx);
1013 
1014 	return chunk_id;
1015 }
1016 
1017 /*
1018  * Create a chunk object from the dimensional constraints in the given hypercube.
1019  *
1020  * The chunk object is then used to create the actual chunk table and update the
1021  * metadata separately.
1022  *
1023  * The table name for the chunk can be given explicitly, or generated if
1024  * table_name is NULL. If the table name is generated, it will use the given
1025  * prefix or, if NULL, use the hypertable's associated table prefix. Similarly,
1026  * if schema_name is NULL it will use the hypertable's associated schema for
1027  * the chunk.
1028  */
1029 static Chunk *
chunk_create_object(const Hypertable * ht,Hypercube * cube,const char * schema_name,const char * table_name,const char * prefix,int32 chunk_id)1030 chunk_create_object(const Hypertable *ht, Hypercube *cube, const char *schema_name,
1031 					const char *table_name, const char *prefix, int32 chunk_id)
1032 {
1033 	const Hyperspace *hs = ht->space;
1034 	Chunk *chunk;
1035 	const char relkind = hypertable_chunk_relkind(ht);
1036 
1037 	if (NULL == schema_name || schema_name[0] == '\0')
1038 		schema_name = NameStr(ht->fd.associated_schema_name);
1039 
1040 	/* Create a new chunk based on the hypercube */
1041 	chunk = ts_chunk_create_base(chunk_id, hs->num_dimensions, relkind);
1042 
1043 	chunk->fd.hypertable_id = hs->hypertable_id;
1044 	chunk->cube = cube;
1045 	chunk->hypertable_relid = ht->main_table_relid;
1046 	namestrcpy(&chunk->fd.schema_name, schema_name);
1047 
1048 	if (NULL == table_name || table_name[0] == '\0')
1049 	{
1050 		int len;
1051 
1052 		if (NULL == prefix)
1053 			prefix = NameStr(ht->fd.associated_table_prefix);
1054 
1055 		len = snprintf(chunk->fd.table_name.data, NAMEDATALEN, "%s_%d_chunk", prefix, chunk->fd.id);
1056 
1057 		if (len >= NAMEDATALEN)
1058 			elog(ERROR, "chunk table name too long");
1059 	}
1060 	else
1061 		namestrcpy(&chunk->fd.table_name, table_name);
1062 
1063 	if (chunk->relkind == RELKIND_FOREIGN_TABLE)
1064 		chunk->data_nodes = chunk_assign_data_nodes(chunk, ht);
1065 
1066 	return chunk;
1067 }
1068 
/*
 * Persist a chunk's catalog metadata: the chunk row itself plus its
 * constraint metadata. Per the _after_lock naming, the caller is expected
 * to already hold the chunk-creation lock on the hypertable.
 */
static void
chunk_insert_into_metadata_after_lock(const Chunk *chunk)
{
	/* Insert chunk */
	ts_chunk_insert_lock(chunk, RowExclusiveLock);

	/* Add metadata for dimensional and inheritable constraints */
	ts_chunk_constraints_insert_metadata(chunk->constraints);
}
1078 
/*
 * Create the table-level objects on a chunk's table: constraints always,
 * plus triggers and indexes for regular (local) chunk tables only.
 */
static void
chunk_create_table_constraints(const Chunk *chunk)
{
	/* Create the chunk's constraints, triggers, and indexes */
	ts_chunk_constraints_create(chunk->constraints,
								chunk->table_id,
								chunk->fd.id,
								chunk->hypertable_relid,
								chunk->fd.hypertable_id);

	/* Triggers and indexes apply only to regular tables, not to
	 * foreign-table chunks */
	if (chunk->relkind == RELKIND_RELATION)
	{
		ts_trigger_create_all_on_chunk(chunk);
		ts_chunk_index_create_all(chunk->fd.hypertable_id,
								  chunk->hypertable_relid,
								  chunk->fd.id,
								  chunk->table_id);
	}
}
1098 
/*
 * Create the actual table relation for a chunk, selecting a tablespace via
 * the hypertable's tablespace-selection logic, and record the resulting Oid
 * in chunk->table_id.
 */
static Oid
chunk_create_table(Chunk *chunk, const Hypertable *ht)
{
	/* Create the actual table relation for the chunk */
	const char *tablespace = ts_hypertable_select_tablespace_name(ht, chunk);

	chunk->table_id = ts_chunk_create_table(chunk, ht, tablespace);

	Assert(OidIsValid(chunk->table_id));

	return chunk->table_id;
}
1111 
/*
 * Configure a catalog scan iterator to look up a chunk row by its ID, using
 * the chunk-ID index with an equality key.
 */
static void
init_scan_by_chunk_id(ScanIterator *iterator, int32 chunk_id)
{
	iterator->ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
	ts_scan_iterator_scan_key_init(iterator,
								   Anum_chunk_idx_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(chunk_id));
}
1122 
1123 /*
1124  * Creates only a table for a chunk.
1125  * Either table name or chunk id needs to be provided.
1126  */
static Chunk *
chunk_create_only_table_after_lock(const Hypertable *ht, Hypercube *cube, const char *schema_name,
								   const char *table_name, const char *prefix, int32 chunk_id)
{
	Chunk *chunk;

	/* Need either an explicit table name or a valid chunk ID from which a
	 * name can be generated (see chunk_create_object) */
	Assert(table_name != NULL || chunk_id != INVALID_CHUNK_ID);

	/* Build the in-memory chunk object and materialize its table; no chunk
	 * metadata is written here */
	chunk = chunk_create_object(ht, cube, schema_name, table_name, prefix, chunk_id);
	Assert(chunk != NULL);

	chunk_create_table(chunk, ht);

	return chunk;
}
1142 
/*
 * Detach a chunk table from its hypertable via an internal
 * ALTER TABLE ... NO INHERIT.
 */
static void
chunk_table_drop_inherit(const Chunk *chunk, Hypertable *ht)
{
	AlterTableCmd drop_inh_cmd = {
		.type = T_AlterTableCmd,
		.subtype = AT_DropInherit,
		.def = (Node *) makeRangeVar(NameStr(ht->fd.schema_name), NameStr(ht->fd.table_name), -1),
		.missing_ok = false
	};

	AlterTableInternal(chunk->table_id, list_make1(&drop_inh_cmd), false);
}
1155 
1156 /*
1157  * Checks that given hypercube does not collide with existing chunks and
1158  * creates an empty table for a chunk without any metadata modifications.
1159  */
Chunk *
ts_chunk_create_only_table(Hypertable *ht, Hypercube *cube, const char *schema_name,
						   const char *table_name)
{
	ChunkStub *stub;
	Chunk *chunk;
	/* Key-share lock existing slice tuples so they cannot be removed before
	 * this transaction commits */
	ScanTupLock tuplock = {
		.lockmode = LockTupleKeyShare,
		.waitpolicy = LockWaitBlock,
	};

	/*
	 * Chunk table can be created if no chunk collides with the dimension slices.
	 */
	stub = chunk_collides(ht, cube);
	if (stub != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_TS_CHUNK_COLLISION),
				 errmsg("chunk table creation failed due to dimension slice collision")));

	/*
	 * Serialize chunk creation around a lock on the "main table" to avoid
	 * multiple processes trying to create the same chunk. We use a
	 * ShareUpdateExclusiveLock, which is the weakest lock possible that
	 * conflicts with itself. The lock needs to be held until transaction end.
	 */
	LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);

	ts_hypercube_find_existing_slices(cube, &tuplock);

	chunk = chunk_create_only_table_after_lock(ht,
											   cube,
											   schema_name,
											   table_name,
											   NULL,
											   INVALID_CHUNK_ID);
	/* The chunk table is created inheriting from the hypertable (the create
	 * statement sets inhRelations); this API promises only a bare table, so
	 * detach it again */
	chunk_table_drop_inherit(chunk, ht);

	return chunk;
}
1200 
/*
 * Create a complete chunk (table, metadata, constraints) from a hypercube.
 * Per the _after_lock naming, the caller must already hold the
 * chunk-creation lock on the hypertable.
 */
static Chunk *
chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
									   const char *schema_name, const char *table_name,
									   const char *prefix)
{
	/* Insert any new dimension slices into metadata */
	ts_dimension_slice_insert_multi(cube->slices, cube->num_slices);

	Chunk *chunk = chunk_create_only_table_after_lock(ht,
													  cube,
													  schema_name,
													  table_name,
													  prefix,
													  get_next_chunk_id());

	/* Register constraints on the chunk object, persist the chunk metadata,
	 * and then create the table-level objects */
	chunk_add_constraints(chunk);
	chunk_insert_into_metadata_after_lock(chunk);
	chunk_create_table_constraints(chunk);

	return chunk;
}
1222 
1223 /*
1224  * Make a chunk table inherit a hypertable.
1225  *
1226  * Execution happens via high-level ALTER TABLE statement. This includes
1227  * numerous checks to ensure that the chunk table has all the prerequisites to
1228  * properly inherit the hypertable.
1229  */
static void
chunk_add_inheritance(Chunk *chunk, const Hypertable *ht)
{
	/* ALTER TABLE <chunk> INHERIT <hypertable> */
	AlterTableCmd altercmd = {
		.type = T_AlterTableCmd,
		.subtype = AT_AddInherit,
		.def = (Node *) makeRangeVar((char *) NameStr(ht->fd.schema_name),
									 (char *) NameStr(ht->fd.table_name),
									 0),
		.missing_ok = false,
	};
	AlterTableStmt alterstmt = {
		.type = T_AlterTableStmt,
		.cmds = list_make1(&altercmd),
		.missing_ok = false,
	/* The statement field naming differs across PostgreSQL versions */
#if PG14_GE
		.objtype = OBJECT_TABLE,
#else
		.relkind = OBJECT_TABLE,
#endif
		.relation = makeRangeVar((char *) NameStr(chunk->fd.schema_name),
								 (char *) NameStr(chunk->fd.table_name),
								 0),
	};
	LOCKMODE lockmode = AlterTableGetLockLevel(alterstmt.cmds);
	/* The AlterTable() call signature differs across PostgreSQL versions */
#if PG13_GE
	AlterTableUtilityContext atcontext = {
		.relid = AlterTableLookupRelation(&alterstmt, lockmode),
	};

	AlterTable(&alterstmt, lockmode, &atcontext);
#else
	AlterTable(AlterTableLookupRelation(&alterstmt, lockmode), lockmode, &alterstmt);
#endif
}
1265 
/*
 * Create a chunk from an existing table plus a hypercube.
 *
 * The table given by chunk_table_relid is adopted as the chunk's table: it
 * is moved to the chunk's target schema and renamed to the chunk's table
 * name if needed, registered in metadata, and made to inherit the
 * hypertable. Caller must hold the chunk-creation lock.
 */
static Chunk *
chunk_create_from_hypercube_and_table_after_lock(const Hypertable *ht, Hypercube *cube,
												 Oid chunk_table_relid, const char *schema_name,
												 const char *table_name, const char *prefix)
{
	Oid current_chunk_schemaid = get_rel_namespace(chunk_table_relid);
	Oid new_chunk_schemaid = InvalidOid;
	Chunk *chunk;

	Assert(OidIsValid(chunk_table_relid));
	Assert(OidIsValid(current_chunk_schemaid));

	/* Insert any new dimension slices into metadata */
	ts_dimension_slice_insert_multi(cube->slices, cube->num_slices);
	chunk = chunk_create_object(ht, cube, schema_name, table_name, prefix, get_next_chunk_id());
	chunk->table_id = chunk_table_relid;
	chunk->hypertable_relid = ht->main_table_relid;
	Assert(OidIsValid(ht->main_table_relid));

	new_chunk_schemaid = get_namespace_oid(NameStr(chunk->fd.schema_name), false);

	/* Move the adopted table into the chunk's schema if it isn't already there */
	if (current_chunk_schemaid != new_chunk_schemaid)
	{
		Relation chunk_rel = table_open(chunk_table_relid, AccessExclusiveLock);
		ObjectAddresses *objects;

		CheckSetNamespace(current_chunk_schemaid, new_chunk_schemaid);
		objects = new_object_addresses();
		AlterTableNamespaceInternal(chunk_rel, current_chunk_schemaid, new_chunk_schemaid, objects);
		free_object_addresses(objects);
		table_close(chunk_rel, NoLock);
		/* Make changes visible */
		CommandCounterIncrement();
	}

	/* Rename the adopted table to the chunk's table name if the names differ */
	if (namestrcmp(&chunk->fd.table_name, get_rel_name(chunk_table_relid)) != 0)
	{
		/* Renaming will acquire and keep an AccessExclusivelock on the chunk
		 * table */
		RenameRelationInternal(chunk_table_relid, NameStr(chunk->fd.table_name), true, false);
		/* Make changes visible */
		CommandCounterIncrement();
	}

	/* Note that we do not automatically add constrains and triggers to the
	 * chunk table when the chunk is created from an existing table. However,
	 * PostgreSQL currently validates that CHECK constraints exists, but no
	 * validation is done for other objects, including triggers, UNIQUE,
	 * PRIMARY KEY, and FOREIGN KEY constraints. We might want to either
	 * enforce that these constraints exist prior to creating the chunk from a
	 * table, or we ensure that they are automatically added when the chunk is
	 * created. However, for the latter case, we risk duplicating constraints
	 * and triggers if some of them already exist on the chunk table prior to
	 * creating the chunk from it. */
	chunk_add_constraints(chunk);
	chunk_insert_into_metadata_after_lock(chunk);
	chunk_add_inheritance(chunk, ht);
	chunk_create_table_constraints(chunk);

	return chunk;
}
1327 
/*
 * Create the chunk that covers the given point. Per the _after_lock naming,
 * the caller must already hold the chunk-creation lock on the hypertable.
 */
static Chunk *
chunk_create_from_point_after_lock(const Hypertable *ht, const Point *p, const char *schema_name,
								   const char *table_name, const char *prefix)
{
	Hyperspace *hs = ht->space;
	Hypercube *cube;
	ScanTupLock tuplock = {
		.lockmode = LockTupleKeyShare,
		.waitpolicy = LockWaitBlock,
	};

	/*
	 * If the user has enabled adaptive chunking, call the function to
	 * calculate and set the new chunk time interval.
	 */
	calculate_and_set_new_chunk_interval(ht, p);

	/* Calculate the hypercube for a new chunk that covers the tuple's point.
	 *
	 * We lock the tuple in KEY SHARE mode since we are concerned with
	 * ensuring that it is not deleted (or the key value changed) while we are
	 * adding chunk constraints (in `ts_chunk_constraints_insert_metadata`
	 * called in `chunk_create_metadata_after_lock`). The range of a dimension
	 * slice does not change, but we should use the weakest lock possible to
	 * not unnecessarily block other operations. */
	cube = ts_hypercube_calculate_from_point(hs, p, &tuplock);

	/* Resolve collisions with other chunks by cutting the new hypercube */
	chunk_collision_resolve(ht, cube, p);

	return chunk_create_from_hypercube_after_lock(ht, cube, schema_name, table_name, prefix);
}
1360 
/*
 * Return the chunk whose dimensional constraints exactly match the given
 * hypercube, creating it if none exists. A partial collision with an
 * existing chunk raises an error rather than cutting the hypercube. If
 * chunk_table_relid is a valid Oid, a newly created chunk adopts that table
 * instead of creating a new one. The optional "created" output reports
 * whether a chunk was created by this call.
 */
Chunk *
ts_chunk_find_or_create_without_cuts(const Hypertable *ht, Hypercube *hc, const char *schema_name,
									 const char *table_name, Oid chunk_table_relid, bool *created)
{
	ChunkStub *stub;
	Chunk *chunk = NULL;

	DEBUG_WAITPOINT("find_or_create_chunk_start");

	stub = chunk_collides(ht, hc);

	if (NULL == stub)
	{
		/* Serialize chunk creation around the root hypertable */
		LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);

		/* Check again after lock */
		stub = chunk_collides(ht, hc);

		if (NULL == stub)
		{
			ScanTupLock tuplock = {
				.lockmode = LockTupleKeyShare,
				.waitpolicy = LockWaitBlock,
			};

			/* Lock all slices that already exist to ensure they remain when we
			 * commit since we won't create those slices ourselves. */
			ts_hypercube_find_existing_slices(hc, &tuplock);

			if (OidIsValid(chunk_table_relid))
				chunk = chunk_create_from_hypercube_and_table_after_lock(ht,
																		 hc,
																		 chunk_table_relid,
																		 schema_name,
																		 table_name,
																		 NULL);
			else
				chunk =
					chunk_create_from_hypercube_after_lock(ht, hc, schema_name, table_name, NULL);

			if (NULL != created)
				*created = true;

			ASSERT_IS_VALID_CHUNK(chunk);

			DEBUG_WAITPOINT("find_or_create_chunk_created");

			return chunk;
		}

		/* We didn't need the lock, so release it */
		UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
	}

	Assert(NULL != stub);

	/* We can only use an existing chunk if it has identical dimensional
	 * constraints. Otherwise, throw an error */
	if (!ts_hypercube_equal(stub->cube, hc))
		ereport(ERROR,
				(errcode(ERRCODE_TS_CHUNK_COLLISION),
				 errmsg("chunk creation failed due to collision")));

	/* chunk_collides only returned a stub, so we need to lookup the full
	 * chunk. */
	chunk = ts_chunk_get_by_id(stub->id, true);

	if (NULL != created)
		*created = false;

	DEBUG_WAITPOINT("find_or_create_chunk_found");

	ASSERT_IS_VALID_CHUNK(chunk);

	return chunk;
}
1438 
1439 /*
1440  * Create a chunk through insertion of a tuple at a given point.
1441  */
Chunk *
ts_chunk_create_from_point(const Hypertable *ht, const Point *p, const char *schema,
						   const char *prefix)
{
	Chunk *chunk;

	/*
	 * Serialize chunk creation around a lock on the "main table" to avoid
	 * multiple processes trying to create the same chunk. We use a
	 * ShareUpdateExclusiveLock, which is the weakest lock possible that
	 * conflicts with itself. The lock needs to be held until transaction end.
	 */
	LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);

	/* Recheck if someone else created the chunk before we got the table
	 * lock. The returned chunk will have all slices locked so that they
	 * aren't removed. */
	chunk = chunk_find(ht, p, true, true);

	if (NULL == chunk)
	{
		/* On a distributed hypertable member, chunk creation must be driven
		 * by the access node, not initiated locally */
		if (hypertable_is_distributed_member(ht))
			ereport(ERROR,
					(errcode(ERRCODE_TS_INTERNAL_ERROR),
					 errmsg("distributed hypertable member cannot create chunk on its own"),
					 errhint("Chunk creation should only happen through an access node.")));

		chunk = chunk_create_from_point_after_lock(ht, p, schema, NULL, prefix);
	}
	else
	{
		/* Chunk was not created, so we can release the lock early */
		UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
	}

	ASSERT_IS_VALID_CHUNK(chunk);

	return chunk;
}
1481 
1482 ChunkStub *
ts_chunk_stub_create(int32 id,int16 num_constraints)1483 ts_chunk_stub_create(int32 id, int16 num_constraints)
1484 {
1485 	ChunkStub *stub;
1486 
1487 	stub = palloc0(sizeof(*stub));
1488 	stub->id = id;
1489 
1490 	if (num_constraints > 0)
1491 		stub->constraints = ts_chunk_constraints_alloc(num_constraints, CurrentMemoryContext);
1492 
1493 	return stub;
1494 }
1495 
1496 Chunk *
ts_chunk_create_base(int32 id,int16 num_constraints,const char relkind)1497 ts_chunk_create_base(int32 id, int16 num_constraints, const char relkind)
1498 {
1499 	Chunk *chunk;
1500 
1501 	chunk = palloc0(sizeof(Chunk));
1502 	chunk->fd.id = id;
1503 	chunk->fd.compressed_chunk_id = INVALID_CHUNK_ID;
1504 	chunk->relkind = relkind;
1505 
1506 	if (num_constraints > 0)
1507 		chunk->constraints = ts_chunk_constraints_alloc(num_constraints, CurrentMemoryContext);
1508 
1509 	return chunk;
1510 }
1511 
1512 /*
1513  * Build a chunk from a chunk tuple and a stub.
1514  *
1515  * The stub allows the chunk to be constructed more efficiently. But if the stub
1516  * is not "valid", dimension slices and constraints are fully
1517  * rescanned/recreated.
1518  */
static Chunk *
chunk_build_from_tuple_and_stub(Chunk **chunkptr, TupleInfo *ti, const ChunkStub *stub)
{
	Chunk *chunk = NULL;
	/* Without a stub there is no constraint count to size by; use a small
	 * default hint */
	int num_constraints_hint = stub ? stub->constraints->num_constraints : 2;

	/* Caller may pass NULL to have the chunk allocated here (in the tuple's
	 * memory context), or point at an existing/NULL Chunk* to fill in */
	if (NULL == chunkptr)
		chunkptr = &chunk;

	if (NULL == *chunkptr)
		*chunkptr = MemoryContextAllocZero(ti->mctx, sizeof(Chunk));

	chunk = *chunkptr;
	chunk_formdata_fill(&chunk->fd, ti);

	/*
	 * When searching for the chunk stub matching the dimensional point, we
	 * only scanned for dimensional constraints. We now need to rescan the
	 * constraints to also get the inherited constraints.
	 */
	chunk->constraints =
		ts_chunk_constraint_scan_by_chunk_id(chunk->fd.id, num_constraints_hint, ti->mctx);

	/* If a stub is provided then reuse its hypercube. Note that stubs that
	 * are results of a point or range scan might be incomplete (in terms of
	 * number of slices and constraints). Only a chunk stub that matches in
	 * all dimensions will have a complete hypercube. Thus, we need to check
	 * the validity of the stub before we can reuse it.
	 */
	if (chunk_stub_is_valid(stub, chunk->constraints->num_dimension_constraints))
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ti->mctx);

		chunk->cube = ts_hypercube_copy(stub->cube);
		MemoryContextSwitchTo(oldctx);

		/*
		 * The hypercube slices were filled in during the scan. Now we need to
		 * sort them in dimension order.
		 */
		ts_hypercube_slice_sort(chunk->cube);
	}
	else
		chunk->cube = ts_hypercube_from_constraints(chunk->constraints, ti->mctx);

	return chunk;
}
1566 
/*
 * Scanner filter that excludes chunk tuples whose "dropped" flag is set,
 * recording the flag in the ChunkStubScanCtx so callers can distinguish
 * "dropped" from "not found".
 */
static ScanFilterResult
chunk_tuple_dropped_filter(const TupleInfo *ti, void *arg)
{
	ChunkStubScanCtx *stubctx = arg;
	bool isnull;
	Datum dropped = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull);

	Assert(!isnull);
	stubctx->is_dropped = DatumGetBool(dropped);

	return stubctx->is_dropped ? SCAN_EXCLUDE : SCAN_INCLUDE;
}
1579 
1580 /* This is a modified version of chunk_tuple_dropped_filter that does
1581  * not use ChunkStubScanCtx as the arg, it just ignores the passed in
1582  * argument.
1583  * We need a variant as the ScannerCtx assumes that the the filter function
1584  * and tuple_found function share the argument.
1585  */
1586 static ScanFilterResult
chunk_check_ignorearg_dropped_filter(const TupleInfo * ti,void * arg)1587 chunk_check_ignorearg_dropped_filter(const TupleInfo *ti, void *arg)
1588 {
1589 	bool isnull;
1590 	Datum dropped = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull);
1591 
1592 	Assert(!isnull);
1593 	bool is_dropped = DatumGetBool(dropped);
1594 
1595 	return is_dropped ? SCAN_EXCLUDE : SCAN_INCLUDE;
1596 }
1597 
/*
 * Scanner callback that builds the full Chunk from the found catalog tuple,
 * filling in table relids and, for foreign-table chunks, the data nodes.
 */
static ScanTupleResult
chunk_tuple_found(TupleInfo *ti, void *arg)
{
	ChunkStubScanCtx *stubctx = arg;
	Chunk *chunk;

	chunk = chunk_build_from_tuple_and_stub(&stubctx->chunk, ti, stubctx->stub);
	Assert(!chunk->fd.dropped);

	/* Fill in table relids. Note that we cannot do this in
	 * chunk_build_from_tuple_and_stub() since chunk_resurrect() also uses
	 * that function and, in that case, the chunk object is needed to create
	 * the data table and related objects. */
	chunk->table_id = get_relname_relid(chunk->fd.table_name.data,
										get_namespace_oid(chunk->fd.schema_name.data, true));
	chunk->hypertable_relid = ts_hypertable_id_to_relid(chunk->fd.hypertable_id);
	chunk->relkind = get_rel_relkind(chunk->table_id);

	if (chunk->relkind == RELKIND_FOREIGN_TABLE)
		chunk->data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk->fd.id, ti->mctx);

	return SCAN_DONE;
}
1621 
1622 /* Create a chunk by scanning on chunk ID. A stub must be provided as input. */
static Chunk *
chunk_create_from_stub(ChunkStubScanCtx *stubctx)
{
	ScanKeyData scankey[1];
	Catalog *catalog = ts_catalog_get();
	int num_found;
	ScannerCtx scanctx = {
		.table = catalog_get_table_id(catalog, CHUNK),
		.index = catalog_get_index(catalog, CHUNK, CHUNK_ID_INDEX),
		.nkeys = 1,
		.scankey = scankey,
		.data = stubctx,
		.filter = chunk_tuple_dropped_filter,
		.tuple_found = chunk_tuple_found,
		.lockmode = AccessShareLock,
		.scandirection = ForwardScanDirection,
	};

	/*
	 * Perform an index scan on chunk ID.
	 */
	ScanKeyInit(&scankey[0],
				Anum_chunk_idx_id,
				BTEqualStrategyNumber,
				F_INT4EQ,
				Int32GetDatum(stubctx->stub->id));

	num_found = ts_scanner_scan(&scanctx);

	Assert(num_found == 0 || num_found == 1);

	/* A dropped chunk is excluded by chunk_tuple_dropped_filter, which also
	 * records the flag in the scan context; return NULL rather than error */
	if (stubctx->is_dropped)
	{
		Assert(num_found == 0);
		return NULL;
	}

	if (num_found != 1)
		elog(ERROR, "no chunk found with ID %d", stubctx->stub->id);

	Assert(NULL != stubctx->chunk);

	return stubctx->chunk;
}
1667 
1668 /*
1669  * Initialize a chunk scan context.
1670  *
1671  * A chunk scan context is used to join chunk-related information from metadata
1672  * tables during scans.
1673  */
static void
chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hyperspace *hs, const Point *p)
{
	/* Hash table keyed by chunk ID (int32), allocated in the current memory
	 * context */
	struct HASHCTL hctl = {
		.keysize = sizeof(int32),
		.entrysize = sizeof(ChunkScanEntry),
		.hcxt = CurrentMemoryContext,
	};

	memset(ctx, 0, sizeof(*ctx));
	ctx->htab = hash_create("chunk-scan-context", 20, &hctl, HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
	ctx->space = hs;
	ctx->point = p;
	ctx->lockmode = NoLock;
}
1689 
1690 /*
1691  * Destroy the chunk scan context.
1692  *
1693  * This will free the hash table in the context, but not the chunks within since
1694  * they are not allocated on the hash tables memory context.
1695  */
static void
chunk_scan_ctx_destroy(ChunkScanCtx *ctx)
{
	/* Frees only the hash table; chunks are allocated elsewhere and survive */
	hash_destroy(ctx->htab);
}
1701 
1702 static inline void
dimension_slice_and_chunk_constraint_join(ChunkScanCtx * scanctx,const DimensionVec * vec)1703 dimension_slice_and_chunk_constraint_join(ChunkScanCtx *scanctx, const DimensionVec *vec)
1704 {
1705 	int i;
1706 
1707 	for (i = 0; i < vec->num_slices; i++)
1708 	{
1709 		/*
1710 		 * For each dimension slice, find matching constraints. These will be
1711 		 * saved in the scan context
1712 		 */
1713 		ts_chunk_constraint_scan_by_dimension_slice(vec->slices[i], scanctx, CurrentMemoryContext);
1714 	}
1715 }
1716 
1717 /*
1718  * Scan for the chunk that encloses the given point.
1719  *
1720  * In each dimension there can be one or more slices that match the point's
1721  * coordinate in that dimension. Slices are collected in the scan context's hash
1722  * table according to the chunk IDs they are associated with. A slice might
1723  * represent the dimensional bound of multiple chunks, and thus is added to all
1724  * the hash table slots of those chunks. At the end of the scan there will be at
1725  * most one chunk that has a complete set of slices, since a point cannot belong
1726  * to two chunks.
1727  */
static void
chunk_point_scan(ChunkScanCtx *scanctx, const Point *p, bool lock_slices)
{
	int i;

	/* Scan all dimensions for slices enclosing the point */
	for (i = 0; i < scanctx->space->num_dimensions; i++)
	{
		DimensionVec *vec;
		/* Optionally key-share lock matching slice tuples so they are not
		 * deleted while the caller still needs them */
		ScanTupLock tuplock = {
			.lockmode = LockTupleKeyShare,
			.waitpolicy = LockWaitBlock,
		};

		/* NOTE(review): the limit argument of 0 presumably means "no limit"
		 * — confirm against ts_dimension_slice_scan_limit */
		vec = ts_dimension_slice_scan_limit(scanctx->space->dimensions[i].fd.id,
											p->coordinates[i],
											0,
											lock_slices ? &tuplock : NULL);

		dimension_slice_and_chunk_constraint_join(scanctx, vec);
	}
}
1750 
1751 /*
1752  * Scan for chunks that collide with the given hypercube.
1753  *
1754  * Collisions are determined using axis-aligned bounding box collision detection
1755  * generalized to N dimensions. Slices are collected in the scan context's hash
1756  * table according to the chunk IDs they are associated with. A slice might
1757  * represent the dimensional bound of multiple chunks, and thus is added to all
1758  * the hash table slots of those chunks. At the end of the scan, those chunks
1759  * that have a full set of slices are the ones that actually collide with the
1760  * given hypercube.
1761  *
1762  * Chunks in the scan context that do not collide (do not have a full set of
1763  * slices), might still be important for ensuring alignment in those dimensions
1764  * that require alignment.
1765  */
1766 static void
chunk_collision_scan(ChunkScanCtx * scanctx,const Hypercube * cube)1767 chunk_collision_scan(ChunkScanCtx *scanctx, const Hypercube *cube)
1768 {
1769 	int i;
1770 
1771 	/* Scan all dimensions for colliding slices */
1772 	for (i = 0; i < scanctx->space->num_dimensions; i++)
1773 	{
1774 		DimensionVec *vec;
1775 		DimensionSlice *slice = cube->slices[i];
1776 
1777 		vec = dimension_slice_collision_scan(slice->fd.dimension_id,
1778 											 slice->fd.range_start,
1779 											 slice->fd.range_end);
1780 
1781 		/* Add the slices to all the chunks they are associated with */
1782 		dimension_slice_and_chunk_constraint_join(scanctx, vec);
1783 	}
1784 }
1785 
/*
 * Apply a function to each stub in the scan context's hash table. If the limit
 * is greater than zero only a limited number of chunks will be processed.
 *
 * The chunk handler function (on_chunk_func) should return CHUNK_PROCESSED if
 * the chunk should be considered processed and count towards the given
 * limit. CHUNK_IGNORE can be returned to have a chunk NOT count towards the
 * limit. CHUNK_DONE counts the chunk but aborts processing irrespective of
 * whether the limit is reached or not.
 *
 * Returns the number of processed chunks.
 */
static int
chunk_scan_ctx_foreach_chunk_stub(ChunkScanCtx *ctx, on_chunk_stub_func on_chunk, uint16 limit)
{
	HASH_SEQ_STATUS status;
	ChunkScanEntry *entry;

	ctx->num_processed = 0;
	hash_seq_init(&status, ctx->htab);

	for (entry = hash_seq_search(&status); entry != NULL; entry = hash_seq_search(&status))
	{
		switch (on_chunk(ctx, entry->stub))
		{
			case CHUNK_DONE:
				/* Count the chunk and stop immediately. The sequential scan
				 * must be explicitly terminated since we leave it before
				 * reaching the end of the table. */
				ctx->num_processed++;
				hash_seq_term(&status);
				return ctx->num_processed;
			case CHUNK_PROCESSED:
				ctx->num_processed++;

				/* A limit of 0 means "no limit" */
				if (limit > 0 && ctx->num_processed == limit)
				{
					hash_seq_term(&status);
					return ctx->num_processed;
				}
				break;
			case CHUNK_IGNORED:
				/* Does not count towards the limit; keep scanning */
				break;
		}
	}

	return ctx->num_processed;
}
1831 
/*
 * Remember a complete chunk stub (one with constraints in all dimensions) in
 * the scan context's data field.
 *
 * In assert-enabled builds we return CHUNK_PROCESSED so the caller keeps
 * scanning and can verify that at most one complete stub exists. In regular
 * builds we return CHUNK_DONE to abort the scan as soon as the match is found.
 */
static ChunkResult
set_complete_chunk(ChunkScanCtx *scanctx, ChunkStub *stub)
{
	if (chunk_stub_is_complete(stub, scanctx->space))
	{
		scanctx->data = stub;
#ifdef USE_ASSERT_CHECKING
		return CHUNK_PROCESSED;
#else
		return CHUNK_DONE;
#endif
	}
	return CHUNK_IGNORED;
}
1846 
/* State passed (via ChunkScanCtx.data) to chunk_scan_context_add_chunk():
 * collects fully materialized Chunk objects into a preallocated array. */
typedef struct ChunkScanCtxAddChunkData
{
	Chunk *chunks;	   /* preallocated output array */
	uint64 max_chunks; /* capacity of the chunks array */
	uint64 num_chunks; /* number of entries filled in so far */
} ChunkScanCtxAddChunkData;
1853 
1854 static ChunkResult
chunk_scan_context_add_chunk(ChunkScanCtx * scanctx,ChunkStub * stub)1855 chunk_scan_context_add_chunk(ChunkScanCtx *scanctx, ChunkStub *stub)
1856 {
1857 	ChunkScanCtxAddChunkData *data = scanctx->data;
1858 	ChunkStubScanCtx stubctx = {
1859 		.chunk = &data->chunks[data->num_chunks],
1860 		.stub = stub,
1861 	};
1862 
1863 	Assert(data->num_chunks < data->max_chunks);
1864 	chunk_create_from_stub(&stubctx);
1865 
1866 	if (stubctx.is_dropped)
1867 		return CHUNK_IGNORED;
1868 
1869 	data->num_chunks++;
1870 
1871 	return CHUNK_PROCESSED;
1872 }
1873 
/* Finds the first chunk that has a complete set of constraints. There should be
 * only one such chunk in the scan context when scanning for the chunk that
 * holds a particular tuple/point. */
static ChunkStub *
chunk_scan_ctx_get_chunk_stub(ChunkScanCtx *ctx)
{
	ctx->data = NULL;

#ifdef USE_ASSERT_CHECKING
	{
		/* Scan all stubs (limit 0 = no limit) so we can verify that at most
		 * one of them is complete */
		int n = chunk_scan_ctx_foreach_chunk_stub(ctx, set_complete_chunk, 0);

		Assert(n == 0 || n == 1);
	}
#else
	/* Stop after the first complete stub */
	chunk_scan_ctx_foreach_chunk_stub(ctx, set_complete_chunk, 1);
#endif

	/* Set by set_complete_chunk() if a complete stub was found, else NULL */
	return ctx->data;
}
1894 
/*
 * Resurrect a chunk from a tombstone.
 *
 * A chunk can be dropped while retaining its metadata as a tombstone. Such a
 * chunk is marked with dropped=true.
 *
 * This function resurrects such a dropped chunk based on the original metadata,
 * including recreating the table and related objects.
 *
 * Returns the resurrected chunk, or NULL if the tombstone was concurrently
 * removed before we could act on it.
 */
static Chunk *
chunk_resurrect(const Hypertable *ht, const ChunkStub *stub)
{
	ScanIterator iterator;
	Chunk *chunk = NULL;
	int count = 0;

	Assert(NULL != stub->constraints);
	Assert(NULL != stub->cube);

	/* RowExclusiveLock since we will update the chunk's catalog tuple */
	iterator = ts_scan_iterator_create(CHUNK, RowExclusiveLock, CurrentMemoryContext);
	init_scan_by_chunk_id(&iterator, stub->id);

	ts_scanner_foreach(&iterator)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
		HeapTuple new_tuple;

		Assert(count == 0 && chunk == NULL);
		chunk = chunk_build_from_tuple_and_stub(NULL, ti, stub);
		/* Only tombstones should ever be resurrected */
		Assert(chunk->fd.dropped);

		/* Create data table and related objects */
		chunk->hypertable_relid = ht->main_table_relid;
		chunk->relkind = hypertable_chunk_relkind(ht);
		if (chunk->relkind == RELKIND_FOREIGN_TABLE)
		{
			chunk->data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk->fd.id, ti->mctx);
			/* If the Data-Node replica list information has been deleted reassign them */
			if (!chunk->data_nodes)
				chunk->data_nodes = chunk_assign_data_nodes(chunk, ht);
		}
		chunk->table_id = chunk_create_table(chunk, ht);
		chunk_create_table_constraints(chunk);

		/* Finally, update the chunk tuple to no longer be a tombstone */
		chunk->fd.dropped = false;
		new_tuple = chunk_formdata_make_tuple(&chunk->fd, ts_scan_iterator_tupledesc(&iterator));
		ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
		heap_freetuple(new_tuple);
		count++;

		/* Assume there's only one match. (We break early to avoid scanning
		 * also the updated tuple.) */
		break;
	}

	ts_scan_iterator_close(&iterator);

	Assert(count == 0 || count == 1);

	/* If count == 0 and chunk is NULL here, the tombstone (metadata) must
	 * have been removed before we had a chance to resurrect the chunk */
	return chunk;
}
1959 
/*
 * Find a chunk matching a point in a hypertable's N-dimensional hyperspace.
 *
 * This involves:
 *
 * 1) For each dimension:
 *	  - Find all dimension slices that match the dimension
 * 2) For each dimension slice:
 *	  - Find all chunk constraints matching the dimension slice
 * 3) For each matching chunk constraint
 *	  - Insert a chunk stub into a hash table and add the constraint to the chunk
 *	  - If chunk already exists in hash table, add the constraint to the chunk
 * 4) At the end of the scan, only one chunk in the hash table should have
 *	  N number of constraints. This is the matching chunk.
 *
 * NOTE: this function allocates transient data, e.g., dimension slice,
 * constraints and chunks, that in the end are not part of the returned
 * chunk. Therefore, this scan should be executed on a transient memory
 * context. The returned chunk needs to be copied into another memory context in
 * case it needs to live beyond the lifetime of the other data.
 */
static Chunk *
chunk_find(const Hypertable *ht, const Point *p, bool resurrect, bool lock_slices)
{
	ChunkStub *stub;
	Chunk *chunk = NULL;
	ChunkScanCtx ctx;

	/* The scan context will keep the state accumulated during the scan */
	chunk_scan_ctx_init(&ctx, ht->space, p);

	/* Abort the scan when the chunk is found */
	ctx.early_abort = true;

	/* Scan for the chunk matching the point */
	chunk_point_scan(&ctx, p, lock_slices);

	/* Find the stub that has N matching dimension constraints */
	stub = chunk_scan_ctx_get_chunk_stub(&ctx);

	/* Only frees the context's hash table; the stub remains valid */
	chunk_scan_ctx_destroy(&ctx);

	if (NULL != stub)
	{
		ChunkStubScanCtx stubctx = {
			.stub = stub,
		};

		/* Fill in the rest of the chunk's data from the chunk table, unless
		 * the chunk is marked as dropped. */
		chunk = chunk_create_from_stub(&stubctx);

		/* Check if the found metadata is a tombstone (dropped=true) */
		if (stubctx.is_dropped)
		{
			Assert(chunk == NULL);

			/* Resurrect the chunk if requested by the caller */
			if (resurrect)
				chunk = chunk_resurrect(ht, stub);
		}
	}

	ASSERT_IS_NULL_OR_VALID_CHUNK(chunk);

	return chunk;
}
2027 
2028 Chunk *
ts_chunk_find(const Hypertable * ht,const Point * p,bool lock_slices)2029 ts_chunk_find(const Hypertable *ht, const Point *p, bool lock_slices)
2030 {
2031 	return chunk_find(ht, p, false, lock_slices);
2032 }
2033 
/*
 * Find all the chunks in hyperspace that include elements (dimension slices)
 * calculated by given range constraints and return the corresponding
 * ChunkScanCxt. It is the caller's responsibility to destroy this context after
 * usage.
 *
 * The number of matching chunks is added to *num_found (the caller may
 * accumulate across calls). A limit < 1 is passed through to the slice scan;
 * presumably it means "no limit" there -- TODO confirm against
 * ts_dimension_slice_scan_range_limit().
 */
static void
chunks_find_all_in_range_limit(const Hypertable *ht, const Dimension *time_dim,
							   StrategyNumber start_strategy, int64 start_value,
							   StrategyNumber end_strategy, int64 end_value, int limit,
							   uint64 *num_found, ScanTupLock *tuplock, ChunkScanCtx *ctx)
{
	DimensionVec *slices;

	Assert(ht != NULL);

	/* must have been checked earlier that this is the case */
	Assert(time_dim != NULL);

	/* Find all slices of the time dimension that fall in the given range */
	slices = ts_dimension_slice_scan_range_limit(time_dim->fd.id,
												 start_strategy,
												 start_value,
												 end_strategy,
												 end_value,
												 limit,
												 tuplock);

	/* The scan context will keep the state accumulated during the scan */
	chunk_scan_ctx_init(ctx, ht->space, NULL);

	/* No abort when the first chunk is found */
	ctx->early_abort = false;

	/* Scan for chunks that are in range */
	dimension_slice_and_chunk_constraint_join(ctx, slices);

	*num_found += hash_get_num_entries(ctx->htab);
}
2072 
/*
 * Shared logic for the append_chunk/append_chunk_oid scan callbacks.
 *
 * Materializes the full chunk for a complete stub into *chunk and optionally
 * locks its table. Returns CHUNK_IGNORED for incomplete stubs and tombstones
 * (in which case *chunk is NULL or invalid), CHUNK_PROCESSED otherwise.
 */
static ChunkResult
append_chunk_common(ChunkScanCtx *scanctx, ChunkStub *stub, Chunk **chunk)
{
	ChunkStubScanCtx stubctx = {
		.stub = stub,
	};

	*chunk = NULL;

	/* Only stubs with constraints in all dimensions are real matches */
	if (!chunk_stub_is_complete(stub, scanctx->space))
		return CHUNK_IGNORED;

	/* Fill in the rest of the chunk's data from the chunk table */
	*chunk = chunk_create_from_stub(&stubctx);

	if (stubctx.is_dropped)
		return CHUNK_IGNORED;

	Assert(OidIsValid((*chunk)->table_id));

	/* Lock the chunk table if the caller requested a lockmode */
	if (scanctx->lockmode != NoLock)
		LockRelationOid((*chunk)->table_id, scanctx->lockmode);

	return CHUNK_PROCESSED;
}
2098 
2099 static ChunkResult
append_chunk_oid(ChunkScanCtx * scanctx,ChunkStub * stub)2100 append_chunk_oid(ChunkScanCtx *scanctx, ChunkStub *stub)
2101 {
2102 	Chunk *chunk;
2103 	ChunkResult res = append_chunk_common(scanctx, stub, &chunk);
2104 
2105 	if (res == CHUNK_PROCESSED)
2106 	{
2107 		Assert(chunk != NULL);
2108 		scanctx->data = lappend_oid(scanctx->data, chunk->table_id);
2109 	}
2110 
2111 	return res;
2112 }
2113 
/*
 * Scan-context callback: append the materialized chunk to the Chunk-pointer
 * array kept in scanctx->data.
 *
 * The array is lazily allocated on the first processed chunk, sized by
 * scanctx->num_complete_chunks, and indexed by scanctx->num_processed (which
 * chunk_scan_ctx_foreach_chunk_stub() increments after this callback returns
 * CHUNK_PROCESSED).
 */
static ChunkResult
append_chunk(ChunkScanCtx *scanctx, ChunkStub *stub)
{
	Chunk *chunk;
	ChunkResult res = append_chunk_common(scanctx, stub, &chunk);

	if (res == CHUNK_PROCESSED)
	{
		Chunk **chunks = scanctx->data;

		Assert(chunk != NULL);
		Assert(scanctx->num_processed < scanctx->num_complete_chunks);

		/* Allocate the output array on first use */
		if (NULL == chunks)
		{
			Assert(scanctx->num_complete_chunks > 0);
			scanctx->data = chunks = palloc(sizeof(Chunk *) * scanctx->num_complete_chunks);
		}

		chunks[scanctx->num_processed] = chunk;
	}

	return res;
}
2138 
/*
 * Find all chunks matching the given dimension vectors by joining the slices
 * with their chunk constraints, then apply on_chunk to each complete stub.
 *
 * Returns whatever the on_chunk callback accumulated in ctx.data (e.g., a
 * Chunk-pointer array or an Oid list), and optionally the number of processed
 * chunks via num_chunks (may be NULL).
 */
static void *
chunk_find_all(const Hypertable *ht, const List *dimension_vecs, on_chunk_stub_func on_chunk,
			   LOCKMODE lockmode, unsigned int *num_chunks)
{
	ChunkScanCtx ctx;
	ListCell *lc;
	int num_processed;

	/* The scan context will keep the state accumulated during the scan */
	chunk_scan_ctx_init(&ctx, ht->space, NULL);

	/* Do not abort the scan when one chunk is found */
	ctx.early_abort = false;
	ctx.lockmode = lockmode;

	/* Scan all dimensions for slices enclosing the point */
	foreach (lc, dimension_vecs)
	{
		const DimensionVec *vec = lfirst(lc);

		dimension_slice_and_chunk_constraint_join(&ctx, vec);
	}

	ctx.data = NULL;
	/* Limit 0 = process all stubs */
	num_processed = chunk_scan_ctx_foreach_chunk_stub(&ctx, on_chunk, 0);

	if (NULL != num_chunks)
		*num_chunks = num_processed;

	/* Destroying the context only frees its hash table; ctx.data was
	 * allocated separately by the callback and remains valid */
	chunk_scan_ctx_destroy(&ctx);

	return ctx.data;
}
2172 
2173 Chunk **
ts_chunk_find_all(const Hypertable * ht,const List * dimension_vecs,LOCKMODE lockmode,unsigned int * num_chunks)2174 ts_chunk_find_all(const Hypertable *ht, const List *dimension_vecs, LOCKMODE lockmode,
2175 				  unsigned int *num_chunks)
2176 {
2177 	Chunk **chunks = chunk_find_all(ht, dimension_vecs, append_chunk, lockmode, num_chunks);
2178 
2179 #ifdef USE_ASSERT_CHECKING
2180 	/* Assert that we never return dropped chunks */
2181 	int i;
2182 
2183 	for (i = 0; i < *num_chunks; i++)
2184 		ASSERT_IS_VALID_CHUNK(chunks[i]);
2185 #endif
2186 
2187 	return chunks;
2188 }
2189 
/*
 * Find all chunks matching the given dimension vectors and return their table
 * OIDs as a list. Dropped chunks are never included.
 */
List *
ts_chunk_find_all_oids(const Hypertable *ht, const List *dimension_vecs, LOCKMODE lockmode)
{
	List *chunks = chunk_find_all(ht, dimension_vecs, append_chunk_oid, lockmode, NULL);

#ifdef USE_ASSERT_CHECKING
	/* Assert that we never return dropped chunks */
	ListCell *lc;

	foreach (lc, chunks)
	{
		Chunk *chunk = ts_chunk_get_by_relid(lfirst_oid(lc), true);
		ASSERT_IS_VALID_CHUNK(chunk);
	}
#endif

	return chunks;
}
2208 
/*
 * show_chunks SQL function handler.
 *
 * Set-returning function. Arguments: (0) hypertable or continuous aggregate
 * relid, (1) older_than bound, (2) newer_than bound; the time bounds are
 * optional and are converted to the dimension's partition type.
 */
Datum
ts_chunk_show_chunks(PG_FUNCTION_ARGS)
{
	/*
	 * chunks_return_srf is called even when it is not the first call but only
	 * after doing some computation first
	 */
	if (SRF_IS_FIRSTCALL())
	{
		FuncCallContext *funcctx;
		Oid relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
		Hypertable *ht;
		const Dimension *time_dim;
		Cache *hcache;
		/* Unbounded defaults: include all chunks */
		int64 older_than = PG_INT64_MAX;
		int64 newer_than = PG_INT64_MIN;
		Oid time_type;

		hcache = ts_hypertable_cache_pin();
		ht = find_hypertable_from_table_or_cagg(hcache, relid);
		Assert(ht != NULL);
		time_dim = hyperspace_get_open_dimension(ht->space, 0);
		Assert(time_dim != NULL);
		time_type = ts_dimension_get_partition_type(time_dim);

		/* Convert the optional bounds from the argument's type to the
		 * dimension's internal time representation */
		if (!PG_ARGISNULL(1))
			older_than = ts_time_value_from_arg(PG_GETARG_DATUM(1),
												get_fn_expr_argtype(fcinfo->flinfo, 1),
												time_type);

		if (!PG_ARGISNULL(2))
			newer_than = ts_time_value_from_arg(PG_GETARG_DATUM(2),
												get_fn_expr_argtype(fcinfo->flinfo, 2),
												time_type);

		funcctx = SRF_FIRSTCALL_INIT();
		/* Materialize the chunk array in the multi-call context so it
		 * survives across SRF calls */
		funcctx->user_fctx = get_chunks_in_time_range(ht,
													  older_than,
													  newer_than,
													  "show_chunks",
													  funcctx->multi_call_memory_ctx,
													  &funcctx->max_calls,
													  NULL);
		ts_cache_release(hcache);
	}

	return chunks_return_srf(fcinfo);
}
2258 
2259 static Chunk *
get_chunks_in_time_range(Hypertable * ht,int64 older_than,int64 newer_than,const char * caller_name,MemoryContext mctx,uint64 * num_chunks_returned,ScanTupLock * tuplock)2260 get_chunks_in_time_range(Hypertable *ht, int64 older_than, int64 newer_than,
2261 						 const char *caller_name, MemoryContext mctx, uint64 *num_chunks_returned,
2262 						 ScanTupLock *tuplock)
2263 {
2264 	MemoryContext oldcontext;
2265 	ChunkScanCtx chunk_scan_ctx;
2266 	Chunk *chunks;
2267 	ChunkScanCtxAddChunkData data;
2268 	const Dimension *time_dim;
2269 	StrategyNumber start_strategy;
2270 	StrategyNumber end_strategy;
2271 	uint64 num_chunks = 0;
2272 
2273 	if (older_than <= newer_than)
2274 		ereport(ERROR,
2275 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2276 				 errmsg("invalid time range"),
2277 				 errhint("The start of the time range must be before the end.")));
2278 
2279 	if (TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht))
2280 		elog(ERROR, "invalid operation on compressed hypertable");
2281 
2282 	start_strategy = (newer_than == PG_INT64_MIN) ? InvalidStrategy : BTGreaterEqualStrategyNumber;
2283 	end_strategy = (older_than == PG_INT64_MAX) ? InvalidStrategy : BTLessStrategyNumber;
2284 	time_dim = hyperspace_get_open_dimension(ht->space, 0);
2285 
2286 	oldcontext = MemoryContextSwitchTo(mctx);
2287 	chunks_find_all_in_range_limit(ht,
2288 								   time_dim,
2289 								   start_strategy,
2290 								   newer_than,
2291 								   end_strategy,
2292 								   older_than,
2293 								   -1,
2294 								   &num_chunks,
2295 								   tuplock,
2296 								   &chunk_scan_ctx);
2297 	MemoryContextSwitchTo(oldcontext);
2298 
2299 	chunks = MemoryContextAllocZero(mctx, sizeof(Chunk) * num_chunks);
2300 	data = (ChunkScanCtxAddChunkData){
2301 		.chunks = chunks,
2302 		.max_chunks = num_chunks,
2303 		.num_chunks = 0,
2304 	};
2305 
2306 	/* Get all the chunks from the context */
2307 	chunk_scan_ctx.data = &data;
2308 	chunk_scan_ctx_foreach_chunk_stub(&chunk_scan_ctx, chunk_scan_context_add_chunk, -1);
2309 	/*
2310 	 * only affects ctx.htab Got all the chunk already so can now safely
2311 	 * destroy the context
2312 	 */
2313 	chunk_scan_ctx_destroy(&chunk_scan_ctx);
2314 
2315 	*num_chunks_returned = data.num_chunks;
2316 	qsort(chunks, *num_chunks_returned, sizeof(Chunk), chunk_cmp);
2317 
2318 #ifdef USE_ASSERT_CHECKING
2319 	do
2320 	{
2321 		uint64 i = 0;
2322 		/* Assert that we never return dropped chunks */
2323 		for (i = 0; i < *num_chunks_returned; i++)
2324 			ASSERT_IS_VALID_CHUNK(&chunks[i]);
2325 	} while (false);
2326 #endif
2327 
2328 	return chunks;
2329 }
2330 
2331 List *
ts_chunk_data_nodes_copy(const Chunk * chunk)2332 ts_chunk_data_nodes_copy(const Chunk *chunk)
2333 {
2334 	List *lcopy = NIL;
2335 	ListCell *lc;
2336 
2337 	foreach (lc, chunk->data_nodes)
2338 	{
2339 		ChunkDataNode *node = lfirst(lc);
2340 		ChunkDataNode *copy = palloc(sizeof(ChunkDataNode));
2341 
2342 		memcpy(copy, node, sizeof(ChunkDataNode));
2343 
2344 		lcopy = lappend(lcopy, copy);
2345 	}
2346 
2347 	return lcopy;
2348 }
2349 
2350 Chunk *
ts_chunk_copy(const Chunk * chunk)2351 ts_chunk_copy(const Chunk *chunk)
2352 {
2353 	Chunk *copy;
2354 
2355 	ASSERT_IS_VALID_CHUNK(chunk);
2356 	copy = palloc(sizeof(Chunk));
2357 	memcpy(copy, chunk, sizeof(Chunk));
2358 
2359 	if (NULL != chunk->constraints)
2360 		copy->constraints = ts_chunk_constraints_copy(chunk->constraints);
2361 
2362 	if (NULL != chunk->cube)
2363 		copy->cube = ts_hypercube_copy(chunk->cube);
2364 
2365 	copy->data_nodes = ts_chunk_data_nodes_copy(chunk);
2366 
2367 	return copy;
2368 }
2369 
/*
 * Run a scan over the chunk catalog table using the given index, scan keys and
 * tuple callbacks. Thin wrapper that fills in a ScannerCtx and dispatches to
 * ts_scanner_scan(). Returns the number of tuples scanned.
 */
static int
chunk_scan_internal(int indexid, ScanKeyData scankey[], int nkeys, tuple_filter_func filter,
					tuple_found_func tuple_found, void *data, int limit, ScanDirection scandir,
					LOCKMODE lockmode, MemoryContext mctx)
{
	Catalog *catalog = ts_catalog_get();
	ScannerCtx ctx = {
		.table = catalog_get_table_id(catalog, CHUNK),
		.index = catalog_get_index(catalog, CHUNK, indexid),
		.nkeys = nkeys,
		.data = data,
		.scankey = scankey,
		.filter = filter,
		.tuple_found = tuple_found,
		.limit = limit,
		.lockmode = lockmode,
		.scandirection = scandir,
		.result_mctx = mctx,
	};

	return ts_scanner_scan(&ctx);
}
2392 
/*
 * Get a window of chunks that "precedes" the given dimensional point.
 *
 * For instance, if the dimension is "time", then given a point in time the
 * function returns the recent chunks that come before the chunk that includes
 * that point. The count parameter determines the number or slices the window
 * should include in the given dimension. Note, that with multi-dimensional
 * partitioning, there might be multiple chunks in each dimensional slice that
 * all precede the given point. For instance, the example below shows two
 * different situations that each go "back" two slices (count = 2) in the
 * x-dimension, but returns two vs. eight chunks due to different
 * partitioning.
 *
 * '_____________
 * '|   |   | * |
 * '|___|___|___|
 * '
 * '
 * '____ ________
 * '|   |   | * |
 * '|___|___|___|
 * '|   |   |   |
 * '|___|___|___|
 * '|   |   |   |
 * '|___|___|___|
 * '|   |   |   |
 * '|___|___|___|
 *
 * Note that the returned chunks will be allocated on the given memory
 * context, including the list itself. So, beware of not leaking the list if
 * the chunks are later cached somewhere else.
 */
List *
ts_chunk_get_window(int32 dimension_id, int64 point, int count, MemoryContext mctx)
{
	List *chunks = NIL;
	DimensionVec *dimvec;
	int i;

	/* Scan for "count" slices that precede the point in the given dimension */
	dimvec = ts_dimension_slice_scan_by_dimension_before_point(dimension_id,
															   point,
															   count,
															   BackwardScanDirection,
															   mctx);

	/*
	 * For each slice, join with any constraints that reference the slice.
	 * There might be multiple constraints for each slice in case of
	 * multi-dimensional partitioning.
	 */
	for (i = 0; i < dimvec->num_slices; i++)
	{
		DimensionSlice *slice = dimvec->slices[i];
		ChunkConstraints *ccs = ts_chunk_constraints_alloc(1, mctx);
		int j;

		ts_chunk_constraint_scan_by_dimension_slice_id(slice->fd.id, ccs, mctx);

		/* For each constraint, find the corresponding chunk */
		for (j = 0; j < ccs->num_constraints; j++)
		{
			ChunkConstraint *cc = &ccs->constraints[j];
			Chunk *chunk = ts_chunk_get_by_id(cc->fd.chunk_id, false);
			MemoryContext old;

			/* Dropped chunks do not contain valid data and must not be returned */
			if (!chunk)
				continue;
			/* Attach the chunk's full constraint set and hypercube so the
			 * returned chunk is self-contained on mctx */
			chunk->constraints = ts_chunk_constraint_scan_by_chunk_id(chunk->fd.id, 1, mctx);
			chunk->cube = ts_hypercube_from_constraints(chunk->constraints, mctx);

			/* Allocate the list on the same memory context as the chunks */
			old = MemoryContextSwitchTo(mctx);
			chunks = lappend(chunks, chunk);
			MemoryContextSwitchTo(old);
		}
	}

#ifdef USE_ASSERT_CHECKING
	/* Assert that we never return dropped chunks */
	do
	{
		ListCell *lc;

		foreach (lc, chunks)
		{
			Chunk *chunk = lfirst(lc);
			ASSERT_IS_VALID_CHUNK(chunk);
		}
	} while (false);
#endif

	return chunks;
}
2488 
/*
 * Scan the chunk catalog for a single chunk matching the given scan keys.
 *
 * Dropped chunks are filtered out. If no chunk matches and fail_if_not_found
 * is true, an error is raised with the scan-key values (rendered via
 * displaykey) attached as detail. Returns the chunk, or NULL if not found.
 */
static Chunk *
chunk_scan_find(int indexid, ScanKeyData scankey[], int nkeys, MemoryContext mctx,
				bool fail_if_not_found, const DisplayKeyData displaykey[])
{
	ChunkStubScanCtx stubctx = { 0 };
	Chunk *chunk;
	int num_found;

	num_found = chunk_scan_internal(indexid,
									scankey,
									nkeys,
									chunk_tuple_dropped_filter,
									chunk_tuple_found,
									&stubctx,
									1,
									ForwardScanDirection,
									AccessShareLock,
									mctx);
	Assert(num_found == 0 || (num_found == 1 && !stubctx.is_dropped));
	chunk = stubctx.chunk;

	switch (num_found)
	{
		case 0:
			if (fail_if_not_found)
			{
				/* Render "key: value" pairs for all scan keys into the
				 * error detail */
				int i = 0;
				StringInfo info = makeStringInfo();
				while (i < nkeys)
				{
					appendStringInfo(info,
									 "%s: %s",
									 displaykey[i].name,
									 displaykey[i].as_string(scankey[i].sk_argument));
					if (++i < nkeys)
						appendStringInfoString(info, ", ");
				}
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_OBJECT),
						 errmsg("chunk not found"),
						 errdetail("%s", info->data)));
			}
			break;
		case 1:
			ASSERT_IS_VALID_CHUNK(chunk);
			break;
		default:
			/* The scan limit is 1, so this should be unreachable */
			elog(ERROR, "expected a single chunk, found %d", num_found);
	}

	return chunk;
}
2541 
/*
 * Look up a chunk by its schema- and table name, allocating the result on the
 * given memory context. Returns NULL (or errors, if fail_if_not_found) when
 * no matching, non-dropped chunk exists.
 */
Chunk *
ts_chunk_get_by_name_with_memory_context(const char *schema_name, const char *table_name,
										 MemoryContext mctx, bool fail_if_not_found)
{
	NameData schema, table;
	ScanKeyData scankey[2];
	static const DisplayKeyData displaykey[2] = {
		[0] = { .name = "schema_name", .as_string = DatumGetNameString },
		[1] = { .name = "table_name", .as_string = DatumGetNameString },
	};

	/* Early check for rogue input */
	if (schema_name == NULL || table_name == NULL)
	{
		if (fail_if_not_found)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_OBJECT),
					 errmsg("chunk not found"),
					 errdetail("schema_name: %s, table_name: %s",
							   schema_name ? schema_name : "<null>",
							   table_name ? table_name : "<null>")));
		else
			return NULL;
	}

	namestrcpy(&schema, schema_name);
	namestrcpy(&table, table_name);

	/*
	 * Perform an index scan on chunk name.
	 */
	ScanKeyInit(&scankey[0],
				Anum_chunk_schema_name_idx_schema_name,
				BTEqualStrategyNumber,
				F_NAMEEQ,
				NameGetDatum(&schema));
	ScanKeyInit(&scankey[1],
				Anum_chunk_schema_name_idx_table_name,
				BTEqualStrategyNumber,
				F_NAMEEQ,
				NameGetDatum(&table));

	return chunk_scan_find(CHUNK_SCHEMA_NAME_INDEX,
						   scankey,
						   2,
						   mctx,
						   fail_if_not_found,
						   displaykey);
}
2591 
2592 Chunk *
ts_chunk_get_by_relid(Oid relid,bool fail_if_not_found)2593 ts_chunk_get_by_relid(Oid relid, bool fail_if_not_found)
2594 {
2595 	char *schema;
2596 	char *table;
2597 
2598 	if (!OidIsValid(relid))
2599 	{
2600 		if (fail_if_not_found)
2601 			ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("invalid Oid")));
2602 		else
2603 			return NULL;
2604 	}
2605 
2606 	schema = get_namespace_name(get_rel_namespace(relid));
2607 	table = get_rel_name(relid);
2608 	return chunk_get_by_name(schema, table, fail_if_not_found);
2609 }
2610 
2611 static const char *
DatumGetInt32AsString(Datum datum)2612 DatumGetInt32AsString(Datum datum)
2613 {
2614 	char *buf = (char *) palloc(12); /* sign, 10 digits, '\0' */
2615 	pg_ltoa(DatumGetInt32(datum), buf);
2616 	return buf;
2617 }
2618 
/*
 * Look up a chunk by its catalog id, allocating the result on the current
 * memory context. Returns NULL (or errors, if fail_if_not_found) when no
 * matching, non-dropped chunk exists.
 */
Chunk *
ts_chunk_get_by_id(int32 id, bool fail_if_not_found)
{
	ScanKeyData scankey[1];
	static const DisplayKeyData displaykey[1] = {
		[0] = { .name = "id", .as_string = DatumGetInt32AsString },
	};

	/*
	 * Perform an index scan on chunk id.
	 */
	ScanKeyInit(&scankey[0], Anum_chunk_idx_id, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(id));

	return chunk_scan_find(CHUNK_ID_INDEX,
						   scankey,
						   1,
						   CurrentMemoryContext,
						   fail_if_not_found,
						   displaykey);
}
2639 
2640 /*
2641  * Number of chunks created after given chunk.
2642  * If chunk2.id > chunk1.id then chunk2 is created after chunk1
2643  */
2644 int
ts_chunk_num_of_chunks_created_after(const Chunk * chunk)2645 ts_chunk_num_of_chunks_created_after(const Chunk *chunk)
2646 {
2647 	ScanKeyData scankey[1];
2648 
2649 	/*
2650 	 * Try to find chunks with a greater Id then a given chunk
2651 	 */
2652 	ScanKeyInit(&scankey[0],
2653 				Anum_chunk_idx_id,
2654 				BTGreaterStrategyNumber,
2655 				F_INT4GT,
2656 				Int32GetDatum(chunk->fd.id));
2657 
2658 	return chunk_scan_internal(CHUNK_ID_INDEX,
2659 							   scankey,
2660 							   1,
2661 							   NULL,
2662 							   NULL,
2663 							   NULL,
2664 							   0,
2665 							   ForwardScanDirection,
2666 							   AccessShareLock,
2667 							   CurrentMemoryContext);
2668 }
2669 
2670 /*
2671  * Simple scans provide lightweight ways to access chunk information without the
2672  * overhead of getting a full chunk (i.e., no extra metadata, like constraints,
2673  * are joined in). This function forms the basis of a number of lookup functions
2674  * that, e.g., translates a chunk relid to a chunk_id, or vice versa.
2675  */
2676 static bool
chunk_simple_scan(ScanIterator * iterator,FormData_chunk * form,bool missing_ok,const DisplayKeyData displaykey[])2677 chunk_simple_scan(ScanIterator *iterator, FormData_chunk *form, bool missing_ok,
2678 				  const DisplayKeyData displaykey[])
2679 {
2680 	int count = 0;
2681 
2682 	ts_scanner_foreach(iterator)
2683 	{
2684 		TupleInfo *ti = ts_scan_iterator_tuple_info(iterator);
2685 		chunk_formdata_fill(form, ti);
2686 
2687 		if (!form->dropped)
2688 			count++;
2689 	}
2690 
2691 	Assert(count == 0 || count == 1);
2692 
2693 	if (count == 0 && !missing_ok)
2694 	{
2695 		int i = 0;
2696 		StringInfo info = makeStringInfo();
2697 		while (i < iterator->ctx.nkeys)
2698 		{
2699 			appendStringInfo(info,
2700 							 "%s: %s",
2701 							 displaykey[i].name,
2702 							 displaykey[i].as_string(iterator->ctx.scankey[i].sk_argument));
2703 			if (++i < iterator->ctx.nkeys)
2704 				appendStringInfoString(info, ", ");
2705 		}
2706 		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("chunk not found")));
2707 	}
2708 
2709 	return count == 1;
2710 }
2711 
2712 static bool
chunk_simple_scan_by_name(const char * schema,const char * table,FormData_chunk * form,bool missing_ok)2713 chunk_simple_scan_by_name(const char *schema, const char *table, FormData_chunk *form,
2714 						  bool missing_ok)
2715 {
2716 	ScanIterator iterator;
2717 	static const DisplayKeyData displaykey[] = {
2718 		[0] = { .name = "schema_name", .as_string = DatumGetNameString },
2719 		[1] = { .name = "table_name", .as_string = DatumGetNameString },
2720 	};
2721 
2722 	if (schema == NULL || table == NULL)
2723 		return false;
2724 
2725 	iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
2726 	init_scan_by_qualified_table_name(&iterator, schema, table);
2727 
2728 	return chunk_simple_scan(&iterator, form, missing_ok, displaykey);
2729 }
2730 
2731 static bool
chunk_simple_scan_by_relid(Oid relid,FormData_chunk * form,bool missing_ok)2732 chunk_simple_scan_by_relid(Oid relid, FormData_chunk *form, bool missing_ok)
2733 {
2734 	bool found = false;
2735 
2736 	if (OidIsValid(relid))
2737 	{
2738 		const char *table = get_rel_name(relid);
2739 
2740 		if (table != NULL)
2741 		{
2742 			Oid nspid = get_rel_namespace(relid);
2743 			const char *schema = get_namespace_name(nspid);
2744 
2745 			found = chunk_simple_scan_by_name(schema, table, form, missing_ok);
2746 		}
2747 	}
2748 
2749 	if (!found && !missing_ok)
2750 		ereport(ERROR,
2751 				(errcode(ERRCODE_UNDEFINED_OBJECT),
2752 				 errmsg("chunk with relid %u not found", relid)));
2753 
2754 	return found;
2755 }
2756 
2757 static bool
chunk_simple_scan_by_id(int32 chunk_id,FormData_chunk * form,bool missing_ok)2758 chunk_simple_scan_by_id(int32 chunk_id, FormData_chunk *form, bool missing_ok)
2759 {
2760 	ScanIterator iterator;
2761 	static const DisplayKeyData displaykey[] = {
2762 		[0] = { .name = "id", .as_string = DatumGetInt32AsString },
2763 	};
2764 
2765 	iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
2766 	init_scan_by_chunk_id(&iterator, chunk_id);
2767 
2768 	return chunk_simple_scan(&iterator, form, missing_ok, displaykey);
2769 }
2770 
2771 /*
2772  * Lookup a Chunk ID from a chunk's relid.
2773  */
2774 Datum
ts_chunk_id_from_relid(PG_FUNCTION_ARGS)2775 ts_chunk_id_from_relid(PG_FUNCTION_ARGS)
2776 {
2777 	static Oid last_relid = InvalidOid;
2778 	static int32 last_id = 0;
2779 	Oid relid = PG_GETARG_OID(0);
2780 	FormData_chunk form;
2781 
2782 	if (last_relid == relid)
2783 		return last_id;
2784 
2785 	chunk_simple_scan_by_relid(relid, &form, false);
2786 
2787 	last_relid = relid;
2788 	last_id = form.id;
2789 
2790 	PG_RETURN_INT32(last_id);
2791 }
2792 
2793 bool
ts_chunk_exists_relid(Oid relid)2794 ts_chunk_exists_relid(Oid relid)
2795 {
2796 	FormData_chunk form;
2797 
2798 	return chunk_simple_scan_by_relid(relid, &form, true);
2799 }
2800 
2801 /*
2802  * Get the relid of a chunk given its ID.
2803  */
2804 Oid
ts_chunk_get_relid(int32 chunk_id,bool missing_ok)2805 ts_chunk_get_relid(int32 chunk_id, bool missing_ok)
2806 {
2807 	FormData_chunk form = { 0 };
2808 	Oid relid = InvalidOid;
2809 
2810 	if (chunk_simple_scan_by_id(chunk_id, &form, missing_ok))
2811 	{
2812 		Oid schemaid = get_namespace_oid(NameStr(form.schema_name), missing_ok);
2813 
2814 		if (OidIsValid(schemaid))
2815 			relid = get_relname_relid(NameStr(form.table_name), schemaid);
2816 	}
2817 
2818 	if (!OidIsValid(relid) && !missing_ok)
2819 		ereport(ERROR,
2820 				(errcode(ERRCODE_UNDEFINED_SCHEMA),
2821 				 errmsg("chunk with id %d not found", chunk_id)));
2822 
2823 	return relid;
2824 }
2825 
2826 /*
2827  * Get the schema (namespace) of a chunk given its ID.
2828  *
2829  * This is a lightweight way to get the schema of a chunk without creating a
2830  * full Chunk object that joins in constraints, etc.
2831  */
2832 Oid
ts_chunk_get_schema_id(int32 chunk_id,bool missing_ok)2833 ts_chunk_get_schema_id(int32 chunk_id, bool missing_ok)
2834 {
2835 	FormData_chunk form = { 0 };
2836 
2837 	if (!chunk_simple_scan_by_id(chunk_id, &form, missing_ok))
2838 		return InvalidOid;
2839 
2840 	return get_namespace_oid(NameStr(form.schema_name), missing_ok);
2841 }
2842 
2843 bool
ts_chunk_get_id(const char * schema,const char * table,int32 * chunk_id,bool missing_ok)2844 ts_chunk_get_id(const char *schema, const char *table, int32 *chunk_id, bool missing_ok)
2845 {
2846 	FormData_chunk form = { 0 };
2847 
2848 	if (!chunk_simple_scan_by_name(schema, table, &form, missing_ok))
2849 		return false;
2850 
2851 	if (NULL != chunk_id)
2852 		*chunk_id = form.id;
2853 
2854 	return true;
2855 }
2856 
2857 /*
2858  * Results of deleting a chunk.
2859  *
2860  * A chunk can be deleted in two ways: (1) full delete of data and metadata,
2861  * (2) delete data but preserve metadata (marked with dropped=true). The
2862  * deletion mode (preserve or not) combined with the current state of the
2863  * "dropped" flag on a chunk metadata row leads to a cross-product resulting
2864  * in the following outcomes:
2865  */
typedef enum ChunkDeleteResult
{
	/* Deleted a live chunk (both data and catalog row removed) */
	CHUNK_DELETED,
	/* Deleted a chunk previously marked "dropped" (stale catalog row removed) */
	CHUNK_DELETED_DROPPED,
	/* Marked a chunk as dropped instead of deleting (catalog row preserved) */
	CHUNK_MARKED_DROPPED,
	/* Tried to mark a chunk as dropped when it was already marked (no-op) */
	CHUNK_ALREADY_MARKED_DROPPED,
} ChunkDeleteResult;
2877 
2878 /* Delete the chunk tuple.
2879  *
2880  * preserve_chunk_catalog_row - instead of deleting the row, mark it as dropped.
2881  * this is used when we need to preserve catalog information about the chunk
2882  * after dropping it. Currently only used when preserving continuous aggregates
2883  * on the chunk after the raw data was dropped. Otherwise, we'd have dangling
2884  * chunk ids left over in the materialization table. Preserve the space dimension
2885  * info about these chunks too.
2886  *
2887  * When chunk rows are preserved, the rows need to be updated to set the
2888  * 'dropped' flag to TRUE. But since this produces a new tuple into the
2889  * metadata table we will process also the new tuple in the same loop, which
 * is not only inefficient but could also lead to bugs. For now, we just ignore
2891  * those tuples (the CHUNK_ALREADY_MARKED_DROPPED case), but ideally we
2892  * shouldn't scan the updated tuples at all since it means double the number
2893  * of tuples to process.
2894  */
static ChunkDeleteResult
chunk_tuple_delete(TupleInfo *ti, DropBehavior behavior, bool preserve_chunk_catalog_row)
{
	FormData_chunk form;
	CatalogSecurityContext sec_ctx;
	ChunkConstraints *ccs = ts_chunk_constraints_alloc(2, ti->mctx);
	ChunkDeleteResult res;
	int i;

	chunk_formdata_fill(&form, ti);

	/* Already marked dropped and we are preserving rows: nothing to do.
	 * This also skips the updated tuple produced by our own catalog update
	 * (see function comment above). */
	if (preserve_chunk_catalog_row && form.dropped)
		return CHUNK_ALREADY_MARKED_DROPPED;

	/* if only marking as deleted, keep the constraints and dimension info */
	if (!preserve_chunk_catalog_row)
	{
		ts_chunk_constraint_delete_by_chunk_id(form.id, ccs);

		/* Check for dimension slices that are orphaned by the chunk deletion */
		for (i = 0; i < ccs->num_constraints; i++)
		{
			ChunkConstraint *cc = &ccs->constraints[i];

			/*
			 * Delete the dimension slice if there are no remaining constraints
			 * referencing it
			 */
			if (is_dimension_constraint(cc))
			{
				/*
				 * Dimension slices are shared between chunk constraints and
				 * subsequently between chunks as well. Since different chunks
				 * can reference the same dimension slice (through the chunk
				 * constraint), we must lock the dimension slice in FOR UPDATE
				 * mode *prior* to scanning the chunk constraints table. If we
				 * do not do that, we can have the following scenario:
				 *
				 * - T1: Prepares to create a chunk that uses an existing dimension slice X
				 * - T2: Deletes a chunk and dimension slice X because it is not
				 *   referenced by a chunk constraint.
				 * - T1: Adds a chunk constraint referencing dimension
				 *   slice X (which is about to be deleted by T2).
				 */
				ScanTupLock tuplock = {
					.lockmode = LockTupleExclusive,
					.waitpolicy = LockWaitBlock,
				};
				DimensionSlice *slice =
					ts_dimension_slice_scan_by_id_and_lock(cc->fd.dimension_slice_id,
														   &tuplock,
														   CurrentMemoryContext);
				/* If the slice is not found in the scan above, the table is
				 * broken so we do not delete the slice. We proceed
				 * anyway since users need to be able to drop broken tables or
				 * remove broken chunks. */
				if (!slice)
				{
					const Hypertable *const ht = ts_hypertable_get_by_id(form.hypertable_id);
					ereport(WARNING,
							(errmsg("unexpected state for chunk %s.%s, dropping anyway",
									quote_identifier(NameStr(form.schema_name)),
									quote_identifier(NameStr(form.table_name))),
							 errdetail("The integrity of hypertable %s.%s might be "
									   "compromised "
									   "since one of its chunks lacked a dimension slice.",
									   quote_identifier(NameStr(ht->fd.schema_name)),
									   quote_identifier(NameStr(ht->fd.table_name)))));
				}
				else if (ts_chunk_constraint_scan_by_dimension_slice_id(slice->fd.id,
																		NULL,
																		CurrentMemoryContext) == 0)
					ts_dimension_slice_delete_by_id(cc->fd.dimension_slice_id, false);
			}
		}
	}

	/* Remove dependent metadata: indexes, compression stats, data-node mappings */
	ts_chunk_index_delete_by_chunk_id(form.id, true);
	ts_compression_chunk_size_delete(form.id);
	ts_chunk_data_node_delete_by_chunk_id(form.id);

	/* Delete any row in bgw_policy_chunk-stats corresponding to this chunk */
	ts_bgw_policy_chunk_stats_delete_by_chunk_id(form.id);

	if (form.compressed_chunk_id != INVALID_CHUNK_ID)
	{
		Chunk *compressed_chunk = ts_chunk_get_by_id(form.compressed_chunk_id, false);

		/* The chunk may have been deleted by a CASCADE */
		if (compressed_chunk != NULL)
			/* Plain drop without preserving catalog row because this is the compressed
			 * chunk */
			ts_chunk_drop(compressed_chunk, behavior, DEBUG1);
	}

	/* Catalog modifications below require elevated permissions */
	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);

	if (!preserve_chunk_catalog_row)
	{
		ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));

		if (form.dropped)
			res = CHUNK_DELETED_DROPPED;
		else
			res = CHUNK_DELETED;
	}
	else
	{
		HeapTuple new_tuple;

		Assert(!form.dropped);

		/* Mark the row dropped and sever the compressed-chunk link */
		form.compressed_chunk_id = INVALID_CHUNK_ID;
		form.dropped = true;
		new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
		ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
		heap_freetuple(new_tuple);
		res = CHUNK_MARKED_DROPPED;
	}

	ts_catalog_restore_user(&sec_ctx);

	return res;
}
3019 
/*
 * Initialize the iterator for a lookup by qualified (schema, table) name
 * using the chunk catalog's schema/table-name index.
 */
static void
init_scan_by_qualified_table_name(ScanIterator *iterator, const char *schema_name,
								  const char *table_name)
{
	iterator->ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_SCHEMA_NAME_INDEX);
	ts_scan_iterator_scan_key_init(iterator,
								   Anum_chunk_schema_name_idx_schema_name,
								   BTEqualStrategyNumber,
								   F_NAMEEQ,
								   CStringGetDatum(schema_name));
	ts_scan_iterator_scan_key_init(iterator,
								   Anum_chunk_schema_name_idx_table_name,
								   BTEqualStrategyNumber,
								   F_NAMEEQ,
								   CStringGetDatum(table_name));
}
3036 
3037 static int
chunk_delete(ScanIterator * iterator,DropBehavior behavior,bool preserve_chunk_catalog_row)3038 chunk_delete(ScanIterator *iterator, DropBehavior behavior, bool preserve_chunk_catalog_row)
3039 {
3040 	int count = 0;
3041 
3042 	ts_scanner_foreach(iterator)
3043 	{
3044 		ChunkDeleteResult res;
3045 
3046 		res = chunk_tuple_delete(ts_scan_iterator_tuple_info(iterator),
3047 								 behavior,
3048 								 preserve_chunk_catalog_row);
3049 
3050 		switch (res)
3051 		{
3052 			case CHUNK_DELETED:
3053 			case CHUNK_MARKED_DROPPED:
3054 				count++;
3055 				break;
3056 			case CHUNK_ALREADY_MARKED_DROPPED:
3057 			case CHUNK_DELETED_DROPPED:
3058 				break;
3059 		}
3060 	}
3061 
3062 	return count;
3063 }
3064 
3065 static int
ts_chunk_delete_by_name_internal(const char * schema,const char * table,DropBehavior behavior,bool preserve_chunk_catalog_row)3066 ts_chunk_delete_by_name_internal(const char *schema, const char *table, DropBehavior behavior,
3067 								 bool preserve_chunk_catalog_row)
3068 {
3069 	ScanIterator iterator = ts_scan_iterator_create(CHUNK, RowExclusiveLock, CurrentMemoryContext);
3070 	int count;
3071 
3072 	init_scan_by_qualified_table_name(&iterator, schema, table);
3073 	count = chunk_delete(&iterator, behavior, preserve_chunk_catalog_row);
3074 
3075 	/* (schema,table) names and (hypertable_id) are unique so should only have
3076 	 * dropped one chunk or none (if not found) */
3077 	Assert(count == 1 || count == 0);
3078 
3079 	return count;
3080 }
3081 
3082 int
ts_chunk_delete_by_name(const char * schema,const char * table,DropBehavior behavior)3083 ts_chunk_delete_by_name(const char *schema, const char *table, DropBehavior behavior)
3084 {
3085 	return ts_chunk_delete_by_name_internal(schema, table, behavior, false);
3086 }
3087 
3088 static int
ts_chunk_delete_by_relid(Oid relid,DropBehavior behavior,bool preserve_chunk_catalog_row)3089 ts_chunk_delete_by_relid(Oid relid, DropBehavior behavior, bool preserve_chunk_catalog_row)
3090 {
3091 	if (!OidIsValid(relid))
3092 		return 0;
3093 
3094 	return ts_chunk_delete_by_name_internal(get_namespace_name(get_rel_namespace(relid)),
3095 											get_rel_name(relid),
3096 											behavior,
3097 											preserve_chunk_catalog_row);
3098 }
3099 
/* Initialize the iterator to scan all chunk rows belonging to a hypertable */
static void
init_scan_by_hypertable_id(ScanIterator *iterator, int32 hypertable_id)
{
	iterator->ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_HYPERTABLE_ID_INDEX);
	ts_scan_iterator_scan_key_init(iterator,
								   Anum_chunk_hypertable_id_idx_hypertable_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(hypertable_id));
}
3110 
3111 int
ts_chunk_delete_by_hypertable_id(int32 hypertable_id)3112 ts_chunk_delete_by_hypertable_id(int32 hypertable_id)
3113 {
3114 	ScanIterator iterator = ts_scan_iterator_create(CHUNK, RowExclusiveLock, CurrentMemoryContext);
3115 
3116 	init_scan_by_hypertable_id(&iterator, hypertable_id);
3117 
3118 	return chunk_delete(&iterator, DROP_RESTRICT, false);
3119 }
3120 
3121 bool
ts_chunk_exists_with_compression(int32 hypertable_id)3122 ts_chunk_exists_with_compression(int32 hypertable_id)
3123 {
3124 	ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
3125 	bool found = false;
3126 
3127 	init_scan_by_hypertable_id(&iterator, hypertable_id);
3128 	ts_scanner_foreach(&iterator)
3129 	{
3130 		bool isnull_dropped;
3131 		bool isnull_chunk_id =
3132 			slot_attisnull(ts_scan_iterator_slot(&iterator), Anum_chunk_compressed_chunk_id);
3133 		bool dropped = DatumGetBool(
3134 			slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_dropped, &isnull_dropped));
3135 		/* dropped is not NULLABLE */
3136 		Assert(!isnull_dropped);
3137 
3138 		if (!isnull_chunk_id && !dropped)
3139 		{
3140 			found = true;
3141 			break;
3142 		}
3143 	}
3144 	ts_scan_iterator_close(&iterator);
3145 	return found;
3146 }
3147 
/*
 * Initialize the iterator to find the chunk row (if any) whose
 * compressed_chunk_id column points at the given compressed chunk.
 */
static void
init_scan_by_compressed_chunk_id(ScanIterator *iterator, int32 compressed_chunk_id)
{
	iterator->ctx.index =
		catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_COMPRESSED_CHUNK_ID_INDEX);
	ts_scan_iterator_scan_key_init(iterator,
								   Anum_chunk_compressed_chunk_id_idx_compressed_chunk_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(compressed_chunk_id));
}
3159 
3160 Chunk *
ts_chunk_get_compressed_chunk_parent(const Chunk * chunk)3161 ts_chunk_get_compressed_chunk_parent(const Chunk *chunk)
3162 {
3163 	ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
3164 	Oid parent_id = InvalidOid;
3165 
3166 	init_scan_by_compressed_chunk_id(&iterator, chunk->fd.id);
3167 
3168 	ts_scanner_foreach(&iterator)
3169 	{
3170 		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
3171 		Datum datum;
3172 		bool isnull;
3173 
3174 		Assert(!OidIsValid(parent_id));
3175 		datum = slot_getattr(ti->slot, Anum_chunk_id, &isnull);
3176 
3177 		if (!isnull)
3178 			parent_id = DatumGetObjectId(datum);
3179 	}
3180 
3181 	if (OidIsValid(parent_id))
3182 		return ts_chunk_get_by_id(parent_id, true);
3183 
3184 	return NULL;
3185 }
3186 
3187 bool
ts_chunk_contains_compressed_data(const Chunk * chunk)3188 ts_chunk_contains_compressed_data(const Chunk *chunk)
3189 {
3190 	Chunk *parent_chunk = ts_chunk_get_compressed_chunk_parent(chunk);
3191 
3192 	return parent_chunk != NULL;
3193 }
3194 
3195 List *
ts_chunk_get_chunk_ids_by_hypertable_id(int32 hypertable_id)3196 ts_chunk_get_chunk_ids_by_hypertable_id(int32 hypertable_id)
3197 {
3198 	List *chunkids = NIL;
3199 	ScanIterator iterator = ts_scan_iterator_create(CHUNK, RowExclusiveLock, CurrentMemoryContext);
3200 
3201 	init_scan_by_hypertable_id(&iterator, hypertable_id);
3202 	ts_scanner_foreach(&iterator)
3203 	{
3204 		bool isnull;
3205 		Datum id = slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_id, &isnull);
3206 		if (!isnull)
3207 			chunkids = lappend_int(chunkids, DatumGetInt32(id));
3208 	}
3209 
3210 	return chunkids;
3211 }
3212 
/*
 * ChunkScanCtx callback: materialize the chunk behind a stub and recreate all
 * of its constraints on the chunk table.
 */
static ChunkResult
chunk_recreate_constraint(ChunkScanCtx *ctx, ChunkStub *stub)
{
	ChunkConstraints *ccs = stub->constraints;
	ChunkStubScanCtx stubctx = {
		.stub = stub,
	};
	Chunk *chunk;
	int i;

	/* Expand the stub into a full chunk so table_id is available */
	chunk = chunk_create_from_stub(&stubctx);

	if (stubctx.is_dropped)
		elog(ERROR, "should not be recreating constraints on dropped chunks");

	for (i = 0; i < ccs->num_constraints; i++)
		ts_chunk_constraint_recreate(&ccs->constraints[i], chunk->table_id);

	return CHUNK_PROCESSED;
}
3233 
3234 void
ts_chunk_recreate_all_constraints_for_dimension(Hyperspace * hs,int32 dimension_id)3235 ts_chunk_recreate_all_constraints_for_dimension(Hyperspace *hs, int32 dimension_id)
3236 {
3237 	DimensionVec *slices;
3238 	ChunkScanCtx chunkctx;
3239 	int i;
3240 
3241 	slices = ts_dimension_slice_scan_by_dimension(dimension_id, 0);
3242 
3243 	if (NULL == slices)
3244 		return;
3245 
3246 	chunk_scan_ctx_init(&chunkctx, hs, NULL);
3247 
3248 	for (i = 0; i < slices->num_slices; i++)
3249 		ts_chunk_constraint_scan_by_dimension_slice(slices->slices[i],
3250 													&chunkctx,
3251 													CurrentMemoryContext);
3252 
3253 	chunk_scan_ctx_foreach_chunk_stub(&chunkctx, chunk_recreate_constraint, 0);
3254 	chunk_scan_ctx_destroy(&chunkctx);
3255 }
3256 
3257 /*
3258  * Drops all FK constraints on a given chunk.
3259  * Currently it is used only for chunks, which have been compressed and
3260  * contain no data.
3261  */
void
ts_chunk_drop_fks(const Chunk *const chunk)
{
	Relation rel;
	List *fks;
	ListCell *lc;

	ASSERT_IS_VALID_CHUNK(chunk);

	/* Copy the FK list since the relcache entry goes away at table_close() */
	rel = table_open(chunk->table_id, AccessShareLock);
	fks = copyObject(RelationGetFKeyList(rel));
	table_close(rel, AccessShareLock);

	foreach (lc, fks)
	{
		const ForeignKeyCacheInfo *const fk = lfirst_node(ForeignKeyCacheInfo, lc);
		/* Drop both the catalog row and the backing constraint object */
		ts_chunk_constraint_delete_by_constraint_name(chunk->fd.id,
													  get_constraint_name(fk->conoid),
													  true,
													  true);
	}
}
3284 
3285 /*
3286  * Recreates all FK constraints on a chunk by using the constraints on the parent hypertable
3287  * as a template. Currently it is used only during chunk decompression, since FK constraints
3288  * are dropped during compression.
3289  */
void
ts_chunk_create_fks(const Chunk *const chunk)
{
	Relation rel;
	List *fks;
	ListCell *lc;

	ASSERT_IS_VALID_CHUNK(chunk);

	/* Use the parent hypertable's FK list as the template; copy it since the
	 * relcache entry goes away at table_close() */
	rel = table_open(chunk->hypertable_relid, AccessShareLock);
	fks = copyObject(RelationGetFKeyList(rel));
	table_close(rel, AccessShareLock);
	foreach (lc, fks)
	{
		ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, lc);
		ts_chunk_constraint_create_on_chunk(chunk, fk->conoid);
	}
}
3308 
3309 static ScanTupleResult
chunk_tuple_update_schema_and_table(TupleInfo * ti,void * data)3310 chunk_tuple_update_schema_and_table(TupleInfo *ti, void *data)
3311 {
3312 	FormData_chunk form;
3313 	FormData_chunk *update = data;
3314 	CatalogSecurityContext sec_ctx;
3315 	HeapTuple new_tuple;
3316 
3317 	chunk_formdata_fill(&form, ti);
3318 
3319 	namestrcpy(&form.schema_name, NameStr(update->schema_name));
3320 	namestrcpy(&form.table_name, NameStr(update->table_name));
3321 
3322 	new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
3323 
3324 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
3325 	ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
3326 	ts_catalog_restore_user(&sec_ctx);
3327 	heap_freetuple(new_tuple);
3328 	return SCAN_DONE;
3329 }
3330 
3331 static ScanTupleResult
chunk_tuple_update_status(TupleInfo * ti,void * data)3332 chunk_tuple_update_status(TupleInfo *ti, void *data)
3333 {
3334 	FormData_chunk form;
3335 	FormData_chunk *update = data;
3336 	CatalogSecurityContext sec_ctx;
3337 	HeapTuple new_tuple;
3338 
3339 	chunk_formdata_fill(&form, ti);
3340 	form.status = update->status;
3341 	new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
3342 
3343 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
3344 	ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
3345 	ts_catalog_restore_user(&sec_ctx);
3346 	heap_freetuple(new_tuple);
3347 	return SCAN_DONE;
3348 }
3349 
3350 static bool
chunk_update_form(FormData_chunk * form)3351 chunk_update_form(FormData_chunk *form)
3352 {
3353 	ScanKeyData scankey[1];
3354 
3355 	ScanKeyInit(&scankey[0], Anum_chunk_idx_id, BTEqualStrategyNumber, F_INT4EQ, form->id);
3356 
3357 	return chunk_scan_internal(CHUNK_ID_INDEX,
3358 							   scankey,
3359 							   1,
3360 							   NULL,
3361 							   chunk_tuple_update_schema_and_table,
3362 							   form,
3363 							   0,
3364 							   ForwardScanDirection,
3365 							   RowExclusiveLock,
3366 							   CurrentMemoryContext) > 0;
3367 }
3368 
3369 /* update the status flag for chunk. Should not be called directly
3370  * Use chunk_update_status instead
3371  */
3372 static bool
chunk_update_status_internal(FormData_chunk * form)3373 chunk_update_status_internal(FormData_chunk *form)
3374 {
3375 	ScanKeyData scankey[1];
3376 
3377 	ScanKeyInit(&scankey[0], Anum_chunk_idx_id, BTEqualStrategyNumber, F_INT4EQ, form->id);
3378 
3379 	return chunk_scan_internal(CHUNK_ID_INDEX,
3380 							   scankey,
3381 							   1,
3382 							   NULL,
3383 							   chunk_tuple_update_status,
3384 							   form,
3385 							   0,
3386 							   ForwardScanDirection,
3387 							   RowExclusiveLock,
3388 							   CurrentMemoryContext) > 0;
3389 }
3390 
3391 /* status update is done in 2 steps.
3392  * Do the equivalent of SELECT for UPDATE, followed by UPDATE
3393  * 1. RowShare lock to read the status.
3394  * 2. if status != proposed new status
3395  *      update status using RowExclusiveLock
3396  * All callers who want to update chunk status should call this function so that locks
3397  * are acquired correctly.
3398  */
static bool
chunk_update_status(FormData_chunk *form)
{
	int32 chunk_id = form->id;
	int32 new_status = form->status;
	bool success = true, dropped = false;
	/* lock the chunk tuple for update. Block till we get exclusivetuplelock */
	ScanTupLock scantuplock = {
		.waitpolicy = LockWaitBlock,
		.lockmode = LockTupleExclusive,
	};
	ScanIterator iterator = ts_scan_iterator_create(CHUNK, RowShareLock, CurrentMemoryContext);
	iterator.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
	iterator.ctx.tuplock = &scantuplock;

	/* see table_tuple_lock for details about flags that are set in TupleExclusive mode */
	scantuplock.lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
	if (!IsolationUsesXactSnapshot())
	{
		/* in read committed mode, we follow all updates to this tuple */
		scantuplock.lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
	}

	ts_scan_iterator_scan_key_init(&iterator,
								   Anum_chunk_idx_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(chunk_id));

	ts_scanner_foreach(&iterator)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
		bool dropped_isnull, status_isnull;
		int status;

		dropped = DatumGetBool(slot_getattr(ti->slot, Anum_chunk_dropped, &dropped_isnull));
		Assert(!dropped_isnull);
		status = DatumGetInt32(slot_getattr(ti->slot, Anum_chunk_status, &status_isnull));
		Assert(!status_isnull);
		/* Skip the write entirely when the status is already the target value */
		if (!dropped && status != new_status)
		{
			success = chunk_update_status_internal(form); // get RowExclusiveLock and update here
		}
	}
	ts_scan_iterator_close(&iterator);
	/* Updating the status of a dropped chunk is a logic error in the caller */
	if (dropped)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("attempt to update status(%d) on dropped chunk %d", new_status, chunk_id)));
	return success;
}
3450 
3451 bool
ts_chunk_set_name(Chunk * chunk,const char * newname)3452 ts_chunk_set_name(Chunk *chunk, const char *newname)
3453 {
3454 	namestrcpy(&chunk->fd.table_name, newname);
3455 
3456 	return chunk_update_form(&chunk->fd);
3457 }
3458 
3459 bool
ts_chunk_set_schema(Chunk * chunk,const char * newschema)3460 ts_chunk_set_schema(Chunk *chunk, const char *newschema)
3461 {
3462 	namestrcpy(&chunk->fd.schema_name, newschema);
3463 
3464 	return chunk_update_form(&chunk->fd);
3465 }
3466 
3467 bool
ts_chunk_set_unordered(Chunk * chunk)3468 ts_chunk_set_unordered(Chunk *chunk)
3469 {
3470 	return ts_chunk_add_status(chunk, CHUNK_STATUS_COMPRESSED_UNORDERED);
3471 }
3472 
3473 bool
ts_chunk_add_status(Chunk * chunk,int32 status)3474 ts_chunk_add_status(Chunk *chunk, int32 status)
3475 {
3476 	return ts_chunk_set_status(chunk, ts_set_flags_32(chunk->fd.status, status));
3477 }
3478 
3479 bool
ts_chunk_set_status(Chunk * chunk,int32 status)3480 ts_chunk_set_status(Chunk *chunk, int32 status)
3481 {
3482 	chunk->fd.status = status;
3483 
3484 	return chunk_update_status(&chunk->fd);
3485 }
3486 
3487 /*
3488  * Setting (INVALID_CHUNK_ID, true) is valid for an Access Node. It means
3489  * the data nodes contain the actual compressed chunks, and the meta-chunk is
3490  * marked as compressed in the Access Node.
3491  * Setting (is_compressed => false) means that the chunk is uncompressed.
3492  */
3493 static ScanTupleResult
chunk_change_compressed_status_in_tuple(TupleInfo * ti,int32 compressed_chunk_id,bool is_compressed)3494 chunk_change_compressed_status_in_tuple(TupleInfo *ti, int32 compressed_chunk_id,
3495 										bool is_compressed)
3496 {
3497 	FormData_chunk form;
3498 	HeapTuple new_tuple;
3499 	CatalogSecurityContext sec_ctx;
3500 
3501 	chunk_formdata_fill(&form, ti);
3502 	if (is_compressed)
3503 	{
3504 		form.compressed_chunk_id = compressed_chunk_id;
3505 		form.status = ts_set_flags_32(form.status, CHUNK_STATUS_COMPRESSED);
3506 	}
3507 	else
3508 	{
3509 		form.compressed_chunk_id = INVALID_CHUNK_ID;
3510 		form.status =
3511 			ts_clear_flags_32(form.status,
3512 							  CHUNK_STATUS_COMPRESSED | CHUNK_STATUS_COMPRESSED_UNORDERED);
3513 	}
3514 	new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
3515 
3516 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
3517 	ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
3518 	ts_catalog_restore_user(&sec_ctx);
3519 	heap_freetuple(new_tuple);
3520 
3521 	return SCAN_DONE;
3522 }
3523 
3524 static ScanTupleResult
chunk_clear_compressed_status_in_tuple(TupleInfo * ti,void * data)3525 chunk_clear_compressed_status_in_tuple(TupleInfo *ti, void *data)
3526 {
3527 	return chunk_change_compressed_status_in_tuple(ti, INVALID_CHUNK_ID, false);
3528 }
3529 
3530 static ScanTupleResult
chunk_set_compressed_id_in_tuple(TupleInfo * ti,void * data)3531 chunk_set_compressed_id_in_tuple(TupleInfo *ti, void *data)
3532 {
3533 	int32 compressed_chunk_id = *((int32 *) data);
3534 
3535 	return chunk_change_compressed_status_in_tuple(ti, compressed_chunk_id, true);
3536 }
3537 
3538 /*Assume permissions are already checked */
3539 bool
ts_chunk_set_compressed_chunk(Chunk * chunk,int32 compressed_chunk_id)3540 ts_chunk_set_compressed_chunk(Chunk *chunk, int32 compressed_chunk_id)
3541 {
3542 	ScanKeyData scankey[1];
3543 	ScanKeyInit(&scankey[0],
3544 				Anum_chunk_idx_id,
3545 				BTEqualStrategyNumber,
3546 				F_INT4EQ,
3547 				Int32GetDatum(chunk->fd.id));
3548 	return chunk_scan_internal(CHUNK_ID_INDEX,
3549 							   scankey,
3550 							   1,
3551 							   chunk_check_ignorearg_dropped_filter,
3552 							   chunk_set_compressed_id_in_tuple,
3553 							   &compressed_chunk_id,
3554 							   0,
3555 							   ForwardScanDirection,
3556 							   RowExclusiveLock,
3557 							   CurrentMemoryContext) > 0;
3558 }
3559 
3560 /*Assume permissions are already checked */
3561 bool
ts_chunk_clear_compressed_chunk(Chunk * chunk)3562 ts_chunk_clear_compressed_chunk(Chunk *chunk)
3563 {
3564 	int32 compressed_chunk_id = INVALID_CHUNK_ID;
3565 	ScanKeyData scankey[1];
3566 	ScanKeyInit(&scankey[0],
3567 				Anum_chunk_idx_id,
3568 				BTEqualStrategyNumber,
3569 				F_INT4EQ,
3570 				Int32GetDatum(chunk->fd.id));
3571 	return chunk_scan_internal(CHUNK_ID_INDEX,
3572 							   scankey,
3573 							   1,
3574 							   chunk_check_ignorearg_dropped_filter,
3575 							   chunk_clear_compressed_status_in_tuple,
3576 							   &compressed_chunk_id,
3577 							   0,
3578 							   ForwardScanDirection,
3579 							   RowExclusiveLock,
3580 							   CurrentMemoryContext) > 0;
3581 }
3582 
3583 /* Used as a tuple found function */
3584 static ScanTupleResult
chunk_rename_schema_name(TupleInfo * ti,void * data)3585 chunk_rename_schema_name(TupleInfo *ti, void *data)
3586 {
3587 	FormData_chunk form;
3588 	HeapTuple new_tuple;
3589 	CatalogSecurityContext sec_ctx;
3590 
3591 	chunk_formdata_fill(&form, ti);
3592 	/* Rename schema name */
3593 	namestrcpy(&form.schema_name, (char *) data);
3594 	new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
3595 
3596 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
3597 	ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
3598 	ts_catalog_restore_user(&sec_ctx);
3599 	heap_freetuple(new_tuple);
3600 	return SCAN_CONTINUE;
3601 }
3602 
3603 /* Go through the internal chunk table and rename all matching schemas */
3604 void
ts_chunks_rename_schema_name(char * old_schema,char * new_schema)3605 ts_chunks_rename_schema_name(char *old_schema, char *new_schema)
3606 {
3607 	NameData old_schema_name;
3608 	ScanKeyData scankey[1];
3609 	Catalog *catalog = ts_catalog_get();
3610 	ScannerCtx scanctx = {
3611 		.table = catalog_get_table_id(catalog, CHUNK),
3612 		.index = catalog_get_index(catalog, CHUNK, CHUNK_SCHEMA_NAME_INDEX),
3613 		.nkeys = 1,
3614 		.scankey = scankey,
3615 		.tuple_found = chunk_rename_schema_name,
3616 		.data = new_schema,
3617 		.lockmode = RowExclusiveLock,
3618 		.scandirection = ForwardScanDirection,
3619 	};
3620 
3621 	namestrcpy(&old_schema_name, old_schema);
3622 
3623 	ScanKeyInit(&scankey[0],
3624 				Anum_chunk_schema_name_idx_schema_name,
3625 				BTEqualStrategyNumber,
3626 				F_NAMEEQ,
3627 				NameGetDatum(&old_schema_name));
3628 
3629 	ts_scanner_scan(&scanctx);
3630 }
3631 
3632 static int
chunk_cmp(const void * ch1,const void * ch2)3633 chunk_cmp(const void *ch1, const void *ch2)
3634 {
3635 	const Chunk *v1 = ((const Chunk *) ch1);
3636 	const Chunk *v2 = ((const Chunk *) ch2);
3637 
3638 	if (v1->fd.hypertable_id < v2->fd.hypertable_id)
3639 		return -1;
3640 	if (v1->fd.hypertable_id > v2->fd.hypertable_id)
3641 		return 1;
3642 	if (v1->table_id < v2->table_id)
3643 		return -1;
3644 	if (v1->table_id > v2->table_id)
3645 		return 1;
3646 	return 0;
3647 }
3648 
3649 /*
3650  * This is a helper set returning function (SRF) that takes a set returning function context
3651  * and as argument and returns oids extracted from funcctx->user_fctx (which is Chunk*
3652  * array). Note that the caller needs to be registered as a set returning function for this
3653  * to work.
3654  */
3655 static Datum
chunks_return_srf(FunctionCallInfo fcinfo)3656 chunks_return_srf(FunctionCallInfo fcinfo)
3657 {
3658 	FuncCallContext *funcctx;
3659 	uint64 call_cntr;
3660 	TupleDesc tupdesc;
3661 	Chunk *result_set;
3662 
3663 	/* stuff done only on the first call of the function */
3664 	if (SRF_IS_FIRSTCALL())
3665 	{
3666 		/* Build a tuple descriptor for our result type */
3667 		/* not quite necessary */
3668 		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_SCALAR)
3669 			ereport(ERROR,
3670 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3671 					 errmsg("function returning record called in context "
3672 							"that cannot accept type record")));
3673 	}
3674 
3675 	/* stuff done on every call of the function */
3676 	funcctx = SRF_PERCALL_SETUP();
3677 
3678 	call_cntr = funcctx->call_cntr;
3679 	result_set = (Chunk *) funcctx->user_fctx;
3680 
3681 	/* do when there is more left to send */
3682 	if (call_cntr < funcctx->max_calls)
3683 		SRF_RETURN_NEXT(funcctx, result_set[call_cntr].table_id);
3684 	else /* do when there is no more left */
3685 		SRF_RETURN_DONE(funcctx);
3686 }
3687 
3688 static void
ts_chunk_drop_internal(const Chunk * chunk,DropBehavior behavior,int32 log_level,bool preserve_catalog_row)3689 ts_chunk_drop_internal(const Chunk *chunk, DropBehavior behavior, int32 log_level,
3690 					   bool preserve_catalog_row)
3691 {
3692 	ObjectAddress objaddr = {
3693 		.classId = RelationRelationId,
3694 		.objectId = chunk->table_id,
3695 	};
3696 
3697 	if (log_level >= 0)
3698 		elog(log_level,
3699 			 "dropping chunk %s.%s",
3700 			 chunk->fd.schema_name.data,
3701 			 chunk->fd.table_name.data);
3702 
3703 	/* Remove the chunk from the chunk table */
3704 	ts_chunk_delete_by_relid(chunk->table_id, behavior, preserve_catalog_row);
3705 
3706 	/* Drop the table */
3707 	performDeletion(&objaddr, behavior, 0);
3708 }
3709 
/* Drop a chunk's table and delete its catalog row entirely.
 * See ts_chunk_drop_internal() for the behavior/log_level semantics. */
void
ts_chunk_drop(const Chunk *chunk, DropBehavior behavior, int32 log_level)
{
	ts_chunk_drop_internal(chunk, behavior, log_level, false);
}
3715 
/* Drop a chunk's table but keep its catalog row (used, e.g., when continuous
 * aggregates still need to know the chunk existed).
 * See ts_chunk_drop_internal() for the behavior/log_level semantics. */
void
ts_chunk_drop_preserve_catalog_row(const Chunk *chunk, DropBehavior behavior, int32 log_level)
{
	ts_chunk_drop_internal(chunk, behavior, log_level, true);
}
3721 
3722 static void
lock_referenced_tables(Oid table_relid)3723 lock_referenced_tables(Oid table_relid)
3724 {
3725 	List *fk_relids = NIL;
3726 	ListCell *lf;
3727 	List *cachedfkeys = NIL;
3728 	Relation table_rel = table_open(table_relid, AccessShareLock);
3729 
3730 	/* this list is from the relcache and can disappear with a cache flush, so
3731 	 * no further catalog access till we save the fk relids */
3732 	cachedfkeys = RelationGetFKeyList(table_rel);
3733 	foreach (lf, cachedfkeys)
3734 	{
3735 		ForeignKeyCacheInfo *cachedfk = lfirst_node(ForeignKeyCacheInfo, lf);
3736 
3737 		/* conrelid should always be that of the table we're considering */
3738 		Assert(cachedfk->conrelid == RelationGetRelid(table_rel));
3739 		fk_relids = lappend_oid(fk_relids, cachedfk->confrelid);
3740 	}
3741 	table_close(table_rel, AccessShareLock);
3742 	foreach (lf, fk_relids)
3743 		LockRelationOid(lfirst_oid(lf), AccessExclusiveLock);
3744 }
3745 
3746 List *
ts_chunk_do_drop_chunks(Hypertable * ht,int64 older_than,int64 newer_than,int32 log_level,List ** affected_data_nodes)3747 ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int32 log_level,
3748 						List **affected_data_nodes)
3749 
3750 {
3751 	uint64 i = 0;
3752 	uint64 num_chunks = 0;
3753 	Chunk *chunks;
3754 	List *dropped_chunk_names = NIL;
3755 	const char *schema_name, *table_name;
3756 	const int32 hypertable_id = ht->fd.id;
3757 	bool has_continuous_aggs;
3758 	List *data_nodes = NIL;
3759 	const MemoryContext oldcontext = CurrentMemoryContext;
3760 	ScanTupLock tuplock = {
3761 		.waitpolicy = LockWaitBlock,
3762 		.lockmode = LockTupleExclusive,
3763 	};
3764 
3765 	ts_hypertable_permissions_check(ht->main_table_relid, GetUserId());
3766 
3767 	/* We have a FK between hypertable H and PAR. Hypertable H has number of
3768 	 * chunks C1, C2, etc. When we execute "drop table C", PG acquires locks
3769 	 * on C and PAR. If we have a query as "select * from hypertable", this
3770 	 * acquires a lock on C and PAR as well. But the order of the locks is not
3771 	 * the same and results in deadlocks. - github issue #865 We hope to
3772 	 * alleviate the problem by acquiring a lock on PAR before executing the
3773 	 * drop table stmt. This is not fool-proof as we could have multiple
3774 	 * fkrelids and the order of lock acquisition for these could differ as
3775 	 * well. Do not unlock - let the transaction semantics take care of it. */
3776 	lock_referenced_tables(ht->main_table_relid);
3777 
3778 	switch (ts_continuous_agg_hypertable_status(hypertable_id))
3779 	{
3780 		case HypertableIsMaterialization:
3781 			has_continuous_aggs = false;
3782 			break;
3783 		case HypertableIsMaterializationAndRaw:
3784 			has_continuous_aggs = true;
3785 			break;
3786 		case HypertableIsRawTable:
3787 			has_continuous_aggs = true;
3788 			break;
3789 		default:
3790 			has_continuous_aggs = false;
3791 			break;
3792 	}
3793 
3794 	PG_TRY();
3795 	{
3796 		chunks = get_chunks_in_time_range(ht,
3797 										  older_than,
3798 										  newer_than,
3799 										  DROP_CHUNKS_FUNCNAME,
3800 										  CurrentMemoryContext,
3801 										  &num_chunks,
3802 										  &tuplock);
3803 	}
3804 	PG_CATCH();
3805 	{
3806 		ErrorData *edata;
3807 		MemoryContextSwitchTo(oldcontext);
3808 		edata = CopyErrorData();
3809 		if (edata->sqlerrcode == ERRCODE_LOCK_NOT_AVAILABLE)
3810 		{
3811 			FlushErrorState();
3812 			edata->detail = edata->message;
3813 			edata->message =
3814 				psprintf("some chunks could not be read since they are being concurrently updated");
3815 		}
3816 		ReThrowError(edata);
3817 	}
3818 	PG_END_TRY();
3819 
3820 	DEBUG_WAITPOINT("drop_chunks_chunks_found");
3821 
3822 	if (has_continuous_aggs)
3823 	{
3824 		int i;
3825 
3826 		/* Exclusively lock all chunks, and invalidate the continuous
3827 		 * aggregates in the regions covered by the chunks. We do this in two
3828 		 * steps: first lock all the chunks and then invalidate the
3829 		 * regions. Since we are going to drop the chunks, there is no point
3830 		 * in allowing inserts into them.
3831 		 *
3832 		 * Locking prevents further modification of the dropped region during
3833 		 * this transaction, which allows moving the invalidation threshold
3834 		 * without having to worry about new invalidations while
3835 		 * refreshing. */
3836 		for (i = 0; i < num_chunks; i++)
3837 		{
3838 			LockRelationOid(chunks[i].table_id, ExclusiveLock);
3839 
3840 			Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id ==
3841 				   chunks[i].cube->slices[0]->fd.dimension_id);
3842 		}
3843 
3844 		DEBUG_WAITPOINT("drop_chunks_locked");
3845 
3846 		/* Invalidate the dropped region to indicate that it was modified.
3847 		 *
3848 		 * The invalidation will allow the refresh command on a continuous
3849 		 * aggregate to see that this region was dropped and and will
3850 		 * therefore be able to refresh accordingly.*/
3851 		for (i = 0; i < num_chunks; i++)
3852 		{
3853 			int64 start = ts_chunk_primary_dimension_start(&chunks[i]);
3854 			int64 end = ts_chunk_primary_dimension_end(&chunks[i]);
3855 
3856 			ts_cm_functions->continuous_agg_invalidate(ht,
3857 													   HypertableIsRawTable,
3858 													   ht->fd.id,
3859 													   start,
3860 													   end);
3861 		}
3862 	}
3863 
3864 	for (i = 0; i < num_chunks; i++)
3865 	{
3866 		char *chunk_name;
3867 		ListCell *lc;
3868 
3869 		ASSERT_IS_VALID_CHUNK(&chunks[i]);
3870 
3871 		/* store chunk name for output */
3872 		schema_name = quote_identifier(chunks[i].fd.schema_name.data);
3873 		table_name = quote_identifier(chunks[i].fd.table_name.data);
3874 		chunk_name = psprintf("%s.%s", schema_name, table_name);
3875 		dropped_chunk_names = lappend(dropped_chunk_names, chunk_name);
3876 
3877 		if (has_continuous_aggs)
3878 			ts_chunk_drop_preserve_catalog_row(chunks + i, DROP_RESTRICT, log_level);
3879 		else
3880 			ts_chunk_drop(chunks + i, DROP_RESTRICT, log_level);
3881 
3882 		/* Collect a list of affected data nodes so that we know which data
3883 		 * nodes we need to drop chunks on */
3884 		foreach (lc, chunks[i].data_nodes)
3885 		{
3886 			ChunkDataNode *cdn = lfirst(lc);
3887 			data_nodes = list_append_unique_oid(data_nodes, cdn->foreign_server_oid);
3888 		}
3889 	}
3890 
3891 	if (affected_data_nodes)
3892 		*affected_data_nodes = data_nodes;
3893 
3894 	DEBUG_WAITPOINT("drop_chunks_end");
3895 
3896 	return dropped_chunk_names;
3897 }
3898 
3899 /*
3900  * This is a helper set returning function (SRF) that takes a set returning function context
3901  * and as argument and returns cstrings extracted from funcctx->user_fctx (which is a List).
3902  * Note that the caller needs to be registered as a set returning function for this to work.
3903  */
3904 static Datum
list_return_srf(FunctionCallInfo fcinfo)3905 list_return_srf(FunctionCallInfo fcinfo)
3906 {
3907 	FuncCallContext *funcctx;
3908 	uint64 call_cntr;
3909 	TupleDesc tupdesc;
3910 	List *result_set;
3911 	Datum retval;
3912 
3913 	/* stuff done only on the first call of the function */
3914 	if (SRF_IS_FIRSTCALL())
3915 	{
3916 		/* Build a tuple descriptor for our result type */
3917 		/* not quite necessary */
3918 		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_SCALAR)
3919 			ereport(ERROR,
3920 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3921 					 errmsg("function returning record called in context "
3922 							"that cannot accept type record")));
3923 	}
3924 
3925 	/* stuff done on every call of the function */
3926 	funcctx = SRF_PERCALL_SETUP();
3927 
3928 	call_cntr = funcctx->call_cntr;
3929 	result_set = castNode(List, funcctx->user_fctx);
3930 
3931 	/* do when there is more left to send */
3932 	if (call_cntr < funcctx->max_calls)
3933 	{
3934 		/* store return value and increment linked list */
3935 		retval = CStringGetTextDatum(linitial(result_set));
3936 		funcctx->user_fctx = list_delete_first(result_set);
3937 		SRF_RETURN_NEXT(funcctx, retval);
3938 	}
3939 	else /* do when there is no more left */
3940 		SRF_RETURN_DONE(funcctx);
3941 }
3942 
3943 /*
3944  * Find either the hypertable or the materialized hypertable, if the relid is
3945  * a continuous aggregate, for the relid.
3946  *
3947  * Will error if relid is not a hypertable or view (or cannot be found) or if
3948  * it is a materialized hypertable.
3949  */
3950 static Hypertable *
find_hypertable_from_table_or_cagg(Cache * hcache,Oid relid)3951 find_hypertable_from_table_or_cagg(Cache *hcache, Oid relid)
3952 {
3953 	const char *rel_name;
3954 	Hypertable *ht;
3955 
3956 	rel_name = get_rel_name(relid);
3957 
3958 	if (!rel_name)
3959 		ereport(ERROR,
3960 				(errcode(ERRCODE_UNDEFINED_TABLE),
3961 				 errmsg("invalid hypertable or continuous aggregate")));
3962 
3963 	ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);
3964 
3965 	if (ht)
3966 	{
3967 		const ContinuousAggHypertableStatus status = ts_continuous_agg_hypertable_status(ht->fd.id);
3968 		switch (status)
3969 		{
3970 			case HypertableIsMaterialization:
3971 			case HypertableIsMaterializationAndRaw:
3972 				ereport(ERROR,
3973 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3974 						 errmsg("operation not supported on materialized hypertable"),
3975 						 errhint("Try the operation on the continuous aggregate instead."),
3976 						 errdetail("Hypertable \"%s\" is a materialized hypertable.", rel_name)));
3977 				break;
3978 
3979 			default:
3980 				break;
3981 		}
3982 	}
3983 	else
3984 	{
3985 		ContinuousAgg *const cagg = ts_continuous_agg_find_by_relid(relid);
3986 
3987 		if (!cagg)
3988 			ereport(ERROR,
3989 					(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
3990 					 errmsg("\"%s\" is not a hypertable or a continuous aggregate", rel_name),
3991 					 errhint("The operation is only possible on a hypertable or continuous"
3992 							 " aggregate.")));
3993 
3994 		ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
3995 
3996 		if (!ht)
3997 			ereport(ERROR,
3998 					(errcode(ERRCODE_TS_INTERNAL_ERROR),
3999 					 errmsg("no materialized table for continuous aggregate"),
4000 					 errdetail("Continuous aggregate \"%s\" had a materialized hypertable"
4001 							   " with id %d but it was not found in the hypertable "
4002 							   "catalog.",
4003 							   rel_name,
4004 							   cagg->data.mat_hypertable_id)));
4005 	}
4006 
4007 	return ht;
4008 }
4009 
/*
 * SQL-facing drop_chunks(): drop all chunks of a hypertable (or of the
 * materialized hypertable behind a continuous aggregate) in the range
 * (newer_than, older_than), returning the dropped chunks' names as a set.
 *
 * Implemented as a set-returning function: the actual dropping happens only
 * on the first call; subsequent calls just stream the stored name list.
 */
Datum
ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
{
	MemoryContext oldcontext;
	FuncCallContext *funcctx;
	Hypertable *ht;
	List *dc_temp = NIL;
	List *dc_names = NIL;
	Oid relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
	int64 older_than = PG_INT64_MAX;
	int64 newer_than = PG_INT64_MIN;
	bool verbose;
	int elevel;
	List *data_node_oids = NIL;
	Cache *hcache;
	const Dimension *time_dim;
	Oid time_type;

	TS_PREVENT_FUNC_IF_READ_ONLY();

	/*
	 * When past the first call of the SRF, dropping has already been completed,
	 * so we just return the next chunk in the list of dropped chunks.
	 */
	if (!SRF_IS_FIRSTCALL())
		return list_return_srf(fcinfo);

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid hypertable or continuous aggregate"),
				 errhint("Specify a hypertable or continuous aggregate.")));

	if (PG_ARGISNULL(1) && PG_ARGISNULL(2))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid time range for dropping chunks"),
				 errhint("At least one of older_than and newer_than must be provided.")));

	/* Find either the hypertable or view, or error out if the relid is
	 * neither.
	 *
	 * We should improve the printout since it can either be a proper relid
	 * that does not refer to a hypertable or a continuous aggregate, or a
	 * relid that does not refer to anything at all. */
	hcache = ts_hypertable_cache_pin();
	ht = find_hypertable_from_table_or_cagg(hcache, relid);
	Assert(ht != NULL);
	time_dim = hyperspace_get_open_dimension(ht->space, 0);
	Assert(time_dim != NULL);
	time_type = ts_dimension_get_partition_type(time_dim);

	/* Convert the older_than/newer_than arguments from their SQL argument
	 * types into the time dimension's internal representation */
	if (!PG_ARGISNULL(1))
		older_than = ts_time_value_from_arg(PG_GETARG_DATUM(1),
											get_fn_expr_argtype(fcinfo->flinfo, 1),
											time_type);

	if (!PG_ARGISNULL(2))
		newer_than = ts_time_value_from_arg(PG_GETARG_DATUM(2),
											get_fn_expr_argtype(fcinfo->flinfo, 2),
											time_type);

	verbose = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
	elevel = verbose ? INFO : DEBUG2;

	/* Initial multi function call setup */
	funcctx = SRF_FIRSTCALL_INIT();

	/* Drop chunks and store their names for return */
	oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

	PG_TRY();
	{
		dc_temp = ts_chunk_do_drop_chunks(ht, older_than, newer_than, elevel, &data_node_oids);
	}
	PG_CATCH();
	{
		/* An error is raised if there are dependent objects, but the original
		 * message is not very helpful in suggesting that you should use
		 * CASCADE (we don't support it), so we replace the hint with a more
		 * accurate hint for our situation. */
		ErrorData *edata;

		MemoryContextSwitchTo(oldcontext);
		edata = CopyErrorData();
		FlushErrorState();

		if (edata->sqlerrcode == ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST)
			edata->hint = pstrdup("Use DROP ... to drop the dependent objects.");

		/* Release the pinned hypertable cache before propagating the error */
		ts_cache_release(hcache);
		ReThrowError(edata);
	}
	PG_END_TRY();
	ts_cache_release(hcache);
	dc_names = list_concat(dc_names, dc_temp);

	MemoryContextSwitchTo(oldcontext);

	/* For distributed hypertables, forward the call to the affected data
	 * nodes so they drop their local chunks too */
	if (data_node_oids != NIL)
		ts_cm_functions->func_call_on_data_nodes(fcinfo, data_node_oids);

	/* store data for multi function call */
	funcctx->max_calls = list_length(dc_names);
	funcctx->user_fctx = dc_names;

	return list_return_srf(fcinfo);
}
4118 
4119 /**
4120  * This function is used to explicitly specify chunks that are being scanned. It's being
4121  * processed in the planning phase and removed from the query tree. This means that the
4122  * actual function implementation will only be executed if something went wrong during
4123  * explicit chunk exclusion.
4124  */
4125 Datum
ts_chunks_in(PG_FUNCTION_ARGS)4126 ts_chunks_in(PG_FUNCTION_ARGS)
4127 {
4128 	const char *funcname = get_func_name(FC_FN_OID(fcinfo));
4129 
4130 	ereport(ERROR,
4131 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4132 			 errmsg("illegal invocation of %s function", funcname),
4133 			 errhint("The %s function must appear in the WHERE clause and can only"
4134 					 " be combined with AND operator.",
4135 					 funcname)));
4136 	pg_unreachable();
4137 }
4138 
4139 /* Return the compression status for the chunk
4140  */
4141 ChunkCompressionStatus
ts_chunk_get_compression_status(int32 chunk_id)4142 ts_chunk_get_compression_status(int32 chunk_id)
4143 {
4144 	ChunkCompressionStatus st = CHUNK_COMPRESS_NONE;
4145 	ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
4146 	iterator.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
4147 	ts_scan_iterator_scan_key_init(&iterator,
4148 								   Anum_chunk_idx_id,
4149 								   BTEqualStrategyNumber,
4150 								   F_INT4EQ,
4151 								   Int32GetDatum(chunk_id));
4152 
4153 	ts_scanner_foreach(&iterator)
4154 	{
4155 		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
4156 		bool dropped_isnull, status_isnull;
4157 		Datum status;
4158 
4159 		bool dropped = DatumGetBool(slot_getattr(ti->slot, Anum_chunk_dropped, &dropped_isnull));
4160 		Assert(!dropped_isnull);
4161 
4162 		status = slot_getattr(ti->slot, Anum_chunk_status, &status_isnull);
4163 		Assert(!status_isnull);
4164 		/* Note that dropped attribute takes precedence over everything else.
4165 		 * We should not check status attribute for dropped chunks
4166 		 */
4167 		if (!dropped)
4168 		{
4169 			bool status_is_compressed =
4170 				ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED);
4171 			bool status_is_unordered =
4172 				ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED_UNORDERED);
4173 			if (status_is_compressed)
4174 			{
4175 				if (status_is_unordered)
4176 					st = CHUNK_COMPRESS_UNORDERED;
4177 				else
4178 					st = CHUNK_COMPRESS_ORDERED;
4179 			}
4180 			else
4181 			{
4182 				Assert(!status_is_unordered);
4183 				st = CHUNK_COMPRESS_NONE;
4184 			}
4185 		}
4186 		else
4187 			st = CHUNK_DROPPED;
4188 	}
4189 	ts_scan_iterator_close(&iterator);
4190 	return st;
4191 }
4192 
4193 /*Note that only a compressed chunk can have unordered flag set */
4194 bool
ts_chunk_is_unordered(const Chunk * chunk)4195 ts_chunk_is_unordered(const Chunk *chunk)
4196 {
4197 	return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED_UNORDERED);
4198 }
4199 
4200 bool
ts_chunk_is_compressed(const Chunk * chunk)4201 ts_chunk_is_compressed(const Chunk *chunk)
4202 {
4203 	return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED);
4204 }
4205 
4206 bool
ts_chunk_is_uncompressed_or_unordered(const Chunk * chunk)4207 ts_chunk_is_uncompressed_or_unordered(const Chunk *chunk)
4208 {
4209 	return (ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED_UNORDERED) ||
4210 			!ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED));
4211 }
4212 
/* SQL-facing show_chunk(): dispatches to the cross-module (TSL) implementation. */
Datum
ts_chunk_show(PG_FUNCTION_ARGS)
{
	return ts_cm_functions->show_chunk(fcinfo);
}
4218 
/* SQL-facing create_chunk(): dispatches to the cross-module (TSL) implementation. */
Datum
ts_chunk_create(PG_FUNCTION_ARGS)
{
	return ts_cm_functions->create_chunk(fcinfo);
}
4224