/*
 * This file and its contents are licensed under the Apache License 2.0.
 * Please see the included NOTICE for copyright information and
 * LICENSE-APACHE for a copy of the license.
 */
#include <postgres.h>
#include <catalog/pg_class.h>
#include <catalog/namespace.h>
#include <catalog/pg_trigger.h>
#include <catalog/indexing.h>
#include <catalog/pg_inherits.h>
#include <catalog/toasting.h>
#include <commands/trigger.h>
#include <commands/tablecmds.h>
#include <commands/defrem.h>
#include <tcop/tcopprot.h>
#include <access/htup.h>
#include <access/htup_details.h>
#include <access/xact.h>
#include <access/reloptions.h>
#include <nodes/makefuncs.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include <utils/syscache.h>
#include <utils/hsearch.h>
#include <storage/lmgr.h>
#include <miscadmin.h>
#include <funcapi.h>
#include <fmgr.h>
#include <utils/datum.h>
#include <catalog/pg_type.h>
#include <utils/acl.h>
#include <utils/timestamp.h>
#include <nodes/execnodes.h>
#include <executor/executor.h>
#include <access/tupdesc.h>

#include "export.h"
#include "debug_point.h"
#include "chunk.h"
#include "chunk_index.h"
#include "chunk_data_node.h"
#include "cross_module_fn.h"
#include "catalog.h"
#include "continuous_agg.h"
#include "dimension.h"
#include "dimension_slice.h"
#include "dimension_vector.h"
#include "errors.h"
#include "partitioning.h"
#include "hypertable.h"
#include "hypertable_data_node.h"
#include "hypercube.h"
#include "scanner.h"
#include "process_utility.h"
#include "time_utils.h"
#include "trigger.h"
#include "compat/compat.h"
#include "utils.h"
#include "hypertable_cache.h"
#include "cache.h"
#include "bgw_policy/chunk_stats.h"
#include "scan_iterator.h"
#include "compression_chunk_size.h"
#include "extension.h"

TS_FUNCTION_INFO_V1(ts_chunk_show_chunks);
TS_FUNCTION_INFO_V1(ts_chunk_drop_chunks);
TS_FUNCTION_INFO_V1(ts_chunks_in);
TS_FUNCTION_INFO_V1(ts_chunk_id_from_relid);
TS_FUNCTION_INFO_V1(ts_chunk_show);
TS_FUNCTION_INFO_V1(ts_chunk_create);

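/*
 * Convert a NAME datum to a palloc'd, null-terminated C string.
 */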
static const char *
DatumGetNameString(Datum datum)
{
	Name name = DatumGetName(datum);
	return pstrdup(NameStr(*name));
}

/* Used when processing scanned chunks */
typedef enum ChunkResult
{
	CHUNK_DONE,
	CHUNK_IGNORED,
	CHUNK_PROCESSED
} ChunkResult;

/*
 * Context for scanning and building a chunk from a stub.
 *
 * If found, the chunk will be created and the chunk pointer member is set in
 * the result. Optionally, a caller can pre-allocate the chunk member's memory,
 * which is useful if one, e.g., wants to fill in a memory-aligned array of
 * chunks.
 *
 * If the chunk is a tombstone (dropped flag set), then the Chunk will not be
 * created and instead is_dropped will be TRUE.
 */
typedef struct ChunkStubScanCtx
{
	ChunkStub *stub;
	Chunk *chunk;
	bool is_dropped;
} ChunkStubScanCtx;

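/*
 * Check that a stub is complete: it has a valid ID, constraints, and the
 * expected number of slices and dimension constraints.
 */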
static bool
chunk_stub_is_valid(const ChunkStub *stub, unsigned int expected_slices)
{
	return stub && stub->id > 0 && stub->constraints && expected_slices == stub->cube->num_slices &&
		   stub->cube->num_slices == stub->constraints->num_dimension_constraints;
}

typedef ChunkResult (*on_chunk_stub_func)(ChunkScanCtx *ctx, ChunkStub *stub);
static void chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hyperspace *hs, const Point *p);
static void chunk_scan_ctx_destroy(ChunkScanCtx *ctx);
static void chunk_collision_scan(ChunkScanCtx *scanctx, const Hypercube *cube);
static int chunk_scan_ctx_foreach_chunk_stub(ChunkScanCtx *ctx, on_chunk_stub_func on_chunk,
											 uint16 limit);
static Datum chunks_return_srf(FunctionCallInfo fcinfo);
static int chunk_cmp(const void *ch1, const void *ch2);
static Chunk *chunk_find(const Hypertable *ht, const Point *p, bool resurrect, bool lock_slices);
static void init_scan_by_qualified_table_name(ScanIterator *iterator, const char *schema_name,
											  const char *table_name);
static Hypertable *find_hypertable_from_table_or_cagg(Cache *hcache, Oid relid);
static Chunk *get_chunks_in_time_range(Hypertable *ht, int64 older_than, int64 newer_than,
									   const char *caller_name, MemoryContext mctx,
									   uint64 *num_chunks_returned, ScanTupLock *tuplock);

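/*
 * A chunk is "valid" if it is not dropped, has valid catalog IDs and relids,
 * a complete set of constraints matching its hypercube, and, for a
 * foreign-table chunk, at least one associated data node.
 */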
#define IS_VALID_CHUNK(chunk) \
	((chunk) && !(chunk)->fd.dropped && (chunk)->fd.id > 0 && (chunk)->fd.hypertable_id > 0 && \
	 OidIsValid((chunk)->table_id) && OidIsValid((chunk)->hypertable_relid) && \
	 (chunk)->constraints && (chunk)->cube && \
	 (chunk)->cube->num_slices == (chunk)->constraints->num_dimension_constraints && \
	 ((chunk)->relkind == RELKIND_RELATION || \
	  ((chunk)->relkind == RELKIND_FOREIGN_TABLE && (chunk)->data_nodes != NIL)))

#define ASSERT_IS_VALID_CHUNK(chunk) Assert(IS_VALID_CHUNK(chunk))

#define ASSERT_IS_NULL_OR_VALID_CHUNK(chunk) Assert(chunk == NULL || IS_VALID_CHUNK(chunk))

/*
 * The chunk status field values are persisted in the database and must never be changed.
 * Those values are used as flags and must always be powers of 2 to allow bitwise operations.
 */
#define CHUNK_STATUS_DEFAULT 0
/*
 * Setting a data node chunk as CHUNK_STATUS_COMPRESSED means that the corresponding
 * compressed_chunk_id field points to a chunk that holds the compressed data. Otherwise,
 * the corresponding compressed_chunk_id is NULL.
 *
 * However, for access nodes compressed_chunk_id is always NULL. CHUNK_STATUS_COMPRESSED being set
 * means that a remote compress_chunk() operation has taken place for this distributed
 * meta-chunk. On the other hand, if CHUNK_STATUS_COMPRESSED is cleared, then it is probable
 * that a remote compress_chunk() has not taken place, but not certain.
 *
 * For the above reason, this flag should not be assumed to be consistent (when it is cleared)
 * for access nodes. When used in distributed hypertables one should take advantage of the
 * idempotent properties of remote compress_chunk() and the distributed compression policy to
 * make progress.
 */
#define CHUNK_STATUS_COMPRESSED 1
/*
 * When inserting into a compressed chunk the configured compress_orderby is not retained.
 * Any such chunks need an explicit Sort step to produce ordered output until the chunk
 * ordering has been restored by recompress_chunk. This flag can only exist on compressed
 * chunks.
 */
#define CHUNK_STATUS_COMPRESSED_UNORDERED 2

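/*
 * Form an on-disk catalog tuple from an in-memory chunk row, treating an
 * invalid compressed_chunk_id as NULL.
 */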
static HeapTuple
chunk_formdata_make_tuple(const FormData_chunk *fd, TupleDesc desc)
{
	Datum values[Natts_chunk];
	bool nulls[Natts_chunk] = { false };

	memset(values, 0, sizeof(Datum) * Natts_chunk);

	values[AttrNumberGetAttrOffset(Anum_chunk_id)] = Int32GetDatum(fd->id);
	values[AttrNumberGetAttrOffset(Anum_chunk_hypertable_id)] = Int32GetDatum(fd->hypertable_id);
	values[AttrNumberGetAttrOffset(Anum_chunk_schema_name)] = NameGetDatum(&fd->schema_name);
	values[AttrNumberGetAttrOffset(Anum_chunk_table_name)] = NameGetDatum(&fd->table_name);
	/* When we insert a chunk the compressed chunk id is always NULL */
	if (fd->compressed_chunk_id == INVALID_CHUNK_ID)
		nulls[AttrNumberGetAttrOffset(Anum_chunk_compressed_chunk_id)] = true;
	else
	{
		values[AttrNumberGetAttrOffset(Anum_chunk_compressed_chunk_id)] =
			Int32GetDatum(fd->compressed_chunk_id);
	}
	values[AttrNumberGetAttrOffset(Anum_chunk_dropped)] = BoolGetDatum(fd->dropped);
	values[AttrNumberGetAttrOffset(Anum_chunk_status)] = Int32GetDatum(fd->status);

	return heap_form_tuple(desc, values, nulls);
}

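/*
 * Fill an in-memory chunk row from a tuple produced by a catalog scan,
 * mapping a NULL compressed_chunk_id to INVALID_CHUNK_ID.
 */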
static void
chunk_formdata_fill(FormData_chunk *fd, const TupleInfo *ti)
{
	bool should_free;
	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
	bool nulls[Natts_chunk];
	Datum values[Natts_chunk];

	memset(fd, 0, sizeof(FormData_chunk));
	heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);

	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_id)]);
	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_hypertable_id)]);
	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_schema_name)]);
	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_table_name)]);
	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_dropped)]);
	Assert(!nulls[AttrNumberGetAttrOffset(Anum_chunk_status)]);

	fd->id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_id)]);
	fd->hypertable_id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_hypertable_id)]);
	memcpy(&fd->schema_name,
		   DatumGetName(values[AttrNumberGetAttrOffset(Anum_chunk_schema_name)]),
		   NAMEDATALEN);
	memcpy(&fd->table_name,
		   DatumGetName(values[AttrNumberGetAttrOffset(Anum_chunk_table_name)]),
		   NAMEDATALEN);

	if (nulls[AttrNumberGetAttrOffset(Anum_chunk_compressed_chunk_id)])
		fd->compressed_chunk_id = INVALID_CHUNK_ID;
	else
		fd->compressed_chunk_id =
			DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_compressed_chunk_id)]);

	fd->dropped = DatumGetBool(values[AttrNumberGetAttrOffset(Anum_chunk_dropped)]);
	fd->status = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_status)]);

	if (should_free)
		heap_freetuple(tuple);
}

int64
ts_chunk_primary_dimension_start(const Chunk *chunk)
{
	return chunk->cube->slices[0]->fd.range_start;
}

int64
ts_chunk_primary_dimension_end(const Chunk *chunk)
{
	return chunk->cube->slices[0]->fd.range_end;
}

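/*
 * Form a catalog tuple from the chunk and insert it into the given chunk
 * catalog relation as the catalog owner.
 */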
static void
chunk_insert_relation(Relation rel, const Chunk *chunk)
{
	HeapTuple new_tuple;
	CatalogSecurityContext sec_ctx;

	new_tuple = chunk_formdata_make_tuple(&chunk->fd, RelationGetDescr(rel));

	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_insert(rel, new_tuple);
	ts_catalog_restore_user(&sec_ctx);

	heap_freetuple(new_tuple);
}

void
ts_chunk_insert_lock(const Chunk *chunk, LOCKMODE lock)
{
	Catalog *catalog = ts_catalog_get();
	Relation rel;

	rel = table_open(catalog_get_table_id(catalog, CHUNK), lock);
	chunk_insert_relation(rel, chunk);
	table_close(rel, lock);
}

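/*
 * State shared by the collision-handling scan callbacks: the hypercube being
 * fitted and the first colliding chunk stub found, if any.
 */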
typedef struct CollisionInfo
{
	Hypercube *cube;
	ChunkStub *colliding_chunk;
} CollisionInfo;

/*-
 * Align a chunk's hypercube in 'aligned' dimensions.
 *
 * Alignment ensures that chunks line up in a particular dimension, i.e., their
 * ranges should either be identical or not overlap at all.
 *
 * Non-aligned:
 *
 * ' [---------]      <- existing slice
 * '      [---------] <- calculated (new) slice
 *
 * To align the slices above there are two cases depending on where the
 * insertion point happens:
 *
 * Case 1 (reuse slice):
 *
 * ' [---------]
 * '      [--x------]
 *
 * The insertion point x falls within the range of the existing slice. We should
 * reuse the existing slice rather than creating a new one.
 *
 * Case 2 (cut to align):
 *
 * ' [---------]
 * '      [-------x-]
 *
 * The insertion point falls outside the range of the existing slice and we need
 * to cut the new slice to line up.
 *
 * ' [---------]
 * '        cut [---]
 * '
 *
 * Note that slice reuse (case 1) happens already when calculating the tentative
 * hypercube for the chunk, and has thus already been performed by the time this
 * function is reached. Thus, we deal only with case 2 here. Also note that a new
 * slice might overlap in complicated ways, requiring multiple cuts. For instance,
 * consider the following situation:
 *
 * ' [------]   [-] [---]
 * '      [---x-------] <- calculated slice
 *
 * This should be cut-to-align as follows:
 *
 * ' [------]   [-] [---]
 * '         [x]
 *
 * After a chunk collision scan, this function is called for each chunk in the
 * chunk scan context. Chunks in the scan context may have only a partial set of
 * slices if they only overlap in some, but not all, dimensions (see
 * illustrations below). Even so, partial chunks may still be of interest for
 * alignment in a particular dimension. Thus, if a chunk has an overlapping
 * slice in an aligned dimension, we cut to not overlap with that slice.
 */
static ChunkResult
do_dimension_alignment(ChunkScanCtx *scanctx, ChunkStub *stub)
{
	CollisionInfo *info = scanctx->data;
	Hypercube *cube = info->cube;
	const Hyperspace *space = scanctx->space;
	ChunkResult res = CHUNK_IGNORED;
	int i;

	for (i = 0; i < space->num_dimensions; i++)
	{
		const Dimension *dim = &space->dimensions[i];
		const DimensionSlice *chunk_slice;
		DimensionSlice *cube_slice;
		int64 coord = scanctx->point->coordinates[i];

		if (!dim->fd.aligned)
			continue;

		/*
		 * The stub might not have a slice for each dimension, so we cannot
		 * use array indexing. Fetch slice by dimension ID instead.
		 */
		chunk_slice = ts_hypercube_get_slice_by_dimension_id(stub->cube, dim->fd.id);

		if (NULL == chunk_slice)
			continue;

		cube_slice = cube->slices[i];

		/*
		 * Only cut-to-align if the slices collide and are not identical
		 * (i.e., if we are reusing an existing slice we should not cut it)
		 */
		if (!ts_dimension_slices_equal(cube_slice, chunk_slice) &&
			ts_dimension_slices_collide(cube_slice, chunk_slice))
		{
			ts_dimension_slice_cut(cube_slice, chunk_slice, coord);
			res = CHUNK_PROCESSED;
		}
	}

	return res;
}

/*
 * Calculate, and potentially set, a new chunk interval for an open dimension.
 */
static bool
calculate_and_set_new_chunk_interval(const Hypertable *ht, const Point *p)
{
	Hyperspace *hs = ht->space;
	Dimension *dim = NULL;
	Datum datum;
	int64 chunk_interval, coord;
	int i;

	if (!OidIsValid(ht->chunk_sizing_func) || ht->fd.chunk_target_size <= 0)
		return false;

	/* Find first open dimension */
	for (i = 0; i < hs->num_dimensions; i++)
	{
		dim = &hs->dimensions[i];

		if (IS_OPEN_DIMENSION(dim))
			break;

		dim = NULL;
	}

	/* Nothing to do if no open dimension */
	if (NULL == dim)
	{
		elog(WARNING,
			 "adaptive chunking enabled on hypertable \"%s\" without an open (time) dimension",
			 get_rel_name(ht->main_table_relid));

		return false;
	}

	coord = p->coordinates[i];
	datum = OidFunctionCall3(ht->chunk_sizing_func,
							 Int32GetDatum(dim->fd.id),
							 Int64GetDatum(coord),
							 Int64GetDatum(ht->fd.chunk_target_size));
	chunk_interval = DatumGetInt64(datum);

	/* Check if the function didn't set an interval, or nothing changed */
	if (chunk_interval <= 0 || chunk_interval == dim->fd.interval_length)
		return false;

	/* Update the dimension */
	ts_dimension_set_chunk_interval(dim, chunk_interval);

	return true;
}

/*
 * Resolve chunk collisions.
 *
 * After a chunk collision scan, this function is called for each chunk in the
 * chunk scan context. We only care about chunks that have a full set of
 * slices/constraints that overlap with our tentative hypercube, i.e., they
 * fully collide. We resolve those collisions by cutting the hypercube.
 */
static ChunkResult
do_collision_resolution(ChunkScanCtx *scanctx, ChunkStub *stub)
{
	CollisionInfo *info = scanctx->data;
	Hypercube *cube = info->cube;
	const Hyperspace *space = scanctx->space;
	ChunkResult res = CHUNK_IGNORED;
	int i;

	if (stub->cube->num_slices != space->num_dimensions || !ts_hypercubes_collide(cube, stub->cube))
		return CHUNK_IGNORED;

	for (i = 0; i < space->num_dimensions; i++)
	{
		DimensionSlice *cube_slice = cube->slices[i];
		DimensionSlice *chunk_slice = stub->cube->slices[i];
		int64 coord = scanctx->point->coordinates[i];

		/*
		 * Only cut if we aren't reusing an existing slice and there is a
		 * collision
		 */
		if (!ts_dimension_slices_equal(cube_slice, chunk_slice) &&
			ts_dimension_slices_collide(cube_slice, chunk_slice))
		{
			ts_dimension_slice_cut(cube_slice, chunk_slice, coord);
			res = CHUNK_PROCESSED;

			/*
			 * Redo the collision check after each cut since cutting in one
			 * dimension might have resolved the collision in another
			 */
			if (!ts_hypercubes_collide(cube, stub->cube))
				return res;
		}
	}

	Assert(!ts_hypercubes_collide(cube, stub->cube));

	return res;
}

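/*
 * Scan callback that records the first stub whose hypercube fully collides
 * with the tentative hypercube and stops the scan.
 */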
static ChunkResult
check_for_collisions(ChunkScanCtx *scanctx, ChunkStub *stub)
{
	CollisionInfo *info = scanctx->data;
	Hypercube *cube = info->cube;
	const Hyperspace *space = scanctx->space;

	/* Check if this chunk collides with our hypercube */
	if (stub->cube->num_slices == space->num_dimensions && ts_hypercubes_collide(cube, stub->cube))
	{
		info->colliding_chunk = stub;
		return CHUNK_DONE;
	}

	return CHUNK_IGNORED;
}

/*
 * Check if a (tentative) chunk collides with existing chunks.
 *
 * Return the colliding chunk. Note that the chunk is a stub and not a full
 * chunk.
 */
static ChunkStub *
chunk_collides(const Hypertable *ht, const Hypercube *hc)
{
	ChunkScanCtx scanctx;
	CollisionInfo info = {
		.cube = (Hypercube *) hc,
		.colliding_chunk = NULL,
	};

	chunk_scan_ctx_init(&scanctx, ht->space, NULL);

	/* Scan for all chunks that collide with the hypercube of the new chunk */
	chunk_collision_scan(&scanctx, hc);
	scanctx.data = &info;

	/* Find chunks that collide */
	chunk_scan_ctx_foreach_chunk_stub(&scanctx, check_for_collisions, 0);

	chunk_scan_ctx_destroy(&scanctx);

	return info.colliding_chunk;
}

/*-
 * Resolve collisions and perform alignment.
 *
 * Chunks collide only if their hypercubes overlap in all dimensions. For
 * instance, the 2D chunks below collide because they overlap in both the X and
 * Y dimensions:
 *
 * '  _____
 * ' |     |
 * ' |  ___|__
 * ' |_|___|  |
 * '   |      |
 * '   |______|
 *
 * The following chunks, however, do not collide, although they still overlap
 * in the X dimension:
 *
 * '  _____
 * ' |     |
 * ' |     |
 * ' |_____|
 * '     ______
 * '    |      |
 * '    |     *|
 * '    |______|
 *
 * For the collision case above we obviously want to cut our hypercube to no
 * longer collide with existing chunks. However, the second case might still be
 * of interest for alignment in case X is an 'aligned' dimension. If '*' is the
 * insertion point, then we still want to cut the hypercube to ensure that the
 * dimension remains aligned, like so:
 *
 * '  _____
 * ' |     |
 * ' |     |
 * ' |_____|
 * '        ___
 * '       |   |
 * '       | * |
 * '       |___|
 *
 *
 * We perform alignment first as that might actually resolve chunk
 * collisions. After alignment we check for any remaining collisions.
 */
static void
chunk_collision_resolve(const Hypertable *ht, Hypercube *cube, const Point *p)
{
	ChunkScanCtx scanctx;
	CollisionInfo info = {
		.cube = cube,
		.colliding_chunk = NULL,
	};

	chunk_scan_ctx_init(&scanctx, ht->space, p);

	/* Scan for all chunks that collide with the hypercube of the new chunk */
	chunk_collision_scan(&scanctx, cube);
	scanctx.data = &info;

	/* Cut the hypercube in any aligned dimensions */
	chunk_scan_ctx_foreach_chunk_stub(&scanctx, do_dimension_alignment, 0);

	/*
	 * If there are any remaining collisions with chunks, then cut-to-fit to
	 * resolve those collisions
	 */
	chunk_scan_ctx_foreach_chunk_stub(&scanctx, do_collision_resolution, 0);

	chunk_scan_ctx_destroy(&scanctx);
}

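/*
 * Add the chunk's dimension and inheritable constraints to its in-memory
 * constraint list. Returns the total number of constraints added.
 */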
static int
chunk_add_constraints(const Chunk *chunk)
{
	int num_added;

	num_added = ts_chunk_constraints_add_dimension_constraints(chunk->constraints,
															   chunk->fd.id,
															   chunk->cube);
	num_added += ts_chunk_constraints_add_inheritable_constraints(chunk->constraints,
																  chunk->fd.id,
																  chunk->relkind,
																  chunk->hypertable_relid);

	return num_added;
}

/* Applies the attribute options and statistics targets for columns on the
 * hypertable to the corresponding columns on the chunk */
static void
set_attoptions(Relation ht_rel, Oid chunk_oid)
{
	TupleDesc tupleDesc = RelationGetDescr(ht_rel);
	int natts = tupleDesc->natts;
	int attno;

	for (attno = 1; attno <= natts; attno++)
	{
		Form_pg_attribute attribute = TupleDescAttr(tupleDesc, attno - 1);
		char *attributeName = NameStr(attribute->attname);
		HeapTuple tuple;
		Datum options;
		bool isnull;

		/* Ignore dropped */
		if (attribute->attisdropped)
			continue;

		tuple = SearchSysCacheAttName(RelationGetRelid(ht_rel), attributeName);

		Assert(tuple != NULL);

		/*
		 * Pass down the attribute options (ALTER TABLE ALTER COLUMN SET
		 * attribute_option)
		 */
		options = SysCacheGetAttr(ATTNAME, tuple, Anum_pg_attribute_attoptions, &isnull);

		if (!isnull)
		{
			AlterTableCmd *cmd = makeNode(AlterTableCmd);

			cmd->subtype = AT_SetOptions;
			cmd->name = attributeName;
			cmd->def = (Node *) untransformRelOptions(options);
			AlterTableInternal(chunk_oid, list_make1(cmd), false);
		}

		/*
		 * Pass down the statistics target (ALTER TABLE ALTER COLUMN SET
		 * STATISTICS)
		 */
		options = SysCacheGetAttr(ATTNAME, tuple, Anum_pg_attribute_attstattarget, &isnull);
		if (!isnull)
		{
			int32 target = DatumGetInt32(options);

			/* Don't do anything if it's set to the default */
			if (target != -1)
			{
				AlterTableCmd *cmd = makeNode(AlterTableCmd);

				cmd->subtype = AT_SetStatistics;
				cmd->name = attributeName;
				cmd->def = (Node *) makeInteger(target);
				AlterTableInternal(chunk_oid, list_make1(cmd), false);
			}
		}

		ReleaseSysCache(tuple);
	}
}

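/*
 * Create a TOAST table for the chunk, passing along any toast.* storage
 * options from the CREATE statement.
 */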
static void
create_toast_table(CreateStmt *stmt, Oid chunk_oid)
{
	/* similar to tcop/utility.c */
	static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
	Datum toast_options =
		transformRelOptions((Datum) 0, stmt->options, "toast", validnsps, true, false);

	(void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true);

	NewRelationCreateToastTable(chunk_oid, toast_options);
}

/*
 * Get the access method name for a relation.
 */
static char *
get_am_name_for_rel(Oid relid)
{
	HeapTuple tuple;
	Form_pg_class cform;
	Oid amoid;

	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for relation %u", relid);

	cform = (Form_pg_class) GETSTRUCT(tuple);
	amoid = cform->relam;
	ReleaseSysCache(tuple);

	return get_am_name(amoid);
}

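/*
 * Copy the hypertable's ACL in pg_class to the chunk relation and update the
 * shared dependencies so that the chunk depends on the same roles.
 */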
static void
copy_hypertable_acl_to_relid(const Hypertable *ht, Oid owner_id, Oid relid)
{
	HeapTuple ht_tuple;
	bool is_null;
	Datum acl_datum;
	Relation class_rel;

	/* We open it here since there is no point in trying to update the tuples
	 * if we cannot open the Relation catalog table */
	class_rel = table_open(RelationRelationId, RowExclusiveLock);

	ht_tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(ht->main_table_relid));
	Assert(HeapTupleIsValid(ht_tuple));

	/* We only bother about setting the chunk ACL if the hypertable ACL is
	 * non-null */
	acl_datum = SysCacheGetAttr(RELOID, ht_tuple, Anum_pg_class_relacl, &is_null);
	if (!is_null)
	{
		HeapTuple chunk_tuple, newtuple;
		Datum new_val[Natts_pg_class] = { 0 };
		bool new_null[Natts_pg_class] = { false };
		bool new_repl[Natts_pg_class] = { false };
		Acl *acl = DatumGetAclP(acl_datum);

		new_repl[Anum_pg_class_relacl - 1] = true;
		new_val[Anum_pg_class_relacl - 1] = PointerGetDatum(acl);

		/* Find the tuple for the chunk in `pg_class` */
		chunk_tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
		Assert(HeapTupleIsValid(chunk_tuple));

		/* Update the relacl for the chunk tuple to use the acl from the hypertable */
		newtuple = heap_modify_tuple(chunk_tuple,
									 RelationGetDescr(class_rel),
									 new_val,
									 new_null,
									 new_repl);
		CatalogTupleUpdate(class_rel, &newtuple->t_self, newtuple);

		/* We need to update the shared dependencies as well to indicate that
		 * the chunk is dependent on any roles that the hypertable is
		 * dependent on. */
		Oid *newmembers;
		int nnewmembers = aclmembers(acl, &newmembers);

		/* The list of old members is intentionally empty since we are using
		 * updateAclDependencies to set the ACL for the chunk. We can use NULL
		 * because getOidListDiff, which is called from updateAclDependencies,
		 * can handle that. */
		updateAclDependencies(RelationRelationId,
							  relid,
							  0,
							  owner_id,
							  0,
							  NULL,
							  nnewmembers,
							  newmembers);

		heap_freetuple(newtuple);
		ReleaseSysCache(chunk_tuple);
	}

	ReleaseSysCache(ht_tuple);
	table_close(class_rel, RowExclusiveLock);
}

/*
 * Create a chunk's table.
 *
 * A chunk inherits from the main hypertable and will have the same owner. Since
 * chunks can be created either in the TimescaleDB internal schema or in a
 * user-specified schema, some care has to be taken to use the right
 * permissions, depending on the case:
 *
 * 1. If the chunk is created in the internal schema, we create it as the
 * catalog/schema owner (i.e., anyone can create chunks there via inserting into
 * a hypertable, but cannot do it via CREATE TABLE).
 *
 * 2. If the chunk is created in a user-specified "associated schema", then we
 * shouldn't use the catalog owner to create the table since that typically
 * implies super-user permissions. If we allowed that, anyone could specify
 * someone else's schema in create_hypertable() and create chunks in it without
 * having the proper permissions to do so. With this logic, the hypertable owner
 * must have permissions to create tables in the associated schema, or else
 * table creation will fail. If the schema doesn't yet exist, the table owner
 * instead needs the proper permissions on the database to create the schema.
 */
Oid
ts_chunk_create_table(const Chunk *chunk, const Hypertable *ht, const char *tablespacename)
{
	Relation rel;
	ObjectAddress objaddr;
	int sec_ctx;

	/*
	 * The CreateForeignTableStmt embeds a regular CreateStmt, so we can use
	 * it to create both regular and foreign tables
	 */
	CreateForeignTableStmt stmt = {
		.base.type = T_CreateStmt,
		.base.relation = makeRangeVar((char *) NameStr(chunk->fd.schema_name),
									  (char *) NameStr(chunk->fd.table_name),
									  0),
		.base.inhRelations = list_make1(makeRangeVar((char *) NameStr(ht->fd.schema_name),
													 (char *) NameStr(ht->fd.table_name),
													 0)),
		.base.tablespacename = tablespacename ? (char *) tablespacename : NULL,
		/* Propagate storage options of the main table to a regular chunk
		 * table, but avoid using them for a foreign chunk table. */
		.base.options =
			(chunk->relkind == RELKIND_RELATION) ? ts_get_reloptions(ht->main_table_relid) : NIL,
		.base.accessMethod = (chunk->relkind == RELKIND_RELATION) ?
								 get_am_name_for_rel(chunk->hypertable_relid) :
								 NULL,
	};
	Oid uid, saved_uid;

	Assert(chunk->hypertable_relid == ht->main_table_relid);

	rel = table_open(ht->main_table_relid, AccessShareLock);

	/*
	 * If the chunk is created in the internal schema, become the catalog
	 * owner, otherwise become the hypertable owner
	 */
	if (namestrcmp((Name) &chunk->fd.schema_name, INTERNAL_SCHEMA_NAME) == 0)
		uid = ts_catalog_database_info_get()->owner_uid;
	else
		uid = rel->rd_rel->relowner;

	GetUserIdAndSecContext(&saved_uid, &sec_ctx);

	if (uid != saved_uid)
		SetUserIdAndSecContext(uid, sec_ctx | SECURITY_LOCAL_USERID_CHANGE);

	objaddr = DefineRelation(&stmt.base, chunk->relkind, rel->rd_rel->relowner, NULL, NULL);

	/* Make the newly defined relation visible so that we can update the
	 * ACL. */
	CommandCounterIncrement();

	/* Copy acl from hypertable to chunk relation record */
	copy_hypertable_acl_to_relid(ht, rel->rd_rel->relowner, objaddr.objectId);

	if (chunk->relkind == RELKIND_RELATION)
	{
		/*
		 * Need to create a toast table explicitly for some of the option
		 * settings to work
		 */
		create_toast_table(&stmt.base, objaddr.objectId);

		if (uid != saved_uid)
			SetUserIdAndSecContext(saved_uid, sec_ctx);
	}
	else if (chunk->relkind == RELKIND_FOREIGN_TABLE)
	{
		ChunkDataNode *cdn;

		if (list_length(chunk->data_nodes) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
					 (errmsg("no data nodes associated with chunk \"%s\"",
							 get_rel_name(chunk->table_id)))));

		/*
		 * Use the first chunk data node as the "primary" to put in the foreign
		 * table
		 */
		cdn = linitial(chunk->data_nodes);
		stmt.base.type = T_CreateForeignServerStmt;
		stmt.servername = NameStr(cdn->fd.node_name);

		/* Create the foreign table catalog information */
		CreateForeignTable(&stmt, objaddr.objectId);

		/*
		 * Need to restore security context to execute remote commands as the
		 * original user
		 */
		if (uid != saved_uid)
			SetUserIdAndSecContext(saved_uid, sec_ctx);

		/* Create the corresponding chunk replicas on the remote data nodes */
		ts_cm_functions->create_chunk_on_data_nodes(chunk, ht, NULL, NIL);

		/* Record the remote data node chunk ID mappings */
		ts_chunk_data_node_insert_multi(chunk->data_nodes);
	}
	else
		elog(ERROR, "invalid relkind \"%c\" when creating chunk", chunk->relkind);

	set_attoptions(rel, objaddr.objectId);

	table_close(rel, AccessShareLock);

	return objaddr.objectId;
}

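/*
 * Assign data nodes to a foreign-table chunk by creating stub ChunkDataNode
 * entries for the nodes selected for the chunk's hypercube.
 */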
static List *
chunk_assign_data_nodes(const Chunk *chunk, const Hypertable *ht)
{
	List *htnodes;
	List *chunk_data_nodes = NIL;
	ListCell *lc;

	if (chunk->relkind != RELKIND_FOREIGN_TABLE)
		return NIL;

	if (ht->data_nodes == NIL)
		ereport(ERROR,
				(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
				 (errmsg("no data nodes associated with hypertable \"%s\"",
						 get_rel_name(ht->main_table_relid)))));

	Assert(chunk->cube != NULL);

	htnodes = ts_hypertable_assign_chunk_data_nodes(ht, chunk->cube);
	Assert(htnodes != NIL);

	foreach (lc, htnodes)
	{
		HypertableDataNode *htnode = lfirst(lc);
		ForeignServer *foreign_server =
			GetForeignServerByName(NameStr(htnode->fd.node_name), false);
		ChunkDataNode *chunk_data_node = palloc0(sizeof(ChunkDataNode));

		/*
		 * Create a stub data node (partially filled in entry). This will be
		 * fully filled in and persisted to metadata tables once we create the
		 * remote tables during insert
		 */
		chunk_data_node->fd.chunk_id = chunk->fd.id;
		chunk_data_node->fd.node_chunk_id = -1;
		namestrcpy(&chunk_data_node->fd.node_name, foreign_server->servername);
		chunk_data_node->foreign_server_oid = foreign_server->serverid;
		chunk_data_nodes = lappend(chunk_data_nodes, chunk_data_node);
	}

	return chunk_data_nodes;
}

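/*
 * Return the chunk's data node names as a list of C strings.
 */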
List *
ts_chunk_get_data_node_name_list(const Chunk *chunk)
{
	List *datanodes = NIL;
	ListCell *lc;

	foreach (lc, chunk->data_nodes)
	{
		ChunkDataNode *cdn = lfirst(lc);

		datanodes = lappend(datanodes, NameStr(cdn->fd.node_name));
	}

	return datanodes;
}

bool
ts_chunk_has_data_node(const Chunk *chunk, const char *node_name)
{
	ListCell *lc;
	ChunkDataNode *cdn;
	bool found = false;

	if (chunk == NULL || node_name == NULL)
		return false;

	/* Check that the chunk is indeed present on the specified data node */
	foreach (lc, chunk->data_nodes)
	{
		cdn = lfirst(lc);
		if (namestrcmp(&cdn->fd.node_name, node_name) == 0)
		{
			found = true;
			break;
		}
	}

	return found;
}

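/*
 * Allocate the next chunk ID from the catalog sequence, temporarily becoming
 * the catalog owner to do so.
 */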
static int32
get_next_chunk_id(void)
{
	int32 chunk_id;
	CatalogSecurityContext sec_ctx;
	const Catalog *catalog = ts_catalog_get();

	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	chunk_id = ts_catalog_table_next_seq_id(catalog, CHUNK);
	ts_catalog_restore_user(&sec_ctx);

	return chunk_id;
}

/*
 * Create a chunk object from the dimensional constraints in the given hypercube.
 *
 * The chunk object is then used to create the actual chunk table and update the
 * metadata separately.
 *
 * The table name for the chunk can be given explicitly, or generated if
 * table_name is NULL. If the table name is generated, it will use the given
 * prefix or, if NULL, use the hypertable's associated table prefix. Similarly,
 * if schema_name is NULL it will use the hypertable's associated schema for
 * the chunk.
 */
static Chunk *
chunk_create_object(const Hypertable *ht, Hypercube *cube, const char *schema_name,
					const char *table_name, const char *prefix, int32 chunk_id)
{
	const Hyperspace *hs = ht->space;
	Chunk *chunk;
	const char relkind = hypertable_chunk_relkind(ht);

	if (NULL == schema_name || schema_name[0] == '\0')
		schema_name = NameStr(ht->fd.associated_schema_name);

	/* Create a new chunk based on the hypercube */
	chunk = ts_chunk_create_base(chunk_id, hs->num_dimensions, relkind);

	chunk->fd.hypertable_id = hs->hypertable_id;
	chunk->cube = cube;
	chunk->hypertable_relid = ht->main_table_relid;
	namestrcpy(&chunk->fd.schema_name, schema_name);

	if (NULL == table_name || table_name[0] == '\0')
	{
		int len;

		if (NULL == prefix)
			prefix = NameStr(ht->fd.associated_table_prefix);

		len = snprintf(chunk->fd.table_name.data, NAMEDATALEN, "%s_%d_chunk", prefix, chunk->fd.id);

		if (len >= NAMEDATALEN)
			elog(ERROR, "chunk table name too long");
	}
	else
		namestrcpy(&chunk->fd.table_name, table_name);

	if (chunk->relkind == RELKIND_FOREIGN_TABLE)
		chunk->data_nodes = chunk_assign_data_nodes(chunk, ht);

	return chunk;
}

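/*
 * Insert the chunk row and its constraint metadata into the catalog. Callers
 * are expected to already hold the lock that serializes chunk creation.
 */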
static void
chunk_insert_into_metadata_after_lock(const Chunk *chunk)
{
	/* Insert chunk */
	ts_chunk_insert_lock(chunk, RowExclusiveLock);

	/* Add metadata for dimensional and inheritable constraints */
	ts_chunk_constraints_insert_metadata(chunk->constraints);
}

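/*
 * Create the chunk's table-level objects: constraints and, for local
 * (non-foreign) chunks, triggers and indexes matching the hypertable.
 */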
static void
chunk_create_table_constraints(const Chunk *chunk)
{
	/* Create the chunk's constraints, triggers, and indexes */
	ts_chunk_constraints_create(chunk->constraints,
								chunk->table_id,
								chunk->fd.id,
								chunk->hypertable_relid,
								chunk->fd.hypertable_id);

	if (chunk->relkind == RELKIND_RELATION)
	{
		ts_trigger_create_all_on_chunk(chunk);
		ts_chunk_index_create_all(chunk->fd.hypertable_id,
								  chunk->hypertable_relid,
								  chunk->fd.id,
								  chunk->table_id);
	}
}

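/*
 * Create the chunk's table relation in the tablespace selected for the
 * chunk, and record the resulting relid on the chunk object.
 */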
static Oid
chunk_create_table(Chunk *chunk, const Hypertable *ht)
{
	/* Create the actual table relation for the chunk */
	const char *tablespace = ts_hypertable_select_tablespace_name(ht, chunk);

	chunk->table_id = ts_chunk_create_table(chunk, ht, tablespace);

	Assert(OidIsValid(chunk->table_id));

	return chunk->table_id;
}

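/*
 * Set up a scan iterator to look up a chunk catalog row by chunk ID using
 * the chunk ID index.
 */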
static void
init_scan_by_chunk_id(ScanIterator *iterator, int32 chunk_id)
{
	iterator->ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
	ts_scan_iterator_scan_key_init(iterator,
								   Anum_chunk_idx_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(chunk_id));
}

/*
 * Creates only a table for a chunk.
 * Either table name or chunk id needs to be provided.
 */
static Chunk *
chunk_create_only_table_after_lock(const Hypertable *ht, Hypercube *cube, const char *schema_name,
								   const char *table_name, const char *prefix, int32 chunk_id)
{
	Chunk *chunk;

	Assert(table_name != NULL || chunk_id != INVALID_CHUNK_ID);

	chunk = chunk_create_object(ht, cube, schema_name, table_name, prefix, chunk_id);
	Assert(chunk != NULL);

	chunk_create_table(chunk, ht);

	return chunk;
}

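/*
 * Remove the chunk table's inheritance from the hypertable via an internal
 * ALTER TABLE ... NO INHERIT.
 */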
static void
chunk_table_drop_inherit(const Chunk *chunk, Hypertable *ht)
{
	AlterTableCmd drop_inh_cmd = {
		.type = T_AlterTableCmd,
		.subtype = AT_DropInherit,
		.def = (Node *) makeRangeVar(NameStr(ht->fd.schema_name), NameStr(ht->fd.table_name), -1),
		.missing_ok = false
	};

	AlterTableInternal(chunk->table_id, list_make1(&drop_inh_cmd), false);
}

/*
 * Checks that the given hypercube does not collide with existing chunks and
 * creates an empty table for a chunk without any metadata modifications.
 */
Chunk *
ts_chunk_create_only_table(Hypertable *ht, Hypercube *cube, const char *schema_name,
						   const char *table_name)
{
	ChunkStub *stub;
	Chunk *chunk;
	ScanTupLock tuplock = {
		.lockmode = LockTupleKeyShare,
		.waitpolicy = LockWaitBlock,
	};

	/*
	 * The chunk table can be created only if no chunk collides with the
	 * dimension slices.
	 */
	stub = chunk_collides(ht, cube);
	if (stub != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_TS_CHUNK_COLLISION),
				 errmsg("chunk table creation failed due to dimension slice collision")));

	/*
	 * Serialize chunk creation around a lock on the "main table" to avoid
	 * multiple processes trying to create the same chunk. We use a
	 * ShareUpdateExclusiveLock, which is the weakest lock possible that
	 * conflicts with itself. The lock needs to be held until transaction end.
	 */
	LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);

	ts_hypercube_find_existing_slices(cube, &tuplock);

	chunk = chunk_create_only_table_after_lock(ht,
											   cube,
											   schema_name,
											   table_name,
											   NULL,
											   INVALID_CHUNK_ID);
	chunk_table_drop_inherit(chunk, ht);

	return chunk;
}

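/*
 * Create a chunk from a hypercube: persist any new dimension slices, create
 * the chunk table, and insert the chunk metadata and table constraints.
 */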
static Chunk *
chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
									   const char *schema_name, const char *table_name,
									   const char *prefix)
{
	/* Insert any new dimension slices into metadata */
	ts_dimension_slice_insert_multi(cube->slices, cube->num_slices);

	Chunk *chunk = chunk_create_only_table_after_lock(ht,
													  cube,
													  schema_name,
													  table_name,
													  prefix,
													  get_next_chunk_id());

	chunk_add_constraints(chunk);
	chunk_insert_into_metadata_after_lock(chunk);
	chunk_create_table_constraints(chunk);

	return chunk;
}

/*
 * Make a chunk table inherit a hypertable.
 *
 * Execution happens via a high-level ALTER TABLE statement. This includes
 * numerous checks to ensure that the chunk table has all the prerequisites to
 * properly inherit the hypertable.
 */
static void
chunk_add_inheritance(Chunk *chunk, const Hypertable *ht)
{
	AlterTableCmd altercmd = {
		.type = T_AlterTableCmd,
		.subtype = AT_AddInherit,
		.def = (Node *) makeRangeVar((char *) NameStr(ht->fd.schema_name),
									 (char *) NameStr(ht->fd.table_name),
									 0),
		.missing_ok = false,
	};
	AlterTableStmt alterstmt = {
		.type = T_AlterTableStmt,
		.cmds = list_make1(&altercmd),
		.missing_ok = false,
#if PG14_GE
		.objtype = OBJECT_TABLE,
#else
		.relkind = OBJECT_TABLE,
#endif
		.relation = makeRangeVar((char *) NameStr(chunk->fd.schema_name),
								 (char *) NameStr(chunk->fd.table_name),
								 0),
	};
	LOCKMODE lockmode = AlterTableGetLockLevel(alterstmt.cmds);
#if PG13_GE
	AlterTableUtilityContext atcontext = {
		.relid = AlterTableLookupRelation(&alterstmt, lockmode),
	};

	AlterTable(&alterstmt, lockmode, &atcontext);
#else
	AlterTable(AlterTableLookupRelation(&alterstmt, lockmode), lockmode, &alterstmt);
#endif
}

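/*
 * Create a chunk from a hypercube and an existing table: adopt the table as
 * the chunk's data table, moving and renaming it to match the chunk's schema
 * and name before registering metadata and inheritance.
 */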
static Chunk *
chunk_create_from_hypercube_and_table_after_lock(const Hypertable *ht, Hypercube *cube,
												 Oid chunk_table_relid, const char *schema_name,
												 const char *table_name, const char *prefix)
{
	Oid current_chunk_schemaid = get_rel_namespace(chunk_table_relid);
	Oid new_chunk_schemaid = InvalidOid;
	Chunk *chunk;

	Assert(OidIsValid(chunk_table_relid));
	Assert(OidIsValid(current_chunk_schemaid));

	/* Insert any new dimension slices into metadata */
	ts_dimension_slice_insert_multi(cube->slices, cube->num_slices);
	chunk = chunk_create_object(ht, cube, schema_name, table_name, prefix, get_next_chunk_id());
	chunk->table_id = chunk_table_relid;
	chunk->hypertable_relid = ht->main_table_relid;
	Assert(OidIsValid(ht->main_table_relid));

	new_chunk_schemaid = get_namespace_oid(NameStr(chunk->fd.schema_name), false);

	if (current_chunk_schemaid != new_chunk_schemaid)
	{
		Relation chunk_rel = table_open(chunk_table_relid, AccessExclusiveLock);
		ObjectAddresses *objects;

		CheckSetNamespace(current_chunk_schemaid, new_chunk_schemaid);
		objects = new_object_addresses();
		AlterTableNamespaceInternal(chunk_rel, current_chunk_schemaid, new_chunk_schemaid, objects);
		free_object_addresses(objects);
		table_close(chunk_rel, NoLock);
		/* Make changes visible */
		CommandCounterIncrement();
	}

	if (namestrcmp(&chunk->fd.table_name, get_rel_name(chunk_table_relid)) != 0)
	{
		/* Renaming will acquire and keep an AccessExclusiveLock on the chunk
		 * table */
		RenameRelationInternal(chunk_table_relid, NameStr(chunk->fd.table_name), true, false);
		/* Make changes visible */
		CommandCounterIncrement();
	}

	/* Note that we do not automatically add constraints and triggers to the
	 * chunk table when the chunk is created from an existing table. However,
	 * PostgreSQL currently validates that CHECK constraints exist, but no
	 * validation is done for other objects, including triggers, UNIQUE,
	 * PRIMARY KEY, and FOREIGN KEY constraints. We might want to either
	 * enforce that these constraints exist prior to creating the chunk from a
	 * table, or ensure that they are automatically added when the chunk is
	 * created. However, for the latter case, we risk duplicating constraints
	 * and triggers if some of them already exist on the chunk table prior to
	 * creating the chunk from it. */
	chunk_add_constraints(chunk);
	chunk_insert_into_metadata_after_lock(chunk);
	chunk_add_inheritance(chunk, ht);
	chunk_create_table_constraints(chunk);

	return chunk;
}

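/*
 * Create a chunk that covers the given point: recompute the chunk interval
 * if adaptive chunking is enabled, calculate the hypercube, resolve any
 * collisions, and then create the chunk from the resulting hypercube.
 */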
static Chunk *
chunk_create_from_point_after_lock(const Hypertable *ht, const Point *p, const char *schema_name,
								   const char *table_name, const char *prefix)
{
	Hyperspace *hs = ht->space;
	Hypercube *cube;
	ScanTupLock tuplock = {
		.lockmode = LockTupleKeyShare,
		.waitpolicy = LockWaitBlock,
	};

	/*
	 * If the user has enabled adaptive chunking, call the function to
	 * calculate and set the new chunk time interval.
	 */
	calculate_and_set_new_chunk_interval(ht, p);

	/* Calculate the hypercube for a new chunk that covers the tuple's point.
	 *
	 * We lock the tuple in KEY SHARE mode since we are concerned with
	 * ensuring that it is not deleted (or the key value changed) while we are
	 * adding chunk constraints (in `ts_chunk_constraints_insert_metadata`
	 * called in `chunk_create_metadata_after_lock`). The range of a dimension
	 * slice does not change, but we should use the weakest lock possible to
	 * not unnecessarily block other operations. */
	cube = ts_hypercube_calculate_from_point(hs, p, &tuplock);

	/* Resolve collisions with other chunks by cutting the new hypercube */
	chunk_collision_resolve(ht, cube, p);

	return chunk_create_from_hypercube_after_lock(ht, cube, schema_name, table_name, prefix);
}

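/*
 * Return the chunk that exactly matches the given hypercube, or create it if
 * no chunk collides. Errors out on a partial collision, since resolving it
 * would require cutting the hypercube (hence "without cuts").
 */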
Chunk *
ts_chunk_find_or_create_without_cuts(const Hypertable *ht, Hypercube *hc, const char *schema_name,
									 const char *table_name, Oid chunk_table_relid, bool *created)
{
	ChunkStub *stub;
	Chunk *chunk = NULL;

	DEBUG_WAITPOINT("find_or_create_chunk_start");

	stub = chunk_collides(ht, hc);

	if (NULL == stub)
	{
		/* Serialize chunk creation around the root hypertable */
		LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);

		/* Check again after lock */
		stub = chunk_collides(ht, hc);

		if (NULL == stub)
		{
			ScanTupLock tuplock = {
				.lockmode = LockTupleKeyShare,
				.waitpolicy = LockWaitBlock,
			};

			/* Lock all slices that already exist to ensure they remain when we
			 * commit since we won't create those slices ourselves. */
			ts_hypercube_find_existing_slices(hc, &tuplock);

			if (OidIsValid(chunk_table_relid))
				chunk = chunk_create_from_hypercube_and_table_after_lock(ht,
																		 hc,
																		 chunk_table_relid,
																		 schema_name,
																		 table_name,
																		 NULL);
			else
				chunk =
					chunk_create_from_hypercube_after_lock(ht, hc, schema_name, table_name, NULL);

			if (NULL != created)
				*created = true;

			ASSERT_IS_VALID_CHUNK(chunk);

			DEBUG_WAITPOINT("find_or_create_chunk_created");

			return chunk;
		}

		/* We didn't need the lock, so release it */
		UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
	}

	Assert(NULL != stub);

	/* We can only use an existing chunk if it has identical dimensional
	 * constraints. Otherwise, throw an error */
	if (!ts_hypercube_equal(stub->cube, hc))
		ereport(ERROR,
				(errcode(ERRCODE_TS_CHUNK_COLLISION),
				 errmsg("chunk creation failed due to collision")));

	/* chunk_collides only returned a stub, so we need to look up the full
	 * chunk. */
	chunk = ts_chunk_get_by_id(stub->id, true);

	if (NULL != created)
		*created = false;

	DEBUG_WAITPOINT("find_or_create_chunk_found");

	ASSERT_IS_VALID_CHUNK(chunk);

	return chunk;
}

/*
 * Create a chunk through insertion of a tuple at a given point.
 */
Chunk *
ts_chunk_create_from_point(const Hypertable *ht, const Point *p, const char *schema,
						   const char *prefix)
{
	Chunk *chunk;

	/*
	 * Serialize chunk creation around a lock on the "main table" to avoid
	 * multiple processes trying to create the same chunk. We use a
	 * ShareUpdateExclusiveLock, which is the weakest lock possible that
	 * conflicts with itself. The lock needs to be held until transaction end.
	 */
	LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);

	/* Recheck if someone else created the chunk before we got the table
	 * lock. The returned chunk will have all slices locked so that they
	 * aren't removed. */
	chunk = chunk_find(ht, p, true, true);

	if (NULL == chunk)
	{
		if (hypertable_is_distributed_member(ht))
			ereport(ERROR,
					(errcode(ERRCODE_TS_INTERNAL_ERROR),
					 errmsg("distributed hypertable member cannot create chunk on its own"),
					 errhint("Chunk creation should only happen through an access node.")));

		chunk = chunk_create_from_point_after_lock(ht, p, schema, NULL, prefix);
	}
	else
	{
		/* Chunk was not created, so we can release the lock early */
		UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
	}

	ASSERT_IS_VALID_CHUNK(chunk);

	return chunk;
}

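/*
 * Allocate a minimal chunk stub with the given ID and room for the expected
 * number of constraints.
 */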
ChunkStub *
ts_chunk_stub_create(int32 id, int16 num_constraints)
{
	ChunkStub *stub;

	stub = palloc0(sizeof(*stub));
	stub->id = id;

	if (num_constraints > 0)
		stub->constraints = ts_chunk_constraints_alloc(num_constraints, CurrentMemoryContext);

	return stub;
}

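/*
 * Allocate a bare chunk object with the given ID and relkind, no compressed
 * chunk, and, optionally, a pre-sized constraint list.
 */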
Chunk *
ts_chunk_create_base(int32 id, int16 num_constraints, const char relkind)
{
	Chunk *chunk;

	chunk = palloc0(sizeof(Chunk));
	chunk->fd.id = id;
	chunk->fd.compressed_chunk_id = INVALID_CHUNK_ID;
	chunk->relkind = relkind;

	if (num_constraints > 0)
		chunk->constraints = ts_chunk_constraints_alloc(num_constraints, CurrentMemoryContext);

	return chunk;
}

/*
 * Build a chunk from a chunk tuple and a stub.
 *
 * The stub allows the chunk to be constructed more efficiently. But if the stub
 * is not "valid", dimension slices and constraints are fully
 * rescanned/recreated.
 */
static Chunk *
chunk_build_from_tuple_and_stub(Chunk **chunkptr, TupleInfo *ti, const ChunkStub *stub)
{
	Chunk *chunk = NULL;
	int num_constraints_hint = stub ? stub->constraints->num_constraints : 2;

	if (NULL == chunkptr)
		chunkptr = &chunk;

	if (NULL == *chunkptr)
		*chunkptr = MemoryContextAllocZero(ti->mctx, sizeof(Chunk));

	chunk = *chunkptr;
	chunk_formdata_fill(&chunk->fd, ti);

	/*
	 * When searching for the chunk stub matching the dimensional point, we
	 * only scanned for dimensional constraints. We now need to rescan the
	 * constraints to also get the inherited constraints.
	 */
	chunk->constraints =
		ts_chunk_constraint_scan_by_chunk_id(chunk->fd.id, num_constraints_hint, ti->mctx);

	/* If a stub is provided then reuse its hypercube. Note that stubs that
	 * are results of a point or range scan might be incomplete (in terms of
	 * number of slices and constraints). Only a chunk stub that matches in
	 * all dimensions will have a complete hypercube. Thus, we need to check
	 * the validity of the stub before we can reuse it.
	 */
	if (chunk_stub_is_valid(stub, chunk->constraints->num_dimension_constraints))
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ti->mctx);

		chunk->cube = ts_hypercube_copy(stub->cube);
		MemoryContextSwitchTo(oldctx);

		/*
		 * The hypercube slices were filled in during the scan. Now we need to
		 * sort them in dimension order.
		 */
		ts_hypercube_slice_sort(chunk->cube);
	}
	else
		chunk->cube = ts_hypercube_from_constraints(chunk->constraints, ti->mctx);

	return chunk;
}

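/*
 * Scan filter that records whether the scanned chunk row is a tombstone
 * (dropped) in the ChunkStubScanCtx and excludes dropped rows from the scan.
 */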
static ScanFilterResult
chunk_tuple_dropped_filter(const TupleInfo *ti, void *arg)
{
	ChunkStubScanCtx *stubctx = arg;
	bool isnull;
	Datum dropped = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull);

	Assert(!isnull);
	stubctx->is_dropped = DatumGetBool(dropped);

	return stubctx->is_dropped ? SCAN_EXCLUDE : SCAN_INCLUDE;
}

/* This is a modified version of chunk_tuple_dropped_filter that does
 * not use ChunkStubScanCtx as the arg; it simply ignores the passed-in
 * argument.
 * We need a variant because the ScannerCtx assumes that the filter function
 * and the tuple_found function share the same argument.
 */
static ScanFilterResult
chunk_check_ignorearg_dropped_filter(const TupleInfo *ti, void *arg)
{
	bool isnull;
	Datum dropped = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull);

	Assert(!isnull);
	bool is_dropped = DatumGetBool(dropped);

	return is_dropped ? SCAN_EXCLUDE : SCAN_INCLUDE;
}

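/*
 * Scan callback that builds a full chunk from the scanned tuple and the stub
 * in the scan context, then fills in relids, relkind, and any data nodes.
 */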
1598 static ScanTupleResult
chunk_tuple_found(TupleInfo * ti,void * arg)1599 chunk_tuple_found(TupleInfo *ti, void *arg)
1600 {
1601 ChunkStubScanCtx *stubctx = arg;
1602 Chunk *chunk;
1603
1604 chunk = chunk_build_from_tuple_and_stub(&stubctx->chunk, ti, stubctx->stub);
1605 Assert(!chunk->fd.dropped);
1606
1607 /* Fill in table relids. Note that we cannot do this in
1608 * chunk_build_from_tuple_and_stub() since chunk_resurrect() also uses
1609 * that function and, in that case, the chunk object is needed to create
1610 * the data table and related objects. */
1611 chunk->table_id = get_relname_relid(chunk->fd.table_name.data,
1612 get_namespace_oid(chunk->fd.schema_name.data, true));
1613 chunk->hypertable_relid = ts_hypertable_id_to_relid(chunk->fd.hypertable_id);
1614 chunk->relkind = get_rel_relkind(chunk->table_id);
1615
1616 if (chunk->relkind == RELKIND_FOREIGN_TABLE)
1617 chunk->data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk->fd.id, ti->mctx);
1618
1619 return SCAN_DONE;
1620 }
1621
1622 /* Create a chunk by scanning on chunk ID. A stub must be provided as input. */
1623 static Chunk *
chunk_create_from_stub(ChunkStubScanCtx * stubctx)1624 chunk_create_from_stub(ChunkStubScanCtx *stubctx)
1625 {
1626 ScanKeyData scankey[1];
1627 Catalog *catalog = ts_catalog_get();
1628 int num_found;
1629 ScannerCtx scanctx = {
1630 .table = catalog_get_table_id(catalog, CHUNK),
1631 .index = catalog_get_index(catalog, CHUNK, CHUNK_ID_INDEX),
1632 .nkeys = 1,
1633 .scankey = scankey,
1634 .data = stubctx,
1635 .filter = chunk_tuple_dropped_filter,
1636 .tuple_found = chunk_tuple_found,
1637 .lockmode = AccessShareLock,
1638 .scandirection = ForwardScanDirection,
1639 };
1640
1641 /*
1642 * Perform an index scan on chunk ID.
1643 */
1644 ScanKeyInit(&scankey[0],
1645 Anum_chunk_idx_id,
1646 BTEqualStrategyNumber,
1647 F_INT4EQ,
1648 Int32GetDatum(stubctx->stub->id));
1649
1650 num_found = ts_scanner_scan(&scanctx);
1651
1652 Assert(num_found == 0 || num_found == 1);
1653
1654 if (stubctx->is_dropped)
1655 {
1656 Assert(num_found == 0);
1657 return NULL;
1658 }
1659
1660 if (num_found != 1)
1661 elog(ERROR, "no chunk found with ID %d", stubctx->stub->id);
1662
1663 Assert(NULL != stubctx->chunk);
1664
1665 return stubctx->chunk;
1666 }
1667
1668 /*
1669 * Initialize a chunk scan context.
1670 *
1671 * A chunk scan context is used to join chunk-related information from metadata
1672 * tables during scans.
1673 */
1674 static void
1675 chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hyperspace *hs, const Point *p)
1676 {
1677 struct HASHCTL hctl = {
1678 .keysize = sizeof(int32),
1679 .entrysize = sizeof(ChunkScanEntry),
1680 .hcxt = CurrentMemoryContext,
1681 };
1682
1683 memset(ctx, 0, sizeof(*ctx));
1684 ctx->htab = hash_create("chunk-scan-context", 20, &hctl, HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1685 ctx->space = hs;
1686 ctx->point = p;
1687 ctx->lockmode = NoLock;
1688 }
1689
1690 /*
1691 * Destroy the chunk scan context.
1692 *
1693 * This will free the hash table in the context, but not the chunks within, since
1694 * they are not allocated on the hash table's memory context.
1695 */
1696 static void
1697 chunk_scan_ctx_destroy(ChunkScanCtx *ctx)
1698 {
1699 hash_destroy(ctx->htab);
1700 }
1701
1702 static inline void
1703 dimension_slice_and_chunk_constraint_join(ChunkScanCtx *scanctx, const DimensionVec *vec)
1704 {
1705 int i;
1706
1707 for (i = 0; i < vec->num_slices; i++)
1708 {
1709 /*
1710 * For each dimension slice, find matching constraints. These will be
1711 * saved in the scan context
1712 */
1713 ts_chunk_constraint_scan_by_dimension_slice(vec->slices[i], scanctx, CurrentMemoryContext);
1714 }
1715 }
1716
1717 /*
1718 * Scan for the chunk that encloses the given point.
1719 *
1720 * In each dimension there can be one or more slices that match the point's
1721 * coordinate in that dimension. Slices are collected in the scan context's hash
1722 * table according to the chunk IDs they are associated with. A slice might
1723 * represent the dimensional bound of multiple chunks, and thus is added to all
1724 * the hash table slots of those chunks. At the end of the scan there will be at
1725 * most one chunk that has a complete set of slices, since a point cannot belong
1726 * to two chunks.
1727 */
1728 static void
1729 chunk_point_scan(ChunkScanCtx *scanctx, const Point *p, bool lock_slices)
1730 {
1731 int i;
1732
1733 /* Scan all dimensions for slices enclosing the point */
1734 for (i = 0; i < scanctx->space->num_dimensions; i++)
1735 {
1736 DimensionVec *vec;
1737 ScanTupLock tuplock = {
1738 .lockmode = LockTupleKeyShare,
1739 .waitpolicy = LockWaitBlock,
1740 };
1741
1742 vec = ts_dimension_slice_scan_limit(scanctx->space->dimensions[i].fd.id,
1743 p->coordinates[i],
1744 0,
1745 lock_slices ? &tuplock : NULL);
1746
1747 dimension_slice_and_chunk_constraint_join(scanctx, vec);
1748 }
1749 }
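/*
 * Example of the counting scheme above (made-up IDs, for illustration only):
 * in a 2-D hyperspace, suppose the point matches slice S1 in "time" and
 * slice S2 in "device". If S1 bounds chunks {10, 11} and S2 bounds chunks
 * {11, 12}, the scan context's hash table ends up with constraint counts
 * {10: 1, 11: 2, 12: 1}. Only chunk 11 has a constraint for every dimension,
 * so it is the chunk that encloses the point.
 */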
1750
1751 /*
1752 * Scan for chunks that collide with the given hypercube.
1753 *
1754 * Collisions are determined using axis-aligned bounding box collision detection
1755 * generalized to N dimensions. Slices are collected in the scan context's hash
1756 * table according to the chunk IDs they are associated with. A slice might
1757 * represent the dimensional bound of multiple chunks, and thus is added to all
1758 * the hash table slots of those chunks. At the end of the scan, those chunks
1759 * that have a full set of slices are the ones that actually collide with the
1760 * given hypercube.
1761 *
1762 * Chunks in the scan context that do not collide (do not have a full set of
1763 * slices), might still be important for ensuring alignment in those dimensions
1764 * that require alignment.
1765 */
1766 static void
1767 chunk_collision_scan(ChunkScanCtx *scanctx, const Hypercube *cube)
1768 {
1769 int i;
1770
1771 /* Scan all dimensions for colliding slices */
1772 for (i = 0; i < scanctx->space->num_dimensions; i++)
1773 {
1774 DimensionVec *vec;
1775 DimensionSlice *slice = cube->slices[i];
1776
1777 vec = dimension_slice_collision_scan(slice->fd.dimension_id,
1778 slice->fd.range_start,
1779 slice->fd.range_end);
1780
1781 /* Add the slices to all the chunks they are associated with */
1782 dimension_slice_and_chunk_constraint_join(scanctx, vec);
1783 }
1784 }
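/*
 * A worked example of the AABB collision test above (the dimension ranges
 * are made up, for illustration only): in two dimensions, a chunk with
 * slices [0, 10) x [0, 100) collides with hypercube [5, 15) x [50, 150),
 * since the ranges overlap in both dimensions (5 < 10 and 50 < 100). A chunk
 * with slices [0, 10) x [200, 300) does not collide: it overlaps in the
 * first dimension only, so it never accumulates a full set of slices in the
 * scan context's hash table.
 */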
1785
1786 /*
1787 * Apply a function to each stub in the scan context's hash table. If the limit
1788 * is greater than zero, only a limited number of chunks will be processed.
1789 *
1790 * The chunk handler function (on_chunk_stub_func) should return CHUNK_PROCESSED
1791 * if the chunk should be considered processed and count towards the given
1792 * limit. CHUNK_IGNORED can be returned to have a chunk NOT count towards the
1793 * limit. CHUNK_DONE counts the chunk but aborts processing irrespective of
1794 * whether the limit is reached or not.
1795 *
1796 * Returns the number of processed chunks.
1797 */
1798 static int
1799 chunk_scan_ctx_foreach_chunk_stub(ChunkScanCtx *ctx, on_chunk_stub_func on_chunk, uint16 limit)
1800 {
1801 HASH_SEQ_STATUS status;
1802 ChunkScanEntry *entry;
1803
1804 ctx->num_processed = 0;
1805 hash_seq_init(&status, ctx->htab);
1806
1807 for (entry = hash_seq_search(&status); entry != NULL; entry = hash_seq_search(&status))
1808 {
1809 switch (on_chunk(ctx, entry->stub))
1810 {
1811 case CHUNK_DONE:
1812 ctx->num_processed++;
1813 hash_seq_term(&status);
1814 return ctx->num_processed;
1815 case CHUNK_PROCESSED:
1816 ctx->num_processed++;
1817
1818 if (limit > 0 && ctx->num_processed == limit)
1819 {
1820 hash_seq_term(&status);
1821 return ctx->num_processed;
1822 }
1823 break;
1824 case CHUNK_IGNORED:
1825 break;
1826 }
1827 }
1828
1829 return ctx->num_processed;
1830 }
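/*
 * A minimal sketch of an on_chunk_stub_func, to illustrate the return-value
 * contract (this particular callback is hypothetical and not used below):
 *
 *   static ChunkResult
 *   count_complete_stubs(ChunkScanCtx *ctx, ChunkStub *stub)
 *   {
 *       // Incomplete stubs do not count towards the limit
 *       if (!chunk_stub_is_complete(stub, ctx->space))
 *           return CHUNK_IGNORED;
 *       // Complete stubs count towards the limit
 *       return CHUNK_PROCESSED;
 *   }
 *
 * Invoked with limit = 0, it would count all complete stubs:
 *
 *   int n = chunk_scan_ctx_foreach_chunk_stub(ctx, count_complete_stubs, 0);
 */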
1831
1832 static ChunkResult
1833 set_complete_chunk(ChunkScanCtx *scanctx, ChunkStub *stub)
1834 {
1835 if (chunk_stub_is_complete(stub, scanctx->space))
1836 {
1837 scanctx->data = stub;
1838 #ifdef USE_ASSERT_CHECKING
1839 return CHUNK_PROCESSED;
1840 #else
1841 return CHUNK_DONE;
1842 #endif
1843 }
1844 return CHUNK_IGNORED;
1845 }
1846
1847 typedef struct ChunkScanCtxAddChunkData
1848 {
1849 Chunk *chunks;
1850 uint64 max_chunks;
1851 uint64 num_chunks;
1852 } ChunkScanCtxAddChunkData;
1853
1854 static ChunkResult
1855 chunk_scan_context_add_chunk(ChunkScanCtx *scanctx, ChunkStub *stub)
1856 {
1857 ChunkScanCtxAddChunkData *data = scanctx->data;
1858 ChunkStubScanCtx stubctx = {
1859 .chunk = &data->chunks[data->num_chunks],
1860 .stub = stub,
1861 };
1862
1863 Assert(data->num_chunks < data->max_chunks);
1864 chunk_create_from_stub(&stubctx);
1865
1866 if (stubctx.is_dropped)
1867 return CHUNK_IGNORED;
1868
1869 data->num_chunks++;
1870
1871 return CHUNK_PROCESSED;
1872 }
1873
1874 /* Finds the first chunk that has a complete set of constraints. There should be
1875 * only one such chunk in the scan context when scanning for the chunk that
1876 * holds a particular tuple/point. */
1877 static ChunkStub *
1878 chunk_scan_ctx_get_chunk_stub(ChunkScanCtx *ctx)
1879 {
1880 ctx->data = NULL;
1881
1882 #ifdef USE_ASSERT_CHECKING
1883 {
1884 int n = chunk_scan_ctx_foreach_chunk_stub(ctx, set_complete_chunk, 0);
1885
1886 Assert(n == 0 || n == 1);
1887 }
1888 #else
1889 chunk_scan_ctx_foreach_chunk_stub(ctx, set_complete_chunk, 1);
1890 #endif
1891
1892 return ctx->data;
1893 }
1894
1895 /*
1896 * Resurrect a chunk from a tombstone.
1897 *
1898 * A chunk can be dropped while retaining its metadata as a tombstone. Such a
1899 * chunk is marked with dropped=true.
1900 *
1901 * This function resurrects such a dropped chunk based on the original metadata,
1902 * including recreating the table and related objects.
1903 */
1904 static Chunk *
1905 chunk_resurrect(const Hypertable *ht, const ChunkStub *stub)
1906 {
1907 ScanIterator iterator;
1908 Chunk *chunk = NULL;
1909 int count = 0;
1910
1911 Assert(NULL != stub->constraints);
1912 Assert(NULL != stub->cube);
1913
1914 iterator = ts_scan_iterator_create(CHUNK, RowExclusiveLock, CurrentMemoryContext);
1915 init_scan_by_chunk_id(&iterator, stub->id);
1916
1917 ts_scanner_foreach(&iterator)
1918 {
1919 TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
1920 HeapTuple new_tuple;
1921
1922 Assert(count == 0 && chunk == NULL);
1923 chunk = chunk_build_from_tuple_and_stub(NULL, ti, stub);
1924 Assert(chunk->fd.dropped);
1925
1926 /* Create data table and related objects */
1927 chunk->hypertable_relid = ht->main_table_relid;
1928 chunk->relkind = hypertable_chunk_relkind(ht);
1929 if (chunk->relkind == RELKIND_FOREIGN_TABLE)
1930 {
1931 chunk->data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk->fd.id, ti->mctx);
1932 /* If the data node replica list information has been deleted, reassign the data nodes */
1933 if (!chunk->data_nodes)
1934 chunk->data_nodes = chunk_assign_data_nodes(chunk, ht);
1935 }
1936 chunk->table_id = chunk_create_table(chunk, ht);
1937 chunk_create_table_constraints(chunk);
1938
1939 /* Finally, update the chunk tuple to no longer be a tombstone */
1940 chunk->fd.dropped = false;
1941 new_tuple = chunk_formdata_make_tuple(&chunk->fd, ts_scan_iterator_tupledesc(&iterator));
1942 ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
1943 heap_freetuple(new_tuple);
1944 count++;
1945
1946 /* Assume there's only one match. (We break early to avoid also
1947 * scanning the updated tuple.) */
1948 break;
1949 }
1950
1951 ts_scan_iterator_close(&iterator);
1952
1953 Assert(count == 0 || count == 1);
1954
1955 /* If count == 0 and chunk is NULL here, the tombstone (metadata) must
1956 * have been removed before we had a chance to resurrect the chunk */
1957 return chunk;
1958 }
1959
1960 /*
1961 * Find a chunk matching a point in a hypertable's N-dimensional hyperspace.
1962 *
1963 * This involves:
1964 *
1965 * 1) For each dimension:
1966 * - Find all dimension slices that match the dimension
1967 * 2) For each dimension slice:
1968 * - Find all chunk constraints matching the dimension slice
1969 * 3) For each matching chunk constraint
1970 * - Insert a chunk stub into a hash table and add the constraint to the chunk
1971 * - If chunk already exists in hash table, add the constraint to the chunk
1972 * 4) At the end of the scan, only one chunk in the hash table should have
1973 * N number of constraints. This is the matching chunk.
1974 *
1975 * NOTE: this function allocates transient data, e.g., dimension slice,
1976 * constraints and chunks, that in the end are not part of the returned
1977 * chunk. Therefore, this scan should be executed on a transient memory
1978 * context. The returned chunk needs to be copied into another memory context in
1979 * case it needs to live beyond the lifetime of the other data.
1980 */
1981 static Chunk *
1982 chunk_find(const Hypertable *ht, const Point *p, bool resurrect, bool lock_slices)
1983 {
1984 ChunkStub *stub;
1985 Chunk *chunk = NULL;
1986 ChunkScanCtx ctx;
1987
1988 /* The scan context will keep the state accumulated during the scan */
1989 chunk_scan_ctx_init(&ctx, ht->space, p);
1990
1991 /* Abort the scan when the chunk is found */
1992 ctx.early_abort = true;
1993
1994 /* Scan for the chunk matching the point */
1995 chunk_point_scan(&ctx, p, lock_slices);
1996
1997 /* Find the stub that has N matching dimension constraints */
1998 stub = chunk_scan_ctx_get_chunk_stub(&ctx);
1999
2000 chunk_scan_ctx_destroy(&ctx);
2001
2002 if (NULL != stub)
2003 {
2004 ChunkStubScanCtx stubctx = {
2005 .stub = stub,
2006 };
2007
2008 /* Fill in the rest of the chunk's data from the chunk table, unless
2009 * the chunk is marked as dropped. */
2010 chunk = chunk_create_from_stub(&stubctx);
2011
2012 /* Check if the found metadata is a tombstone (dropped=true) */
2013 if (stubctx.is_dropped)
2014 {
2015 Assert(chunk == NULL);
2016
2017 /* Resurrect the chunk if requested by the caller */
2018 if (resurrect)
2019 chunk = chunk_resurrect(ht, stub);
2020 }
2021 }
2022
2023 ASSERT_IS_NULL_OR_VALID_CHUNK(chunk);
2024
2025 return chunk;
2026 }
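/*
 * Usage sketch for the NOTE above (the caller code is hypothetical;
 * scratch_ctx is an assumed short-lived memory context owned by the caller):
 *
 *   MemoryContext old = MemoryContextSwitchTo(scratch_ctx);
 *   Chunk *chunk = chunk_find(ht, point, false, true);
 *   MemoryContextSwitchTo(old);
 *   if (chunk != NULL)
 *       chunk = ts_chunk_copy(chunk); // copy into the caller's context
 *   MemoryContextReset(scratch_ctx);  // discard the transient scan data
 */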
2027
2028 Chunk *
2029 ts_chunk_find(const Hypertable *ht, const Point *p, bool lock_slices)
2030 {
2031 return chunk_find(ht, p, false, lock_slices);
2032 }
2033
2034 /*
2035 * Find all the chunks in hyperspace that include elements (dimension slices)
2036 * calculated by given range constraints and return the corresponding
2037 * ChunkScanCtx. It is the caller's responsibility to destroy this context after
2038 * usage.
2039 */
2040 static void
2041 chunks_find_all_in_range_limit(const Hypertable *ht, const Dimension *time_dim,
2042 StrategyNumber start_strategy, int64 start_value,
2043 StrategyNumber end_strategy, int64 end_value, int limit,
2044 uint64 *num_found, ScanTupLock *tuplock, ChunkScanCtx *ctx)
2045 {
2046 DimensionVec *slices;
2047
2048 Assert(ht != NULL);
2049
2050 /* must have been checked earlier that this is the case */
2051 Assert(time_dim != NULL);
2052
2053 slices = ts_dimension_slice_scan_range_limit(time_dim->fd.id,
2054 start_strategy,
2055 start_value,
2056 end_strategy,
2057 end_value,
2058 limit,
2059 tuplock);
2060
2061 /* The scan context will keep the state accumulated during the scan */
2062 chunk_scan_ctx_init(ctx, ht->space, NULL);
2063
2064 /* No abort when the first chunk is found */
2065 ctx->early_abort = false;
2066
2067 /* Scan for chunks that are in range */
2068 dimension_slice_and_chunk_constraint_join(ctx, slices);
2069
2070 *num_found += hash_get_num_entries(ctx->htab);
2071 }
2072
2073 static ChunkResult
2074 append_chunk_common(ChunkScanCtx *scanctx, ChunkStub *stub, Chunk **chunk)
2075 {
2076 ChunkStubScanCtx stubctx = {
2077 .stub = stub,
2078 };
2079
2080 *chunk = NULL;
2081
2082 if (!chunk_stub_is_complete(stub, scanctx->space))
2083 return CHUNK_IGNORED;
2084
2085 /* Fill in the rest of the chunk's data from the chunk table */
2086 *chunk = chunk_create_from_stub(&stubctx);
2087
2088 if (stubctx.is_dropped)
2089 return CHUNK_IGNORED;
2090
2091 Assert(OidIsValid((*chunk)->table_id));
2092
2093 if (scanctx->lockmode != NoLock)
2094 LockRelationOid((*chunk)->table_id, scanctx->lockmode);
2095
2096 return CHUNK_PROCESSED;
2097 }
2098
2099 static ChunkResult
2100 append_chunk_oid(ChunkScanCtx *scanctx, ChunkStub *stub)
2101 {
2102 Chunk *chunk;
2103 ChunkResult res = append_chunk_common(scanctx, stub, &chunk);
2104
2105 if (res == CHUNK_PROCESSED)
2106 {
2107 Assert(chunk != NULL);
2108 scanctx->data = lappend_oid(scanctx->data, chunk->table_id);
2109 }
2110
2111 return res;
2112 }
2113
2114 static ChunkResult
2115 append_chunk(ChunkScanCtx *scanctx, ChunkStub *stub)
2116 {
2117 Chunk *chunk;
2118 ChunkResult res = append_chunk_common(scanctx, stub, &chunk);
2119
2120 if (res == CHUNK_PROCESSED)
2121 {
2122 Chunk **chunks = scanctx->data;
2123
2124 Assert(chunk != NULL);
2125 Assert(scanctx->num_processed < scanctx->num_complete_chunks);
2126
2127 if (NULL == chunks)
2128 {
2129 Assert(scanctx->num_complete_chunks > 0);
2130 scanctx->data = chunks = palloc(sizeof(Chunk *) * scanctx->num_complete_chunks);
2131 }
2132
2133 chunks[scanctx->num_processed] = chunk;
2134 }
2135
2136 return res;
2137 }
2138
2139 static void *
2140 chunk_find_all(const Hypertable *ht, const List *dimension_vecs, on_chunk_stub_func on_chunk,
2141 LOCKMODE lockmode, unsigned int *num_chunks)
2142 {
2143 ChunkScanCtx ctx;
2144 ListCell *lc;
2145 int num_processed;
2146
2147 /* The scan context will keep the state accumulated during the scan */
2148 chunk_scan_ctx_init(&ctx, ht->space, NULL);
2149
2150 /* Do not abort the scan when one chunk is found */
2151 ctx.early_abort = false;
2152 ctx.lockmode = lockmode;
2153
2154 /* Scan all dimensions for slices enclosing the point */
2155 foreach (lc, dimension_vecs)
2156 {
2157 const DimensionVec *vec = lfirst(lc);
2158
2159 dimension_slice_and_chunk_constraint_join(&ctx, vec);
2160 }
2161
2162 ctx.data = NULL;
2163 num_processed = chunk_scan_ctx_foreach_chunk_stub(&ctx, on_chunk, 0);
2164
2165 if (NULL != num_chunks)
2166 *num_chunks = num_processed;
2167
2168 chunk_scan_ctx_destroy(&ctx);
2169
2170 return ctx.data;
2171 }
2172
2173 Chunk **
2174 ts_chunk_find_all(const Hypertable *ht, const List *dimension_vecs, LOCKMODE lockmode,
2175 unsigned int *num_chunks)
2176 {
2177 Chunk **chunks = chunk_find_all(ht, dimension_vecs, append_chunk, lockmode, num_chunks);
2178
2179 #ifdef USE_ASSERT_CHECKING
2180 /* Assert that we never return dropped chunks */
2181 int i;
2182
2183 for (i = 0; i < *num_chunks; i++)
2184 ASSERT_IS_VALID_CHUNK(chunks[i]);
2185 #endif
2186
2187 return chunks;
2188 }
2189
2190 List *
2191 ts_chunk_find_all_oids(const Hypertable *ht, const List *dimension_vecs, LOCKMODE lockmode)
2192 {
2193 List *chunks = chunk_find_all(ht, dimension_vecs, append_chunk_oid, lockmode, NULL);
2194
2195 #ifdef USE_ASSERT_CHECKING
2196 /* Assert that we never return dropped chunks */
2197 ListCell *lc;
2198
2199 foreach (lc, chunks)
2200 {
2201 Chunk *chunk = ts_chunk_get_by_relid(lfirst_oid(lc), true);
2202 ASSERT_IS_VALID_CHUNK(chunk);
2203 }
2204 #endif
2205
2206 return chunks;
2207 }
2208
2209 /* show_chunks SQL function handler */
2210 Datum
2211 ts_chunk_show_chunks(PG_FUNCTION_ARGS)
2212 {
2213 /*
2214 * chunks_return_srf is called on every invocation, but on the first call we
2215 * must do some initial computation before calling it
2216 */
2217 if (SRF_IS_FIRSTCALL())
2218 {
2219 FuncCallContext *funcctx;
2220 Oid relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
2221 Hypertable *ht;
2222 const Dimension *time_dim;
2223 Cache *hcache;
2224 int64 older_than = PG_INT64_MAX;
2225 int64 newer_than = PG_INT64_MIN;
2226 Oid time_type;
2227
2228 hcache = ts_hypertable_cache_pin();
2229 ht = find_hypertable_from_table_or_cagg(hcache, relid);
2230 Assert(ht != NULL);
2231 time_dim = hyperspace_get_open_dimension(ht->space, 0);
2232 Assert(time_dim != NULL);
2233 time_type = ts_dimension_get_partition_type(time_dim);
2234
2235 if (!PG_ARGISNULL(1))
2236 older_than = ts_time_value_from_arg(PG_GETARG_DATUM(1),
2237 get_fn_expr_argtype(fcinfo->flinfo, 1),
2238 time_type);
2239
2240 if (!PG_ARGISNULL(2))
2241 newer_than = ts_time_value_from_arg(PG_GETARG_DATUM(2),
2242 get_fn_expr_argtype(fcinfo->flinfo, 2),
2243 time_type);
2244
2245 funcctx = SRF_FIRSTCALL_INIT();
2246 funcctx->user_fctx = get_chunks_in_time_range(ht,
2247 older_than,
2248 newer_than,
2249 "show_chunks",
2250 funcctx->multi_call_memory_ctx,
2251 &funcctx->max_calls,
2252 NULL);
2253 ts_cache_release(hcache);
2254 }
2255
2256 return chunks_return_srf(fcinfo);
2257 }
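/*
 * SQL-level usage of the handler above, as a sketch (the table name is made
 * up; the time-range arguments are optional and default to an unbounded
 * range, per older_than/newer_than above):
 *
 *   SELECT show_chunks('conditions');
 *   SELECT show_chunks('conditions', older_than => INTERVAL '1 month');
 *   SELECT show_chunks('conditions', newer_than => '2020-01-01'::timestamptz);
 */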
2258
2259 static Chunk *
2260 get_chunks_in_time_range(Hypertable *ht, int64 older_than, int64 newer_than,
2261 const char *caller_name, MemoryContext mctx, uint64 *num_chunks_returned,
2262 ScanTupLock *tuplock)
2263 {
2264 MemoryContext oldcontext;
2265 ChunkScanCtx chunk_scan_ctx;
2266 Chunk *chunks;
2267 ChunkScanCtxAddChunkData data;
2268 const Dimension *time_dim;
2269 StrategyNumber start_strategy;
2270 StrategyNumber end_strategy;
2271 uint64 num_chunks = 0;
2272
2273 if (older_than <= newer_than)
2274 ereport(ERROR,
2275 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2276 errmsg("invalid time range"),
2277 errhint("The start of the time range must be before the end.")));
2278
2279 if (TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht))
2280 elog(ERROR, "invalid operation on compressed hypertable");
2281
2282 start_strategy = (newer_than == PG_INT64_MIN) ? InvalidStrategy : BTGreaterEqualStrategyNumber;
2283 end_strategy = (older_than == PG_INT64_MAX) ? InvalidStrategy : BTLessStrategyNumber;
2284 time_dim = hyperspace_get_open_dimension(ht->space, 0);
2285
2286 oldcontext = MemoryContextSwitchTo(mctx);
2287 chunks_find_all_in_range_limit(ht,
2288 time_dim,
2289 start_strategy,
2290 newer_than,
2291 end_strategy,
2292 older_than,
2293 -1,
2294 &num_chunks,
2295 tuplock,
2296 &chunk_scan_ctx);
2297 MemoryContextSwitchTo(oldcontext);
2298
2299 chunks = MemoryContextAllocZero(mctx, sizeof(Chunk) * num_chunks);
2300 data = (ChunkScanCtxAddChunkData){
2301 .chunks = chunks,
2302 .max_chunks = num_chunks,
2303 .num_chunks = 0,
2304 };
2305
2306 /* Get all the chunks from the context */
2307 chunk_scan_ctx.data = &data;
2308 chunk_scan_ctx_foreach_chunk_stub(&chunk_scan_ctx, chunk_scan_context_add_chunk, -1);
2309 /*
2310 * Destroying the context only affects ctx.htab. We already have all the
2311 * chunks, so it is now safe to destroy the context.
2312 */
2313 chunk_scan_ctx_destroy(&chunk_scan_ctx);
2314
2315 *num_chunks_returned = data.num_chunks;
2316 qsort(chunks, *num_chunks_returned, sizeof(Chunk), chunk_cmp);
2317
2318 #ifdef USE_ASSERT_CHECKING
2319 do
2320 {
2321 uint64 i = 0;
2322 /* Assert that we never return dropped chunks */
2323 for (i = 0; i < *num_chunks_returned; i++)
2324 ASSERT_IS_VALID_CHUNK(&chunks[i]);
2325 } while (false);
2326 #endif
2327
2328 return chunks;
2329 }
2330
2331 List *
2332 ts_chunk_data_nodes_copy(const Chunk *chunk)
2333 {
2334 List *lcopy = NIL;
2335 ListCell *lc;
2336
2337 foreach (lc, chunk->data_nodes)
2338 {
2339 ChunkDataNode *node = lfirst(lc);
2340 ChunkDataNode *copy = palloc(sizeof(ChunkDataNode));
2341
2342 memcpy(copy, node, sizeof(ChunkDataNode));
2343
2344 lcopy = lappend(lcopy, copy);
2345 }
2346
2347 return lcopy;
2348 }
2349
2350 Chunk *
2351 ts_chunk_copy(const Chunk *chunk)
2352 {
2353 Chunk *copy;
2354
2355 ASSERT_IS_VALID_CHUNK(chunk);
2356 copy = palloc(sizeof(Chunk));
2357 memcpy(copy, chunk, sizeof(Chunk));
2358
2359 if (NULL != chunk->constraints)
2360 copy->constraints = ts_chunk_constraints_copy(chunk->constraints);
2361
2362 if (NULL != chunk->cube)
2363 copy->cube = ts_hypercube_copy(chunk->cube);
2364
2365 copy->data_nodes = ts_chunk_data_nodes_copy(chunk);
2366
2367 return copy;
2368 }
2369
2370 static int
2371 chunk_scan_internal(int indexid, ScanKeyData scankey[], int nkeys, tuple_filter_func filter,
2372 tuple_found_func tuple_found, void *data, int limit, ScanDirection scandir,
2373 LOCKMODE lockmode, MemoryContext mctx)
2374 {
2375 Catalog *catalog = ts_catalog_get();
2376 ScannerCtx ctx = {
2377 .table = catalog_get_table_id(catalog, CHUNK),
2378 .index = catalog_get_index(catalog, CHUNK, indexid),
2379 .nkeys = nkeys,
2380 .data = data,
2381 .scankey = scankey,
2382 .filter = filter,
2383 .tuple_found = tuple_found,
2384 .limit = limit,
2385 .lockmode = lockmode,
2386 .scandirection = scandir,
2387 .result_mctx = mctx,
2388 };
2389
2390 return ts_scanner_scan(&ctx);
2391 }
2392
2393 /*
2394 * Get a window of chunks that "precedes" the given dimensional point.
2395 *
2396 * For instance, if the dimension is "time", then given a point in time the
2397 * function returns the recent chunks that come before the chunk that includes
2398 * that point. The count parameter determines the number of slices the window
2399 * should include in the given dimension. Note that, with multi-dimensional
2400 * partitioning, there might be multiple chunks in each dimensional slice that
2401 * all precede the given point. For instance, the example below shows two
2402 * different situations that each go "back" two slices (count = 2) in the
2403 * x-dimension, but returns two vs. eight chunks due to different
2404 * partitioning.
2405 *
2406 * '_____________
2407 * '| | | * |
2408 * '|___|___|___|
2409 * '
2410 * '
2411 * '____ ________
2412 * '| | | * |
2413 * '|___|___|___|
2414 * '| | | |
2415 * '|___|___|___|
2416 * '| | | |
2417 * '|___|___|___|
2418 * '| | | |
2419 * '|___|___|___|
2420 *
2421 * Note that the returned chunks will be allocated on the given memory
2422 * context, including the list itself. So, beware of leaking the list if the
2423 * chunks are later cached somewhere else.
2424 */
2425 List *
2426 ts_chunk_get_window(int32 dimension_id, int64 point, int count, MemoryContext mctx)
2427 {
2428 List *chunks = NIL;
2429 DimensionVec *dimvec;
2430 int i;
2431
2432 /* Scan for "count" slices that precede the point in the given dimension */
2433 dimvec = ts_dimension_slice_scan_by_dimension_before_point(dimension_id,
2434 point,
2435 count,
2436 BackwardScanDirection,
2437 mctx);
2438
2439 /*
2440 * For each slice, join with any constraints that reference the slice.
2441 * There might be multiple constraints for each slice in case of
2442 * multi-dimensional partitioning.
2443 */
2444 for (i = 0; i < dimvec->num_slices; i++)
2445 {
2446 DimensionSlice *slice = dimvec->slices[i];
2447 ChunkConstraints *ccs = ts_chunk_constraints_alloc(1, mctx);
2448 int j;
2449
2450 ts_chunk_constraint_scan_by_dimension_slice_id(slice->fd.id, ccs, mctx);
2451
2452 /* For each constraint, find the corresponding chunk */
2453 for (j = 0; j < ccs->num_constraints; j++)
2454 {
2455 ChunkConstraint *cc = &ccs->constraints[j];
2456 Chunk *chunk = ts_chunk_get_by_id(cc->fd.chunk_id, false);
2457 MemoryContext old;
2458
2459 /* Dropped chunks do not contain valid data and must not be returned */
2460 if (!chunk)
2461 continue;
2462 chunk->constraints = ts_chunk_constraint_scan_by_chunk_id(chunk->fd.id, 1, mctx);
2463 chunk->cube = ts_hypercube_from_constraints(chunk->constraints, mctx);
2464
2465 /* Allocate the list on the same memory context as the chunks */
2466 old = MemoryContextSwitchTo(mctx);
2467 chunks = lappend(chunks, chunk);
2468 MemoryContextSwitchTo(old);
2469 }
2470 }
2471
2472 #ifdef USE_ASSERT_CHECKING
2473 /* Assert that we never return dropped chunks */
2474 do
2475 {
2476 ListCell *lc;
2477
2478 foreach (lc, chunks)
2479 {
2480 Chunk *chunk = lfirst(lc);
2481 ASSERT_IS_VALID_CHUNK(chunk);
2482 }
2483 } while (false);
2484 #endif
2485
2486 return chunks;
2487 }
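/*
 * Usage sketch (the caller code is hypothetical): fetch the chunks from the
 * two most recent time slices that precede a given point in the primary open
 * ("time") dimension:
 *
 *   const Dimension *dim = hyperspace_get_open_dimension(ht->space, 0);
 *   int64 point = ...; // point in the dimension's internal representation
 *   List *window = ts_chunk_get_window(dim->fd.id, point, 2, CurrentMemoryContext);
 *
 * With space partitioning, the list can hold more chunks than the slice
 * count, as the diagrams above illustrate.
 */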
2488
2489 static Chunk *
2490 chunk_scan_find(int indexid, ScanKeyData scankey[], int nkeys, MemoryContext mctx,
2491 bool fail_if_not_found, const DisplayKeyData displaykey[])
2492 {
2493 ChunkStubScanCtx stubctx = { 0 };
2494 Chunk *chunk;
2495 int num_found;
2496
2497 num_found = chunk_scan_internal(indexid,
2498 scankey,
2499 nkeys,
2500 chunk_tuple_dropped_filter,
2501 chunk_tuple_found,
2502 &stubctx,
2503 1,
2504 ForwardScanDirection,
2505 AccessShareLock,
2506 mctx);
2507 Assert(num_found == 0 || (num_found == 1 && !stubctx.is_dropped));
2508 chunk = stubctx.chunk;
2509
2510 switch (num_found)
2511 {
2512 case 0:
2513 if (fail_if_not_found)
2514 {
2515 int i = 0;
2516 StringInfo info = makeStringInfo();
2517 while (i < nkeys)
2518 {
2519 appendStringInfo(info,
2520 "%s: %s",
2521 displaykey[i].name,
2522 displaykey[i].as_string(scankey[i].sk_argument));
2523 if (++i < nkeys)
2524 appendStringInfoString(info, ", ");
2525 }
2526 ereport(ERROR,
2527 (errcode(ERRCODE_UNDEFINED_OBJECT),
2528 errmsg("chunk not found"),
2529 errdetail("%s", info->data)));
2530 }
2531 break;
2532 case 1:
2533 ASSERT_IS_VALID_CHUNK(chunk);
2534 break;
2535 default:
2536 elog(ERROR, "expected a single chunk, found %d", num_found);
2537 }
2538
2539 return chunk;
2540 }
2541
2542 Chunk *
2543 ts_chunk_get_by_name_with_memory_context(const char *schema_name, const char *table_name,
2544 MemoryContext mctx, bool fail_if_not_found)
2545 {
2546 NameData schema, table;
2547 ScanKeyData scankey[2];
2548 static const DisplayKeyData displaykey[2] = {
2549 [0] = { .name = "schema_name", .as_string = DatumGetNameString },
2550 [1] = { .name = "table_name", .as_string = DatumGetNameString },
2551 };
2552
2553 /* Early check for rogue input */
2554 if (schema_name == NULL || table_name == NULL)
2555 {
2556 if (fail_if_not_found)
2557 ereport(ERROR,
2558 (errcode(ERRCODE_UNDEFINED_OBJECT),
2559 errmsg("chunk not found"),
2560 errdetail("schema_name: %s, table_name: %s",
2561 schema_name ? schema_name : "<null>",
2562 table_name ? table_name : "<null>")));
2563 else
2564 return NULL;
2565 }
2566
2567 namestrcpy(&schema, schema_name);
2568 namestrcpy(&table, table_name);
2569
2570 /*
2571 * Perform an index scan on chunk name.
2572 */
2573 ScanKeyInit(&scankey[0],
2574 Anum_chunk_schema_name_idx_schema_name,
2575 BTEqualStrategyNumber,
2576 F_NAMEEQ,
2577 NameGetDatum(&schema));
2578 ScanKeyInit(&scankey[1],
2579 Anum_chunk_schema_name_idx_table_name,
2580 BTEqualStrategyNumber,
2581 F_NAMEEQ,
2582 NameGetDatum(&table));
2583
2584 return chunk_scan_find(CHUNK_SCHEMA_NAME_INDEX,
2585 scankey,
2586 2,
2587 mctx,
2588 fail_if_not_found,
2589 displaykey);
2590 }
2591
2592 Chunk *
2593 ts_chunk_get_by_relid(Oid relid, bool fail_if_not_found)
2594 {
2595 char *schema;
2596 char *table;
2597
2598 if (!OidIsValid(relid))
2599 {
2600 if (fail_if_not_found)
2601 ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("invalid Oid")));
2602 else
2603 return NULL;
2604 }
2605
2606 schema = get_namespace_name(get_rel_namespace(relid));
2607 table = get_rel_name(relid);
2608 return chunk_get_by_name(schema, table, fail_if_not_found);
2609 }
2610
2611 static const char *
2612 DatumGetInt32AsString(Datum datum)
2613 {
2614 char *buf = (char *) palloc(12); /* sign, 10 digits, '\0' */
2615 pg_ltoa(DatumGetInt32(datum), buf);
2616 return buf;
2617 }
2618
2619 Chunk *
2620 ts_chunk_get_by_id(int32 id, bool fail_if_not_found)
2621 {
2622 ScanKeyData scankey[1];
2623 static const DisplayKeyData displaykey[1] = {
2624 [0] = { .name = "id", .as_string = DatumGetInt32AsString },
2625 };
2626
2627 /*
2628 * Perform an index scan on chunk id.
2629 */
2630 ScanKeyInit(&scankey[0], Anum_chunk_idx_id, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(id));
2631
2632 return chunk_scan_find(CHUNK_ID_INDEX,
2633 scankey,
2634 1,
2635 CurrentMemoryContext,
2636 fail_if_not_found,
2637 displaykey);
2638 }
2639
2640 /*
2641 * Number of chunks created after the given chunk.
2642 * If chunk2.id > chunk1.id, then chunk2 was created after chunk1.
2643 */
2644 int
2645 ts_chunk_num_of_chunks_created_after(const Chunk *chunk)
2646 {
2647 ScanKeyData scankey[1];
2648
2649 /*
2650 * Try to find chunks with a greater ID than the given chunk
2651 */
2652 ScanKeyInit(&scankey[0],
2653 Anum_chunk_idx_id,
2654 BTGreaterStrategyNumber,
2655 F_INT4GT,
2656 Int32GetDatum(chunk->fd.id));
2657
2658 return chunk_scan_internal(CHUNK_ID_INDEX,
2659 scankey,
2660 1,
2661 NULL,
2662 NULL,
2663 NULL,
2664 0,
2665 ForwardScanDirection,
2666 AccessShareLock,
2667 CurrentMemoryContext);
2668 }
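/*
 * Example of the semantics above (the IDs are made up): if the existing
 * chunks have IDs {3, 5, 9} and the given chunk has ID 5, the scan matches
 * only chunk 9, so the function returns 1.
 */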
2669
2670 /*
2671 * Simple scans provide lightweight ways to access chunk information without the
2672 * overhead of getting a full chunk (i.e., no extra metadata, like constraints,
2673 * are joined in). This function forms the basis of a number of lookup functions
2674 * that, e.g., translate a chunk relid to a chunk ID, or vice versa.
2675 */
2676 static bool
2677 chunk_simple_scan(ScanIterator *iterator, FormData_chunk *form, bool missing_ok,
2678 const DisplayKeyData displaykey[])
2679 {
2680 int count = 0;
2681
2682 ts_scanner_foreach(iterator)
2683 {
2684 TupleInfo *ti = ts_scan_iterator_tuple_info(iterator);
2685 chunk_formdata_fill(form, ti);
2686
2687 if (!form->dropped)
2688 count++;
2689 }
2690
2691 Assert(count == 0 || count == 1);
2692
2693 if (count == 0 && !missing_ok)
2694 {
2695 int i = 0;
2696 StringInfo info = makeStringInfo();
2697 while (i < iterator->ctx.nkeys)
2698 {
2699 appendStringInfo(info,
2700 "%s: %s",
2701 displaykey[i].name,
2702 displaykey[i].as_string(iterator->ctx.scankey[i].sk_argument));
2703 if (++i < iterator->ctx.nkeys)
2704 appendStringInfoString(info, ", ");
2705 }
2706 ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("chunk not found"), errdetail("%s", info->data)));
2707 }
2708
2709 return count == 1;
2710 }
2711
2712 static bool
2713 chunk_simple_scan_by_name(const char *schema, const char *table, FormData_chunk *form,
2714 bool missing_ok)
2715 {
2716 ScanIterator iterator;
2717 static const DisplayKeyData displaykey[] = {
2718 [0] = { .name = "schema_name", .as_string = DatumGetNameString },
2719 [1] = { .name = "table_name", .as_string = DatumGetNameString },
2720 };
2721
2722 if (schema == NULL || table == NULL)
2723 return false;
2724
2725 iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
2726 init_scan_by_qualified_table_name(&iterator, schema, table);
2727
2728 return chunk_simple_scan(&iterator, form, missing_ok, displaykey);
2729 }
2730
2731 static bool
2732 chunk_simple_scan_by_relid(Oid relid, FormData_chunk *form, bool missing_ok)
2733 {
2734 bool found = false;
2735
2736 if (OidIsValid(relid))
2737 {
2738 const char *table = get_rel_name(relid);
2739
2740 if (table != NULL)
2741 {
2742 Oid nspid = get_rel_namespace(relid);
2743 const char *schema = get_namespace_name(nspid);
2744
2745 found = chunk_simple_scan_by_name(schema, table, form, missing_ok);
2746 }
2747 }
2748
2749 if (!found && !missing_ok)
2750 ereport(ERROR,
2751 (errcode(ERRCODE_UNDEFINED_OBJECT),
2752 errmsg("chunk with relid %u not found", relid)));
2753
2754 return found;
2755 }
2756
2757 static bool
2758 chunk_simple_scan_by_id(int32 chunk_id, FormData_chunk *form, bool missing_ok)
2759 {
2760 ScanIterator iterator;
2761 static const DisplayKeyData displaykey[] = {
2762 [0] = { .name = "id", .as_string = DatumGetInt32AsString },
2763 };
2764
2765 iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
2766 init_scan_by_chunk_id(&iterator, chunk_id);
2767
2768 return chunk_simple_scan(&iterator, form, missing_ok, displaykey);
2769 }
2770
2771 /*
2772 * Lookup a Chunk ID from a chunk's relid.
2773 */
2774 Datum
2775 ts_chunk_id_from_relid(PG_FUNCTION_ARGS)
2776 {
2777 static Oid last_relid = InvalidOid;
2778 static int32 last_id = 0;
2779 Oid relid = PG_GETARG_OID(0);
2780 FormData_chunk form;
2781
2782 if (last_relid == relid)
2783 PG_RETURN_INT32(last_id);
2784
2785 chunk_simple_scan_by_relid(relid, &form, false);
2786
2787 last_relid = relid;
2788 last_id = form.id;
2789
2790 PG_RETURN_INT32(last_id);
2791 }
2792
2793 bool
2794 ts_chunk_exists_relid(Oid relid)
2795 {
2796 FormData_chunk form;
2797
2798 return chunk_simple_scan_by_relid(relid, &form, true);
2799 }
2800
2801 /*
2802 * Get the relid of a chunk given its ID.
2803 */
2804 Oid
2805 ts_chunk_get_relid(int32 chunk_id, bool missing_ok)
2806 {
2807 FormData_chunk form = { 0 };
2808 Oid relid = InvalidOid;
2809
2810 if (chunk_simple_scan_by_id(chunk_id, &form, missing_ok))
2811 {
2812 Oid schemaid = get_namespace_oid(NameStr(form.schema_name), missing_ok);
2813
2814 if (OidIsValid(schemaid))
2815 relid = get_relname_relid(NameStr(form.table_name), schemaid);
2816 }
2817
2818 if (!OidIsValid(relid) && !missing_ok)
2819 ereport(ERROR,
2820 (errcode(ERRCODE_UNDEFINED_SCHEMA),
2821 errmsg("chunk with id %d not found", chunk_id)));
2822
2823 return relid;
2824 }
2825
2826 /*
2827 * Get the schema (namespace) of a chunk given its ID.
2828 *
2829 * This is a lightweight way to get the schema of a chunk without creating a
2830 * full Chunk object that joins in constraints, etc.
2831 */
2832 Oid
2833 ts_chunk_get_schema_id(int32 chunk_id, bool missing_ok)
2834 {
2835 FormData_chunk form = { 0 };
2836
2837 if (!chunk_simple_scan_by_id(chunk_id, &form, missing_ok))
2838 return InvalidOid;
2839
2840 return get_namespace_oid(NameStr(form.schema_name), missing_ok);
2841 }
2842
2843 bool
2844 ts_chunk_get_id(const char *schema, const char *table, int32 *chunk_id, bool missing_ok)
2845 {
2846 FormData_chunk form = { 0 };
2847
2848 if (!chunk_simple_scan_by_name(schema, table, &form, missing_ok))
2849 return false;
2850
2851 if (NULL != chunk_id)
2852 *chunk_id = form.id;
2853
2854 return true;
2855 }
2856
2857 /*
2858 * Results of deleting a chunk.
2859 *
2860 * A chunk can be deleted in two ways: (1) full delete of data and metadata,
2861 * (2) delete data but preserve metadata (marked with dropped=true). The
2862 * deletion mode (preserve or not) combined with the current state of the
2863 * "dropped" flag on a chunk metadata row leads to a cross-product resulting
2864 * in the following outcomes:
2865 */
2866 typedef enum ChunkDeleteResult
2867 {
2868 /* Deleted a live chunk */
2869 CHUNK_DELETED,
2870 /* Deleted a chunk previously marked "dropped" */
2871 CHUNK_DELETED_DROPPED,
2872 /* Marked a chunk as dropped instead of deleting */
2873 CHUNK_MARKED_DROPPED,
2874 /* Tried to mark a chunk as dropped when it was already marked */
2875 CHUNK_ALREADY_MARKED_DROPPED,
2876 } ChunkDeleteResult;
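/*
 * The cross-product mentioned above, spelled out (as implemented by
 * chunk_tuple_delete() below):
 *
 *   preserve_chunk_catalog_row | dropped flag | outcome
 *   ---------------------------+--------------+------------------------------
 *   false                      | false        | CHUNK_DELETED
 *   false                      | true         | CHUNK_DELETED_DROPPED
 *   true                       | false        | CHUNK_MARKED_DROPPED
 *   true                       | true         | CHUNK_ALREADY_MARKED_DROPPED
 */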
2877
2878 /* Delete the chunk tuple.
2879 *
2880 * preserve_chunk_catalog_row - instead of deleting the row, mark it as dropped.
2881 * this is used when we need to preserve catalog information about the chunk
2882 * after dropping it. Currently only used when preserving continuous aggregates
2883 * on the chunk after the raw data was dropped. Otherwise, we'd have dangling
2884 * chunk ids left over in the materialization table. Preserve the space dimension
2885 * info about these chunks too.
2886 *
2887 * When chunk rows are preserved, the rows need to be updated to set the
2888 * 'dropped' flag to TRUE. But since this produces a new tuple into the
2889 * metadata table we will process also the new tuple in the same loop, which
2890 * is not only inefficient but could also lead to bugs. For now, we just ignore
2891 * those tuples (the CHUNK_ALREADY_MARKED_DROPPED case), but ideally we
2892 * shouldn't scan the updated tuples at all since it means double the number
2893 * of tuples to process.
2894 */
2895 static ChunkDeleteResult
2896 chunk_tuple_delete(TupleInfo *ti, DropBehavior behavior, bool preserve_chunk_catalog_row)
2897 {
2898 FormData_chunk form;
2899 CatalogSecurityContext sec_ctx;
2900 ChunkConstraints *ccs = ts_chunk_constraints_alloc(2, ti->mctx);
2901 ChunkDeleteResult res;
2902 int i;
2903
2904 chunk_formdata_fill(&form, ti);
2905
2906 if (preserve_chunk_catalog_row && form.dropped)
2907 return CHUNK_ALREADY_MARKED_DROPPED;
2908
2909 /* if only marking as deleted, keep the constraints and dimension info */
2910 if (!preserve_chunk_catalog_row)
2911 {
2912 ts_chunk_constraint_delete_by_chunk_id(form.id, ccs);
2913
2914 /* Check for dimension slices that are orphaned by the chunk deletion */
2915 for (i = 0; i < ccs->num_constraints; i++)
2916 {
2917 ChunkConstraint *cc = &ccs->constraints[i];
2918
2919 /*
2920 * Delete the dimension slice if there are no remaining constraints
2921 * referencing it
2922 */
2923 if (is_dimension_constraint(cc))
2924 {
2925 /*
2926 * Dimension slices are shared between chunk constraints and
2927 * subsequently between chunks as well. Since different chunks
2928 * can reference the same dimension slice (through the chunk
2929 * constraint), we must lock the dimension slice in FOR UPDATE
2930 * mode *prior* to scanning the chunk constraints table. If we
2931 * do not do that, we can have the following scenario:
2932 *
2933 * - T1: Prepares to create a chunk that uses an existing dimension slice X
2934 * - T2: Deletes a chunk and dimension slice X because it is not
2935 * referenced by a chunk constraint.
2936 * - T1: Adds a chunk constraint referencing dimension
2937 * slice X (which is about to be deleted by T2).
2938 */
2939 ScanTupLock tuplock = {
2940 .lockmode = LockTupleExclusive,
2941 .waitpolicy = LockWaitBlock,
2942 };
2943 DimensionSlice *slice =
2944 ts_dimension_slice_scan_by_id_and_lock(cc->fd.dimension_slice_id,
2945 &tuplock,
2946 CurrentMemoryContext);
2947 /* If the slice is not found in the scan above, the table is
2948 * broken so we do not delete the slice. We proceed
2949 * anyway since users need to be able to drop broken tables or
2950 * remove broken chunks. */
2951 if (!slice)
2952 {
2953 const Hypertable *const ht = ts_hypertable_get_by_id(form.hypertable_id);
2954 ereport(WARNING,
2955 (errmsg("unexpected state for chunk %s.%s, dropping anyway",
2956 quote_identifier(NameStr(form.schema_name)),
2957 quote_identifier(NameStr(form.table_name))),
2958 errdetail("The integrity of hypertable %s.%s might be "
2959 "compromised "
2960 "since one of its chunks lacked a dimension slice.",
2961 quote_identifier(NameStr(ht->fd.schema_name)),
2962 quote_identifier(NameStr(ht->fd.table_name)))));
2963 }
2964 else if (ts_chunk_constraint_scan_by_dimension_slice_id(slice->fd.id,
2965 NULL,
2966 CurrentMemoryContext) == 0)
2967 ts_dimension_slice_delete_by_id(cc->fd.dimension_slice_id, false);
2968 }
2969 }
2970 }
2971
2972 ts_chunk_index_delete_by_chunk_id(form.id, true);
2973 ts_compression_chunk_size_delete(form.id);
2974 ts_chunk_data_node_delete_by_chunk_id(form.id);
2975
2976 /* Delete any row in bgw_policy_chunk_stats corresponding to this chunk */
2977 ts_bgw_policy_chunk_stats_delete_by_chunk_id(form.id);
2978
2979 if (form.compressed_chunk_id != INVALID_CHUNK_ID)
2980 {
2981 Chunk *compressed_chunk = ts_chunk_get_by_id(form.compressed_chunk_id, false);
2982
2983 /* The chunk may have been deleted by a CASCADE */
2984 if (compressed_chunk != NULL)
2985 /* Plain drop without preserving catalog row because this is the compressed
2986 * chunk */
2987 ts_chunk_drop(compressed_chunk, behavior, DEBUG1);
2988 }
2989
2990 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
2991
2992 if (!preserve_chunk_catalog_row)
2993 {
2994 ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
2995
2996 if (form.dropped)
2997 res = CHUNK_DELETED_DROPPED;
2998 else
2999 res = CHUNK_DELETED;
3000 }
3001 else
3002 {
3003 HeapTuple new_tuple;
3004
3005 Assert(!form.dropped);
3006
3007 form.compressed_chunk_id = INVALID_CHUNK_ID;
3008 form.dropped = true;
3009 new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
3010 ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
3011 heap_freetuple(new_tuple);
3012 res = CHUNK_MARKED_DROPPED;
3013 }
3014
3015 ts_catalog_restore_user(&sec_ctx);
3016
3017 return res;
3018 }
3019
3020 static void
3021 init_scan_by_qualified_table_name(ScanIterator *iterator, const char *schema_name,
3022 const char *table_name)
3023 {
3024 iterator->ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_SCHEMA_NAME_INDEX);
3025 ts_scan_iterator_scan_key_init(iterator,
3026 Anum_chunk_schema_name_idx_schema_name,
3027 BTEqualStrategyNumber,
3028 F_NAMEEQ,
3029 CStringGetDatum(schema_name));
3030 ts_scan_iterator_scan_key_init(iterator,
3031 Anum_chunk_schema_name_idx_table_name,
3032 BTEqualStrategyNumber,
3033 F_NAMEEQ,
3034 CStringGetDatum(table_name));
3035 }
3036
3037 static int
3038 chunk_delete(ScanIterator *iterator, DropBehavior behavior, bool preserve_chunk_catalog_row)
3039 {
3040 int count = 0;
3041
3042 ts_scanner_foreach(iterator)
3043 {
3044 ChunkDeleteResult res;
3045
3046 res = chunk_tuple_delete(ts_scan_iterator_tuple_info(iterator),
3047 behavior,
3048 preserve_chunk_catalog_row);
3049
3050 switch (res)
3051 {
3052 case CHUNK_DELETED:
3053 case CHUNK_MARKED_DROPPED:
3054 count++;
3055 break;
3056 case CHUNK_ALREADY_MARKED_DROPPED:
3057 case CHUNK_DELETED_DROPPED:
3058 break;
3059 }
3060 }
3061
3062 return count;
3063 }
3064
3065 static int
3066 ts_chunk_delete_by_name_internal(const char *schema, const char *table, DropBehavior behavior,
3067 bool preserve_chunk_catalog_row)
3068 {
3069 ScanIterator iterator = ts_scan_iterator_create(CHUNK, RowExclusiveLock, CurrentMemoryContext);
3070 int count;
3071
3072 init_scan_by_qualified_table_name(&iterator, schema, table);
3073 count = chunk_delete(&iterator, behavior, preserve_chunk_catalog_row);
3074
3075 /* (schema, table) names and (hypertable_id) are unique, so we should have
3076 * dropped at most one chunk (or none, if not found) */
3077 Assert(count == 1 || count == 0);
3078
3079 return count;
3080 }
3081
3082 int
3083 ts_chunk_delete_by_name(const char *schema, const char *table, DropBehavior behavior)
3084 {
3085 return ts_chunk_delete_by_name_internal(schema, table, behavior, false);
3086 }
3087
3088 static int
3089 ts_chunk_delete_by_relid(Oid relid, DropBehavior behavior, bool preserve_chunk_catalog_row)
3090 {
3091 if (!OidIsValid(relid))
3092 return 0;
3093
3094 return ts_chunk_delete_by_name_internal(get_namespace_name(get_rel_namespace(relid)),
3095 get_rel_name(relid),
3096 behavior,
3097 preserve_chunk_catalog_row);
3098 }
3099
3100 static void
3101 init_scan_by_hypertable_id(ScanIterator *iterator, int32 hypertable_id)
3102 {
3103 iterator->ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_HYPERTABLE_ID_INDEX);
3104 ts_scan_iterator_scan_key_init(iterator,
3105 Anum_chunk_hypertable_id_idx_hypertable_id,
3106 BTEqualStrategyNumber,
3107 F_INT4EQ,
3108 Int32GetDatum(hypertable_id));
3109 }
3110
3111 int
3112 ts_chunk_delete_by_hypertable_id(int32 hypertable_id)
3113 {
3114 ScanIterator iterator = ts_scan_iterator_create(CHUNK, RowExclusiveLock, CurrentMemoryContext);
3115
3116 init_scan_by_hypertable_id(&iterator, hypertable_id);
3117
3118 return chunk_delete(&iterator, DROP_RESTRICT, false);
3119 }
3120
3121 bool
3122 ts_chunk_exists_with_compression(int32 hypertable_id)
3123 {
3124 ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
3125 bool found = false;
3126
3127 init_scan_by_hypertable_id(&iterator, hypertable_id);
3128 ts_scanner_foreach(&iterator)
3129 {
3130 bool isnull_dropped;
3131 bool isnull_chunk_id =
3132 slot_attisnull(ts_scan_iterator_slot(&iterator), Anum_chunk_compressed_chunk_id);
3133 bool dropped = DatumGetBool(
3134 slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_dropped, &isnull_dropped));
3135 /* dropped is not NULLABLE */
3136 Assert(!isnull_dropped);
3137
3138 if (!isnull_chunk_id && !dropped)
3139 {
3140 found = true;
3141 break;
3142 }
3143 }
3144 ts_scan_iterator_close(&iterator);
3145 return found;
3146 }
3147
3148 static void
3149 init_scan_by_compressed_chunk_id(ScanIterator *iterator, int32 compressed_chunk_id)
3150 {
3151 iterator->ctx.index =
3152 catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_COMPRESSED_CHUNK_ID_INDEX);
3153 ts_scan_iterator_scan_key_init(iterator,
3154 Anum_chunk_compressed_chunk_id_idx_compressed_chunk_id,
3155 BTEqualStrategyNumber,
3156 F_INT4EQ,
3157 Int32GetDatum(compressed_chunk_id));
3158 }
3159
3160 Chunk *
3161 ts_chunk_get_compressed_chunk_parent(const Chunk *chunk)
3162 {
3163 ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
3164 Oid parent_id = InvalidOid;
3165
3166 init_scan_by_compressed_chunk_id(&iterator, chunk->fd.id);
3167
3168 ts_scanner_foreach(&iterator)
3169 {
3170 TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
3171 Datum datum;
3172 bool isnull;
3173
3174 Assert(!OidIsValid(parent_id));
3175 datum = slot_getattr(ti->slot, Anum_chunk_id, &isnull);
3176
3177 if (!isnull)
3178 parent_id = DatumGetObjectId(datum);
3179 }
3180
3181 if (OidIsValid(parent_id))
3182 return ts_chunk_get_by_id(parent_id, true);
3183
3184 return NULL;
3185 }
3186
3187 bool
3188 ts_chunk_contains_compressed_data(const Chunk *chunk)
3189 {
3190 Chunk *parent_chunk = ts_chunk_get_compressed_chunk_parent(chunk);
3191
3192 return parent_chunk != NULL;
3193 }
3194
3195 List *
3196 ts_chunk_get_chunk_ids_by_hypertable_id(int32 hypertable_id)
3197 {
3198 List *chunkids = NIL;
3199 ScanIterator iterator = ts_scan_iterator_create(CHUNK, RowExclusiveLock, CurrentMemoryContext);
3200
3201 init_scan_by_hypertable_id(&iterator, hypertable_id);
3202 ts_scanner_foreach(&iterator)
3203 {
3204 bool isnull;
3205 Datum id = slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_id, &isnull);
3206 if (!isnull)
3207 chunkids = lappend_int(chunkids, DatumGetInt32(id));
3208 }
3209
3210 return chunkids;
3211 }
3212
3213 static ChunkResult
3214 chunk_recreate_constraint(ChunkScanCtx *ctx, ChunkStub *stub)
3215 {
3216 ChunkConstraints *ccs = stub->constraints;
3217 ChunkStubScanCtx stubctx = {
3218 .stub = stub,
3219 };
3220 Chunk *chunk;
3221 int i;
3222
3223 chunk = chunk_create_from_stub(&stubctx);
3224
3225 if (stubctx.is_dropped)
3226 elog(ERROR, "should not be recreating constraints on dropped chunks");
3227
3228 for (i = 0; i < ccs->num_constraints; i++)
3229 ts_chunk_constraint_recreate(&ccs->constraints[i], chunk->table_id);
3230
3231 return CHUNK_PROCESSED;
3232 }
3233
3234 void
3235 ts_chunk_recreate_all_constraints_for_dimension(Hyperspace *hs, int32 dimension_id)
3236 {
3237 DimensionVec *slices;
3238 ChunkScanCtx chunkctx;
3239 int i;
3240
3241 slices = ts_dimension_slice_scan_by_dimension(dimension_id, 0);
3242
3243 if (NULL == slices)
3244 return;
3245
3246 chunk_scan_ctx_init(&chunkctx, hs, NULL);
3247
3248 for (i = 0; i < slices->num_slices; i++)
3249 ts_chunk_constraint_scan_by_dimension_slice(slices->slices[i],
3250 &chunkctx,
3251 CurrentMemoryContext);
3252
3253 chunk_scan_ctx_foreach_chunk_stub(&chunkctx, chunk_recreate_constraint, 0);
3254 chunk_scan_ctx_destroy(&chunkctx);
3255 }
3256
3257 /*
3258 * Drops all FK constraints on a given chunk.
3259 * Currently it is used only for chunks, which have been compressed and
3260 * contain no data.
3261 */
3262 void
3263 ts_chunk_drop_fks(const Chunk *const chunk)
3264 {
3265 Relation rel;
3266 List *fks;
3267 ListCell *lc;
3268
3269 ASSERT_IS_VALID_CHUNK(chunk);
3270
3271 rel = table_open(chunk->table_id, AccessShareLock);
3272 fks = copyObject(RelationGetFKeyList(rel));
3273 table_close(rel, AccessShareLock);
3274
3275 foreach (lc, fks)
3276 {
3277 const ForeignKeyCacheInfo *const fk = lfirst_node(ForeignKeyCacheInfo, lc);
3278 ts_chunk_constraint_delete_by_constraint_name(chunk->fd.id,
3279 get_constraint_name(fk->conoid),
3280 true,
3281 true);
3282 }
3283 }
3284
3285 /*
3286 * Recreates all FK constraints on a chunk by using the constraints on the parent hypertable
3287 * as a template. Currently it is used only during chunk decompression, since FK constraints
3288 * are dropped during compression.
3289 */
3290 void
3291 ts_chunk_create_fks(const Chunk *const chunk)
3292 {
3293 Relation rel;
3294 List *fks;
3295 ListCell *lc;
3296
3297 ASSERT_IS_VALID_CHUNK(chunk);
3298
3299 rel = table_open(chunk->hypertable_relid, AccessShareLock);
3300 fks = copyObject(RelationGetFKeyList(rel));
3301 table_close(rel, AccessShareLock);
3302 foreach (lc, fks)
3303 {
3304 ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, lc);
3305 ts_chunk_constraint_create_on_chunk(chunk, fk->conoid);
3306 }
3307 }
3308
3309 static ScanTupleResult
3310 chunk_tuple_update_schema_and_table(TupleInfo *ti, void *data)
3311 {
3312 FormData_chunk form;
3313 FormData_chunk *update = data;
3314 CatalogSecurityContext sec_ctx;
3315 HeapTuple new_tuple;
3316
3317 chunk_formdata_fill(&form, ti);
3318
3319 namestrcpy(&form.schema_name, NameStr(update->schema_name));
3320 namestrcpy(&form.table_name, NameStr(update->table_name));
3321
3322 new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
3323
3324 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
3325 ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
3326 ts_catalog_restore_user(&sec_ctx);
3327 heap_freetuple(new_tuple);
3328 return SCAN_DONE;
3329 }
3330
3331 static ScanTupleResult
3332 chunk_tuple_update_status(TupleInfo *ti, void *data)
3333 {
3334 FormData_chunk form;
3335 FormData_chunk *update = data;
3336 CatalogSecurityContext sec_ctx;
3337 HeapTuple new_tuple;
3338
3339 chunk_formdata_fill(&form, ti);
3340 form.status = update->status;
3341 new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
3342
3343 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
3344 ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
3345 ts_catalog_restore_user(&sec_ctx);
3346 heap_freetuple(new_tuple);
3347 return SCAN_DONE;
3348 }
3349
3350 static bool
3351 chunk_update_form(FormData_chunk *form)
3352 {
3353 ScanKeyData scankey[1];
3354
3355 ScanKeyInit(&scankey[0], Anum_chunk_idx_id, BTEqualStrategyNumber, F_INT4EQ, form->id);
3356
3357 return chunk_scan_internal(CHUNK_ID_INDEX,
3358 scankey,
3359 1,
3360 NULL,
3361 chunk_tuple_update_schema_and_table,
3362 form,
3363 0,
3364 ForwardScanDirection,
3365 RowExclusiveLock,
3366 CurrentMemoryContext) > 0;
3367 }
3368
3369 /* update the status flag for chunk. Should not be called directly
3370 * Use chunk_update_status instead
3371 */
3372 static bool
chunk_update_status_internal(FormData_chunk * form)3373 chunk_update_status_internal(FormData_chunk *form)
3374 {
3375 ScanKeyData scankey[1];
3376
3377 ScanKeyInit(&scankey[0], Anum_chunk_idx_id, BTEqualStrategyNumber, F_INT4EQ, form->id);
3378
3379 return chunk_scan_internal(CHUNK_ID_INDEX,
3380 scankey,
3381 1,
3382 NULL,
3383 chunk_tuple_update_status,
3384 form,
3385 0,
3386 ForwardScanDirection,
3387 RowExclusiveLock,
3388 CurrentMemoryContext) > 0;
3389 }
3390
3391 /* status update is done in 2 steps.
3392 * Do the equivalent of SELECT for UPDATE, followed by UPDATE
3393 * 1. RowShare lock to read the status.
3394 * 2. if status != proposed new status
3395 * update status using RowExclusiveLock
3396 * All callers who want to update chunk status should call this function so that locks
3397 * are acquired correctly.
3398 */
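/*
 * Roughly the SQL equivalent of the two steps above (illustration only,
 * against the usual catalog table _timescaledb_catalog.chunk):
 *
 *   SELECT status FROM _timescaledb_catalog.chunk WHERE id = :id FOR UPDATE;
 *   UPDATE _timescaledb_catalog.chunk SET status = :new_status
 *     WHERE id = :id AND status <> :new_status;
 */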
static bool
chunk_update_status(FormData_chunk *form)
{
	int32 chunk_id = form->id;
	int32 new_status = form->status;
	bool success = true, dropped = false;
	/* Lock the chunk tuple for update. Block until we get an exclusive tuple lock. */
	ScanTupLock scantuplock = {
		.waitpolicy = LockWaitBlock,
		.lockmode = LockTupleExclusive,
	};
	ScanIterator iterator = ts_scan_iterator_create(CHUNK, RowShareLock, CurrentMemoryContext);
	iterator.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
	iterator.ctx.tuplock = &scantuplock;

	/* See table_tuple_lock for details about flags that are set in TupleExclusive mode */
	scantuplock.lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
	if (!IsolationUsesXactSnapshot())
	{
		/* In READ COMMITTED mode, we follow all updates to this tuple */
		scantuplock.lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
	}

	ts_scan_iterator_scan_key_init(&iterator,
								   Anum_chunk_idx_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(chunk_id));

	ts_scanner_foreach(&iterator)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
		bool dropped_isnull, status_isnull;
		int status;

		dropped = DatumGetBool(slot_getattr(ti->slot, Anum_chunk_dropped, &dropped_isnull));
		Assert(!dropped_isnull);
		status = DatumGetInt32(slot_getattr(ti->slot, Anum_chunk_status, &status_isnull));
		Assert(!status_isnull);
		if (!dropped && status != new_status)
		{
			/* Take a RowExclusiveLock and perform the update */
			success = chunk_update_status_internal(form);
		}
	}
	ts_scan_iterator_close(&iterator);
	if (dropped)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("attempt to update status (%d) on dropped chunk %d",
						new_status,
						chunk_id)));
	return success;
}

bool
ts_chunk_set_name(Chunk *chunk, const char *newname)
{
	namestrcpy(&chunk->fd.table_name, newname);

	return chunk_update_form(&chunk->fd);
}

bool
ts_chunk_set_schema(Chunk *chunk, const char *newschema)
{
	namestrcpy(&chunk->fd.schema_name, newschema);

	return chunk_update_form(&chunk->fd);
}

bool
ts_chunk_set_unordered(Chunk *chunk)
{
	return ts_chunk_add_status(chunk, CHUNK_STATUS_COMPRESSED_UNORDERED);
}

bool
ts_chunk_add_status(Chunk *chunk, int32 status)
{
	return ts_chunk_set_status(chunk, ts_set_flags_32(chunk->fd.status, status));
}

bool
ts_chunk_set_status(Chunk *chunk, int32 status)
{
	chunk->fd.status = status;

	return chunk_update_status(&chunk->fd);
}

/*
 * Setting (compressed_chunk_id = INVALID_CHUNK_ID, is_compressed = true) is
 * valid on an access node: the data nodes hold the actual compressed chunks,
 * while the meta-chunk on the access node is merely marked as compressed.
 * Setting is_compressed = false marks the chunk as uncompressed.
 */
static ScanTupleResult
chunk_change_compressed_status_in_tuple(TupleInfo *ti, int32 compressed_chunk_id,
										bool is_compressed)
{
	FormData_chunk form;
	HeapTuple new_tuple;
	CatalogSecurityContext sec_ctx;

	chunk_formdata_fill(&form, ti);
	if (is_compressed)
	{
		form.compressed_chunk_id = compressed_chunk_id;
		form.status = ts_set_flags_32(form.status, CHUNK_STATUS_COMPRESSED);
	}
	else
	{
		form.compressed_chunk_id = INVALID_CHUNK_ID;
		form.status =
			ts_clear_flags_32(form.status,
							  CHUNK_STATUS_COMPRESSED | CHUNK_STATUS_COMPRESSED_UNORDERED);
	}
	new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));

	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
	ts_catalog_restore_user(&sec_ctx);
	heap_freetuple(new_tuple);

	return SCAN_DONE;
}

static ScanTupleResult
chunk_clear_compressed_status_in_tuple(TupleInfo *ti, void *data)
{
	return chunk_change_compressed_status_in_tuple(ti, INVALID_CHUNK_ID, false);
}

static ScanTupleResult
chunk_set_compressed_id_in_tuple(TupleInfo *ti, void *data)
{
	int32 compressed_chunk_id = *((int32 *) data);

	return chunk_change_compressed_status_in_tuple(ti, compressed_chunk_id, true);
}

/* Assumes permissions have already been checked */
bool
ts_chunk_set_compressed_chunk(Chunk *chunk, int32 compressed_chunk_id)
{
	ScanKeyData scankey[1];
	ScanKeyInit(&scankey[0],
				Anum_chunk_idx_id,
				BTEqualStrategyNumber,
				F_INT4EQ,
				Int32GetDatum(chunk->fd.id));
	return chunk_scan_internal(CHUNK_ID_INDEX,
							   scankey,
							   1,
							   chunk_check_ignorearg_dropped_filter,
							   chunk_set_compressed_id_in_tuple,
							   &compressed_chunk_id,
							   0,
							   ForwardScanDirection,
							   RowExclusiveLock,
							   CurrentMemoryContext) > 0;
}

/* Assumes permissions have already been checked */
bool
ts_chunk_clear_compressed_chunk(Chunk *chunk)
{
	int32 compressed_chunk_id = INVALID_CHUNK_ID;
	ScanKeyData scankey[1];
	ScanKeyInit(&scankey[0],
				Anum_chunk_idx_id,
				BTEqualStrategyNumber,
				F_INT4EQ,
				Int32GetDatum(chunk->fd.id));
	return chunk_scan_internal(CHUNK_ID_INDEX,
							   scankey,
							   1,
							   chunk_check_ignorearg_dropped_filter,
							   chunk_clear_compressed_status_in_tuple,
							   &compressed_chunk_id,
							   0,
							   ForwardScanDirection,
							   RowExclusiveLock,
							   CurrentMemoryContext) > 0;
}
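
/*
 * Hypothetical compression flow pairing the two functions above (the real
 * call sites live in the compression module):
 *
 *   ts_chunk_set_compressed_chunk(chunk, compressed_chunk->fd.id);
 *   ...
 *   ts_chunk_clear_compressed_chunk(chunk);
 *
 * The first associates the chunk with its compressed counterpart and sets
 * the COMPRESSED status flag; the second severs the association and clears
 * both the COMPRESSED and COMPRESSED_UNORDERED flags.
 */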

/* Used as a tuple_found function when scanning for chunks to rename */
static ScanTupleResult
chunk_rename_schema_name(TupleInfo *ti, void *data)
{
	FormData_chunk form;
	HeapTuple new_tuple;
	CatalogSecurityContext sec_ctx;

	chunk_formdata_fill(&form, ti);
	/* Rename the schema name */
	namestrcpy(&form.schema_name, (char *) data);
	new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));

	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
	ts_catalog_restore_user(&sec_ctx);
	heap_freetuple(new_tuple);
	return SCAN_CONTINUE;
}

/* Go through the internal chunk table and rename all matching schemas */
void
ts_chunks_rename_schema_name(char *old_schema, char *new_schema)
{
	NameData old_schema_name;
	ScanKeyData scankey[1];
	Catalog *catalog = ts_catalog_get();
	ScannerCtx scanctx = {
		.table = catalog_get_table_id(catalog, CHUNK),
		.index = catalog_get_index(catalog, CHUNK, CHUNK_SCHEMA_NAME_INDEX),
		.nkeys = 1,
		.scankey = scankey,
		.tuple_found = chunk_rename_schema_name,
		.data = new_schema,
		.lockmode = RowExclusiveLock,
		.scandirection = ForwardScanDirection,
	};

	namestrcpy(&old_schema_name, old_schema);

	ScanKeyInit(&scankey[0],
				Anum_chunk_schema_name_idx_schema_name,
				BTEqualStrategyNumber,
				F_NAMEEQ,
				NameGetDatum(&old_schema_name));

	ts_scanner_scan(&scanctx);
}

static int
chunk_cmp(const void *ch1, const void *ch2)
{
	const Chunk *v1 = ((const Chunk *) ch1);
	const Chunk *v2 = ((const Chunk *) ch2);

	if (v1->fd.hypertable_id < v2->fd.hypertable_id)
		return -1;
	if (v1->fd.hypertable_id > v2->fd.hypertable_id)
		return 1;
	if (v1->table_id < v2->table_id)
		return -1;
	if (v1->table_id > v2->table_id)
		return 1;
	return 0;
}
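
/*
 * chunk_cmp is a qsort(3)-style comparator; a hypothetical call site would
 * sort a chunk array first by hypertable ID and then by table OID:
 *
 *   qsort(chunks, num_chunks, sizeof(Chunk), chunk_cmp);
 */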

/*
 * This is a helper set-returning function (SRF) that takes a set-returning
 * function context as argument and returns OIDs extracted from
 * funcctx->user_fctx (which is a Chunk array). Note that the caller needs to
 * be registered as a set-returning function for this to work.
 */
static Datum
chunks_return_srf(FunctionCallInfo fcinfo)
{
	FuncCallContext *funcctx;
	uint64 call_cntr;
	TupleDesc tupdesc;
	Chunk *result_set;

	/* stuff done only on the first call of the function */
	if (SRF_IS_FIRSTCALL())
	{
		/* Build a tuple descriptor for our result type (not strictly
		 * necessary for a scalar result, but it validates the call context) */
		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_SCALAR)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("function returning record called in context "
							"that cannot accept type record")));
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();

	call_cntr = funcctx->call_cntr;
	result_set = (Chunk *) funcctx->user_fctx;

	/* do when there is more left to send */
	if (call_cntr < funcctx->max_calls)
		SRF_RETURN_NEXT(funcctx, result_set[call_cntr].table_id);
	else /* do when there is no more left */
		SRF_RETURN_DONE(funcctx);
}
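
/*
 * Sketch of how a registered SRF caller is expected to drive
 * chunks_return_srf (local names are illustrative):
 *
 *   if (SRF_IS_FIRSTCALL())
 *   {
 *       FuncCallContext *funcctx = SRF_FIRSTCALL_INIT();
 *       funcctx->user_fctx = chunks;      (a Chunk array allocated in
 *                                          funcctx->multi_call_memory_ctx)
 *       funcctx->max_calls = num_chunks;
 *   }
 *   return chunks_return_srf(fcinfo);
 */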

static void
ts_chunk_drop_internal(const Chunk *chunk, DropBehavior behavior, int32 log_level,
					   bool preserve_catalog_row)
{
	ObjectAddress objaddr = {
		.classId = RelationRelationId,
		.objectId = chunk->table_id,
	};

	if (log_level >= 0)
		elog(log_level,
			 "dropping chunk %s.%s",
			 chunk->fd.schema_name.data,
			 chunk->fd.table_name.data);

	/* Remove the chunk from the chunk catalog table */
	ts_chunk_delete_by_relid(chunk->table_id, behavior, preserve_catalog_row);

	/* Drop the table */
	performDeletion(&objaddr, behavior, 0);
}

void
ts_chunk_drop(const Chunk *chunk, DropBehavior behavior, int32 log_level)
{
	ts_chunk_drop_internal(chunk, behavior, log_level, false);
}

void
ts_chunk_drop_preserve_catalog_row(const Chunk *chunk, DropBehavior behavior, int32 log_level)
{
	ts_chunk_drop_internal(chunk, behavior, log_level, true);
}

static void
lock_referenced_tables(Oid table_relid)
{
	List *fk_relids = NIL;
	ListCell *lf;
	List *cachedfkeys = NIL;
	Relation table_rel = table_open(table_relid, AccessShareLock);

	/* This list is from the relcache and can disappear with a cache flush,
	 * so no further catalog access until we have saved the FK relids */
	cachedfkeys = RelationGetFKeyList(table_rel);
	foreach (lf, cachedfkeys)
	{
		ForeignKeyCacheInfo *cachedfk = lfirst_node(ForeignKeyCacheInfo, lf);

		/* conrelid should always be that of the table we're considering */
		Assert(cachedfk->conrelid == RelationGetRelid(table_rel));
		fk_relids = lappend_oid(fk_relids, cachedfk->confrelid);
	}
	table_close(table_rel, AccessShareLock);
	foreach (lf, fk_relids)
		LockRelationOid(lfirst_oid(lf), AccessExclusiveLock);
}

List *
ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int32 log_level,
						List **affected_data_nodes)
{
	uint64 i = 0;
	uint64 num_chunks = 0;
	Chunk *chunks;
	List *dropped_chunk_names = NIL;
	const char *schema_name, *table_name;
	const int32 hypertable_id = ht->fd.id;
	bool has_continuous_aggs;
	List *data_nodes = NIL;
	const MemoryContext oldcontext = CurrentMemoryContext;
	ScanTupLock tuplock = {
		.waitpolicy = LockWaitBlock,
		.lockmode = LockTupleExclusive,
	};

	ts_hypertable_permissions_check(ht->main_table_relid, GetUserId());

	/* We have an FK between the hypertable H and a referenced table PAR, and
	 * H has a number of chunks C1, C2, etc. When we execute "DROP TABLE C",
	 * PG acquires locks on C and PAR. A query such as "SELECT * FROM
	 * hypertable" acquires locks on C and PAR as well, but not necessarily
	 * in the same order, which can result in deadlocks (see GitHub issue
	 * #865). We hope to alleviate the problem by acquiring a lock on PAR
	 * before executing the DROP TABLE statement. This is not fool-proof
	 * since there could be multiple FK relids and the order of lock
	 * acquisition for these could differ as well. Do not unlock; let the
	 * transaction semantics take care of it. */
	lock_referenced_tables(ht->main_table_relid);

	switch (ts_continuous_agg_hypertable_status(hypertable_id))
	{
		case HypertableIsMaterialization:
			has_continuous_aggs = false;
			break;
		case HypertableIsMaterializationAndRaw:
			has_continuous_aggs = true;
			break;
		case HypertableIsRawTable:
			has_continuous_aggs = true;
			break;
		default:
			has_continuous_aggs = false;
			break;
	}

	PG_TRY();
	{
		chunks = get_chunks_in_time_range(ht,
										  older_than,
										  newer_than,
										  DROP_CHUNKS_FUNCNAME,
										  CurrentMemoryContext,
										  &num_chunks,
										  &tuplock);
	}
	PG_CATCH();
	{
		ErrorData *edata;
		MemoryContextSwitchTo(oldcontext);
		edata = CopyErrorData();
		if (edata->sqlerrcode == ERRCODE_LOCK_NOT_AVAILABLE)
		{
			FlushErrorState();
			edata->detail = edata->message;
			edata->message =
				psprintf("some chunks could not be read since they are being concurrently updated");
		}
		ReThrowError(edata);
	}
	PG_END_TRY();

	DEBUG_WAITPOINT("drop_chunks_chunks_found");

	if (has_continuous_aggs)
	{
		/* Exclusively lock all chunks, and invalidate the continuous
		 * aggregates in the regions covered by the chunks. We do this in two
		 * steps: first lock all the chunks and then invalidate the
		 * regions. Since we are going to drop the chunks, there is no point
		 * in allowing inserts into them.
		 *
		 * Locking prevents further modification of the dropped region during
		 * this transaction, which allows moving the invalidation threshold
		 * without having to worry about new invalidations while
		 * refreshing. */
		for (i = 0; i < num_chunks; i++)
		{
			LockRelationOid(chunks[i].table_id, ExclusiveLock);

			Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id ==
				   chunks[i].cube->slices[0]->fd.dimension_id);
		}

		DEBUG_WAITPOINT("drop_chunks_locked");

		/* Invalidate the dropped region to indicate that it was modified.
		 *
		 * The invalidation will allow the refresh command on a continuous
		 * aggregate to see that this region was dropped and will therefore
		 * be able to refresh accordingly. */
		for (i = 0; i < num_chunks; i++)
		{
			int64 start = ts_chunk_primary_dimension_start(&chunks[i]);
			int64 end = ts_chunk_primary_dimension_end(&chunks[i]);

			ts_cm_functions->continuous_agg_invalidate(ht,
													   HypertableIsRawTable,
													   ht->fd.id,
													   start,
													   end);
		}
	}

	for (i = 0; i < num_chunks; i++)
	{
		char *chunk_name;
		ListCell *lc;

		ASSERT_IS_VALID_CHUNK(&chunks[i]);

		/* Store the chunk name for output */
		schema_name = quote_identifier(chunks[i].fd.schema_name.data);
		table_name = quote_identifier(chunks[i].fd.table_name.data);
		chunk_name = psprintf("%s.%s", schema_name, table_name);
		dropped_chunk_names = lappend(dropped_chunk_names, chunk_name);

		if (has_continuous_aggs)
			ts_chunk_drop_preserve_catalog_row(chunks + i, DROP_RESTRICT, log_level);
		else
			ts_chunk_drop(chunks + i, DROP_RESTRICT, log_level);

		/* Collect a list of affected data nodes so that we know which data
		 * nodes we need to drop chunks on */
		foreach (lc, chunks[i].data_nodes)
		{
			ChunkDataNode *cdn = lfirst(lc);
			data_nodes = list_append_unique_oid(data_nodes, cdn->foreign_server_oid);
		}
	}

	if (affected_data_nodes)
		*affected_data_nodes = data_nodes;

	DEBUG_WAITPOINT("drop_chunks_end");

	return dropped_chunk_names;
}

/*
 * This is a helper set-returning function (SRF) that takes a set-returning
 * function context as argument and returns cstrings extracted from
 * funcctx->user_fctx (which is a List). Note that the caller needs to be
 * registered as a set-returning function for this to work.
 */
static Datum
list_return_srf(FunctionCallInfo fcinfo)
{
	FuncCallContext *funcctx;
	uint64 call_cntr;
	TupleDesc tupdesc;
	List *result_set;
	Datum retval;

	/* stuff done only on the first call of the function */
	if (SRF_IS_FIRSTCALL())
	{
		/* Build a tuple descriptor for our result type (not strictly
		 * necessary for a scalar result, but it validates the call context) */
		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_SCALAR)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("function returning record called in context "
							"that cannot accept type record")));
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();

	call_cntr = funcctx->call_cntr;
	result_set = castNode(List, funcctx->user_fctx);

	/* do when there is more left to send */
	if (call_cntr < funcctx->max_calls)
	{
		/* Store the return value and remove the head of the list */
		retval = CStringGetTextDatum(linitial(result_set));
		funcctx->user_fctx = list_delete_first(result_set);
		SRF_RETURN_NEXT(funcctx, retval);
	}
	else /* do when there is no more left */
		SRF_RETURN_DONE(funcctx);
}

/*
 * Find the hypertable for the given relid, or, if the relid refers to a
 * continuous aggregate, its materialized hypertable.
 *
 * Errors if the relid is neither a hypertable nor a continuous aggregate (or
 * cannot be found), or if it refers directly to a materialized hypertable.
 */
static Hypertable *
find_hypertable_from_table_or_cagg(Cache *hcache, Oid relid)
{
	const char *rel_name;
	Hypertable *ht;

	rel_name = get_rel_name(relid);

	if (!rel_name)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("invalid hypertable or continuous aggregate")));

	ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);

	if (ht)
	{
		const ContinuousAggHypertableStatus status = ts_continuous_agg_hypertable_status(ht->fd.id);
		switch (status)
		{
			case HypertableIsMaterialization:
			case HypertableIsMaterializationAndRaw:
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("operation not supported on materialized hypertable"),
						 errhint("Try the operation on the continuous aggregate instead."),
						 errdetail("Hypertable \"%s\" is a materialized hypertable.", rel_name)));
				break;

			default:
				break;
		}
	}
	else
	{
		ContinuousAgg *const cagg = ts_continuous_agg_find_by_relid(relid);

		if (!cagg)
			ereport(ERROR,
					(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
					 errmsg("\"%s\" is not a hypertable or a continuous aggregate", rel_name),
					 errhint("The operation is only possible on a hypertable or continuous"
							 " aggregate.")));

		ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);

		if (!ht)
			ereport(ERROR,
					(errcode(ERRCODE_TS_INTERNAL_ERROR),
					 errmsg("no materialized table for continuous aggregate"),
					 errdetail("Continuous aggregate \"%s\" had a materialized hypertable"
							   " with id %d but it was not found in the hypertable "
							   "catalog.",
							   rel_name,
							   cagg->data.mat_hypertable_id)));
	}

	return ht;
}

Datum
ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
{
	MemoryContext oldcontext;
	FuncCallContext *funcctx;
	Hypertable *ht;
	List *dc_temp = NIL;
	List *dc_names = NIL;
	Oid relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
	int64 older_than = PG_INT64_MAX;
	int64 newer_than = PG_INT64_MIN;
	bool verbose;
	int elevel;
	List *data_node_oids = NIL;
	Cache *hcache;
	const Dimension *time_dim;
	Oid time_type;

	TS_PREVENT_FUNC_IF_READ_ONLY();

	/*
	 * When past the first call of the SRF, dropping has already been
	 * completed, so we just return the next chunk in the list of dropped
	 * chunks.
	 */
	if (!SRF_IS_FIRSTCALL())
		return list_return_srf(fcinfo);

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid hypertable or continuous aggregate"),
				 errhint("Specify a hypertable or continuous aggregate.")));

	if (PG_ARGISNULL(1) && PG_ARGISNULL(2))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid time range for dropping chunks"),
				 errhint("At least one of older_than and newer_than must be provided.")));

	/* Find either the hypertable or the continuous aggregate's materialized
	 * hypertable, or error out if the relid is neither.
	 *
	 * We should improve the printout since the relid can either be a proper
	 * relid that does not refer to a hypertable or a continuous aggregate,
	 * or a relid that does not refer to anything at all. */
	hcache = ts_hypertable_cache_pin();
	ht = find_hypertable_from_table_or_cagg(hcache, relid);
	Assert(ht != NULL);
	time_dim = hyperspace_get_open_dimension(ht->space, 0);
	Assert(time_dim != NULL);
	time_type = ts_dimension_get_partition_type(time_dim);

	if (!PG_ARGISNULL(1))
		older_than = ts_time_value_from_arg(PG_GETARG_DATUM(1),
											get_fn_expr_argtype(fcinfo->flinfo, 1),
											time_type);

	if (!PG_ARGISNULL(2))
		newer_than = ts_time_value_from_arg(PG_GETARG_DATUM(2),
											get_fn_expr_argtype(fcinfo->flinfo, 2),
											time_type);

	verbose = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
	elevel = verbose ? INFO : DEBUG2;

	/* Initial multi-function-call setup */
	funcctx = SRF_FIRSTCALL_INIT();

	/* Drop the chunks and store their names for the return set */
	oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

	PG_TRY();
	{
		dc_temp = ts_chunk_do_drop_chunks(ht, older_than, newer_than, elevel, &data_node_oids);
	}
	PG_CATCH();
	{
		/* An error is raised if there are dependent objects, but the original
		 * message is not very helpful in suggesting that you should use
		 * CASCADE (we don't support it), so we replace the hint with a more
		 * accurate hint for our situation. */
		ErrorData *edata;

		MemoryContextSwitchTo(oldcontext);
		edata = CopyErrorData();
		FlushErrorState();

		if (edata->sqlerrcode == ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST)
			edata->hint = pstrdup("Use DROP ... to drop the dependent objects.");

		ts_cache_release(hcache);
		ReThrowError(edata);
	}
	PG_END_TRY();
	ts_cache_release(hcache);
	dc_names = list_concat(dc_names, dc_temp);

	MemoryContextSwitchTo(oldcontext);

	if (data_node_oids != NIL)
		ts_cm_functions->func_call_on_data_nodes(fcinfo, data_node_oids);

	/* Store data for the multi-function call */
	funcctx->max_calls = list_length(dc_names);
	funcctx->user_fctx = dc_names;

	return list_return_srf(fcinfo);
}
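
/*
 * SQL-level usage (illustration only; "conditions" is a hypothetical
 * hypertable):
 *
 *   SELECT drop_chunks('conditions', older_than => now() - interval '90 days');
 *
 * The function runs as an SRF, so each row of the result is the name of one
 * dropped chunk.
 */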

/*
 * This function is used to explicitly specify chunks that are being scanned.
 * It is processed in the planning phase and removed from the query tree.
 * This means that the actual function implementation will only be executed
 * if something went wrong during explicit chunk exclusion.
 */
Datum
ts_chunks_in(PG_FUNCTION_ARGS)
{
	const char *funcname = get_func_name(FC_FN_OID(fcinfo));

	ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("illegal invocation of %s function", funcname),
			 errhint("The %s function must appear in the WHERE clause and can only"
					 " be combined with AND operator.",
					 funcname)));
	pg_unreachable();
}
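
/*
 * Roughly the intended (planner-handled) usage, assuming the internal schema
 * name _timescaledb_internal and illustrative chunk IDs:
 *
 *   SELECT * FROM hypertable
 *   WHERE _timescaledb_internal.chunks_in(hypertable, ARRAY[1, 2]);
 *
 * The planner replaces the call with explicit chunk exclusion, so the C
 * function body above is only reached if that rewrite did not happen.
 */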

/* Return the compression status for the chunk */
ChunkCompressionStatus
ts_chunk_get_compression_status(int32 chunk_id)
{
	ChunkCompressionStatus st = CHUNK_COMPRESS_NONE;
	ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
	iterator.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
	ts_scan_iterator_scan_key_init(&iterator,
								   Anum_chunk_idx_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(chunk_id));

	ts_scanner_foreach(&iterator)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
		bool dropped_isnull, status_isnull;
		Datum status;

		bool dropped = DatumGetBool(slot_getattr(ti->slot, Anum_chunk_dropped, &dropped_isnull));
		Assert(!dropped_isnull);

		status = slot_getattr(ti->slot, Anum_chunk_status, &status_isnull);
		Assert(!status_isnull);
		/* Note that the dropped attribute takes precedence over everything
		 * else: we should not check the status attribute for dropped chunks.
		 */
		if (!dropped)
		{
			bool status_is_compressed =
				ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED);
			bool status_is_unordered =
				ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED_UNORDERED);
			if (status_is_compressed)
			{
				if (status_is_unordered)
					st = CHUNK_COMPRESS_UNORDERED;
				else
					st = CHUNK_COMPRESS_ORDERED;
			}
			else
			{
				Assert(!status_is_unordered);
				st = CHUNK_COMPRESS_NONE;
			}
		}
		else
			st = CHUNK_DROPPED;
	}
	ts_scan_iterator_close(&iterator);
	return st;
}
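
/*
 * Summary of how the catalog flags above map to the returned status:
 *
 *   dropped = true                      => CHUNK_DROPPED
 *   COMPRESSED | COMPRESSED_UNORDERED   => CHUNK_COMPRESS_UNORDERED
 *   COMPRESSED only                     => CHUNK_COMPRESS_ORDERED
 *   neither flag set                    => CHUNK_COMPRESS_NONE
 */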

/* Note that only a compressed chunk can have the unordered flag set */
bool
ts_chunk_is_unordered(const Chunk *chunk)
{
	return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED_UNORDERED);
}

bool
ts_chunk_is_compressed(const Chunk *chunk)
{
	return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED);
}

bool
ts_chunk_is_uncompressed_or_unordered(const Chunk *chunk)
{
	return (ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED_UNORDERED) ||
			!ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED));
}

Datum
ts_chunk_show(PG_FUNCTION_ARGS)
{
	return ts_cm_functions->show_chunk(fcinfo);
}

Datum
ts_chunk_create(PG_FUNCTION_ARGS)
{
	return ts_cm_functions->create_chunk(fcinfo);
}