1 /*
2 * This file and its contents are licensed under the Apache License 2.0.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-APACHE for a copy of the license.
5 */
6 #include "scanner.h"
7 #include <postgres.h>
8 #include <access/heapam.h>
9 #include <access/xact.h>
10 #include <catalog/dependency.h>
11 #include <catalog/indexing.h>
12 #include <catalog/objectaddress.h>
13 #include <catalog/pg_constraint.h>
14 #include <commands/tablecmds.h>
15 #include <funcapi.h>
16 #include <nodes/makefuncs.h>
17 #include <utils/builtins.h>
18 #include <utils/hsearch.h>
19 #include <utils/lsyscache.h>
20 #include <utils/relcache.h>
21 #include <utils/rel.h>
22 #include <utils/syscache.h>
23
24 #include "compat/compat.h"
25 #include "export.h"
26 #include "scan_iterator.h"
27 #include "chunk_constraint.h"
28 #include "chunk_index.h"
29 #include "constraint.h"
30 #include "dimension_vector.h"
31 #include "dimension_slice.h"
32 #include "hypercube.h"
33 #include "chunk.h"
34 #include "hypertable.h"
35 #include "errors.h"
36 #include "process_utility.h"
37
/* Number of spare slots added on top of the caller's size hint when a
 * constraint set is first allocated. */
#define DEFAULT_EXTRA_CONSTRAINTS_SIZE 4

/* Size in bytes of an array of num_constraints ChunkConstraint entries. */
#define CHUNK_CONSTRAINTS_SIZE(num_constraints) (sizeof(ChunkConstraint) * (num_constraints))
41
42 ChunkConstraints *
ts_chunk_constraints_alloc(int size_hint,MemoryContext mctx)43 ts_chunk_constraints_alloc(int size_hint, MemoryContext mctx)
44 {
45 ChunkConstraints *ccs = MemoryContextAlloc(mctx, sizeof(ChunkConstraints));
46
47 ccs->mctx = mctx;
48 ccs->capacity = size_hint + DEFAULT_EXTRA_CONSTRAINTS_SIZE;
49 ccs->num_constraints = 0;
50 ccs->num_dimension_constraints = 0;
51 ccs->constraints = MemoryContextAllocZero(mctx, CHUNK_CONSTRAINTS_SIZE(ccs->capacity));
52
53 return ccs;
54 }
55
56 ChunkConstraints *
ts_chunk_constraints_copy(ChunkConstraints * ccs)57 ts_chunk_constraints_copy(ChunkConstraints *ccs)
58 {
59 ChunkConstraints *copy = palloc(sizeof(ChunkConstraints));
60
61 memcpy(copy, ccs, sizeof(ChunkConstraints));
62 copy->constraints = palloc0(CHUNK_CONSTRAINTS_SIZE(ccs->capacity));
63 memcpy(copy->constraints, ccs->constraints, CHUNK_CONSTRAINTS_SIZE(ccs->num_constraints));
64
65 return copy;
66 }
67
68 static void
chunk_constraints_expand(ChunkConstraints * ccs,int16 new_capacity)69 chunk_constraints_expand(ChunkConstraints *ccs, int16 new_capacity)
70 {
71 MemoryContext old;
72
73 if (new_capacity <= ccs->capacity)
74 return;
75
76 old = MemoryContextSwitchTo(ccs->mctx);
77 ccs->capacity = new_capacity;
78 Assert(ccs->constraints); /* repalloc() does not work with NULL argument */
79 ccs->constraints = repalloc(ccs->constraints, CHUNK_CONSTRAINTS_SIZE(new_capacity));
80 MemoryContextSwitchTo(old);
81 }
82
83 static void
chunk_constraint_dimension_choose_name(Name dst,int32 dimension_slice_id)84 chunk_constraint_dimension_choose_name(Name dst, int32 dimension_slice_id)
85 {
86 snprintf(NameStr(*dst), NAMEDATALEN, "constraint_%d", dimension_slice_id);
87 }
88
89 static void
chunk_constraint_choose_name(Name dst,const char * hypertable_constraint_name,int32 chunk_id)90 chunk_constraint_choose_name(Name dst, const char *hypertable_constraint_name, int32 chunk_id)
91 {
92 char constrname[NAMEDATALEN];
93 CatalogSecurityContext sec_ctx;
94
95 Assert(hypertable_constraint_name != NULL);
96
97 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
98 snprintf(constrname,
99 NAMEDATALEN,
100 "%d_" INT64_FORMAT "_%s",
101 chunk_id,
102 ts_catalog_table_next_seq_id(ts_catalog_get(), CHUNK_CONSTRAINT),
103 hypertable_constraint_name);
104 ts_catalog_restore_user(&sec_ctx);
105
106 namestrcpy(dst, constrname);
107 }
108
109 static ChunkConstraint *
chunk_constraints_add(ChunkConstraints * ccs,int32 chunk_id,int32 dimension_slice_id,const char * constraint_name,const char * hypertable_constraint_name)110 chunk_constraints_add(ChunkConstraints *ccs, int32 chunk_id, int32 dimension_slice_id,
111 const char *constraint_name, const char *hypertable_constraint_name)
112 {
113 ChunkConstraint *cc;
114
115 chunk_constraints_expand(ccs, ccs->num_constraints + 1);
116 cc = &ccs->constraints[ccs->num_constraints++];
117 cc->fd.chunk_id = chunk_id;
118 cc->fd.dimension_slice_id = dimension_slice_id;
119
120 if (NULL == constraint_name)
121 {
122 if (is_dimension_constraint(cc))
123 {
124 chunk_constraint_dimension_choose_name(&cc->fd.constraint_name,
125 cc->fd.dimension_slice_id);
126 namestrcpy(&cc->fd.hypertable_constraint_name, "");
127 }
128 else
129 chunk_constraint_choose_name(&cc->fd.constraint_name,
130 hypertable_constraint_name,
131 cc->fd.chunk_id);
132 }
133 else
134 namestrcpy(&cc->fd.constraint_name, constraint_name);
135
136 if (NULL != hypertable_constraint_name)
137 namestrcpy(&cc->fd.hypertable_constraint_name, hypertable_constraint_name);
138
139 if (is_dimension_constraint(cc))
140 ccs->num_dimension_constraints++;
141
142 return cc;
143 }
144
145 static void
chunk_constraint_fill_tuple_values(const ChunkConstraint * cc,Datum values[Natts_chunk_constraint],bool nulls[Natts_chunk_constraint])146 chunk_constraint_fill_tuple_values(const ChunkConstraint *cc, Datum values[Natts_chunk_constraint],
147 bool nulls[Natts_chunk_constraint])
148 {
149 memset(values, 0, sizeof(Datum) * Natts_chunk_constraint);
150 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_chunk_id)] =
151 Int32GetDatum(cc->fd.chunk_id);
152 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)] =
153 Int32GetDatum(cc->fd.dimension_slice_id);
154 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] =
155 NameGetDatum(&cc->fd.constraint_name);
156 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] =
157 NameGetDatum(&cc->fd.hypertable_constraint_name);
158
159 if (is_dimension_constraint(cc))
160 nulls[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] = true;
161 else
162 nulls[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)] = true;
163 }
164
165 static void
chunk_constraint_insert_relation(const Relation rel,const ChunkConstraint * cc)166 chunk_constraint_insert_relation(const Relation rel, const ChunkConstraint *cc)
167 {
168 TupleDesc desc = RelationGetDescr(rel);
169 Datum values[Natts_chunk_constraint];
170 bool nulls[Natts_chunk_constraint] = { false };
171
172 chunk_constraint_fill_tuple_values(cc, values, nulls);
173 ts_catalog_insert_values(rel, desc, values, nulls);
174 }
175
176 /*
177 * Insert multiple chunk constraints into the metadata catalog.
178 */
179 void
ts_chunk_constraints_insert_metadata(const ChunkConstraints * ccs)180 ts_chunk_constraints_insert_metadata(const ChunkConstraints *ccs)
181 {
182 Catalog *catalog = ts_catalog_get();
183 CatalogSecurityContext sec_ctx;
184 Relation rel;
185 int i;
186
187 rel = table_open(catalog_get_table_id(catalog, CHUNK_CONSTRAINT), RowExclusiveLock);
188
189 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
190
191 for (i = 0; i < ccs->num_constraints; i++)
192 chunk_constraint_insert_relation(rel, &ccs->constraints[i]);
193
194 ts_catalog_restore_user(&sec_ctx);
195 table_close(rel, RowExclusiveLock);
196 }
197
198 /*
199 * Insert a single chunk constraints into the metadata catalog.
200 */
201 static void
chunk_constraint_insert(ChunkConstraint * constraint)202 chunk_constraint_insert(ChunkConstraint *constraint)
203 {
204 Catalog *catalog = ts_catalog_get();
205 CatalogSecurityContext sec_ctx;
206 Relation rel;
207
208 rel = table_open(catalog_get_table_id(catalog, CHUNK_CONSTRAINT), RowExclusiveLock);
209
210 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
211 chunk_constraint_insert_relation(rel, constraint);
212 ts_catalog_restore_user(&sec_ctx);
213 table_close(rel, RowExclusiveLock);
214 }
215
216 static ChunkConstraint *
chunk_constraints_add_from_tuple(ChunkConstraints * ccs,TupleInfo * ti)217 chunk_constraints_add_from_tuple(ChunkConstraints *ccs, TupleInfo *ti)
218 {
219 bool nulls[Natts_chunk_constraint];
220 Datum values[Natts_chunk_constraint];
221 ChunkConstraint *constraints;
222 int32 dimension_slice_id;
223 Name constraint_name;
224 Name hypertable_constraint_name;
225 bool should_free;
226 HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
227
228 heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);
229
230 constraint_name =
231 DatumGetName(values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)]);
232
233 if (nulls[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)])
234 {
235 dimension_slice_id = 0;
236 hypertable_constraint_name = DatumGetName(
237 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)]);
238 }
239 else
240 {
241 dimension_slice_id = DatumGetInt32(
242 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)]);
243 hypertable_constraint_name = DatumGetName(DirectFunctionCall1(namein, CStringGetDatum("")));
244 }
245
246 constraints =
247 chunk_constraints_add(ccs,
248 DatumGetInt32(
249 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_chunk_id)]),
250 dimension_slice_id,
251 NameStr(*constraint_name),
252 NameStr(*hypertable_constraint_name));
253
254 if (should_free)
255 heap_freetuple(tuple);
256
257 return constraints;
258 }
259
260 /*
261 * Add a constraint to a chunk table.
262 */
263 static Oid
chunk_constraint_create_on_table(const ChunkConstraint * cc,Oid chunk_oid)264 chunk_constraint_create_on_table(const ChunkConstraint *cc, Oid chunk_oid)
265 {
266 HeapTuple tuple;
267 Datum values[Natts_chunk_constraint];
268 bool nulls[Natts_chunk_constraint] = { false };
269 CatalogSecurityContext sec_ctx;
270 Relation rel;
271
272 chunk_constraint_fill_tuple_values(cc, values, nulls);
273
274 rel = RelationIdGetRelation(catalog_get_table_id(ts_catalog_get(), CHUNK_CONSTRAINT));
275 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
276 RelationClose(rel);
277
278 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
279 CatalogInternalCall1(DDL_ADD_CHUNK_CONSTRAINT, HeapTupleGetDatum(tuple));
280 ts_catalog_restore_user(&sec_ctx);
281 heap_freetuple(tuple);
282
283 return get_relation_constraint_oid(chunk_oid, NameStr(cc->fd.constraint_name), true);
284 }
285
286 /*
287 * Create a constraint on a chunk table, including adding relevant metadata to
288 * the catalog.
289 */
290 static Oid
chunk_constraint_create(const ChunkConstraint * cc,Oid chunk_oid,int32 chunk_id,Oid hypertable_oid,int32 hypertable_id)291 chunk_constraint_create(const ChunkConstraint *cc, Oid chunk_oid, int32 chunk_id,
292 Oid hypertable_oid, int32 hypertable_id)
293 {
294 Oid chunk_constraint_oid;
295
296 ts_process_utility_set_expect_chunk_modification(true);
297 chunk_constraint_oid = chunk_constraint_create_on_table(cc, chunk_oid);
298 ts_process_utility_set_expect_chunk_modification(false);
299
300 /*
301 * The table constraint might not have been created if this constraint
302 * corresponds to a dimension slice that covers the entire range of values
303 * in the particular dimension. In that case, there is no need to add a
304 * table constraint.
305 */
306 if (!OidIsValid(chunk_constraint_oid))
307 return InvalidOid;
308
309 if (!is_dimension_constraint(cc))
310 {
311 Oid hypertable_constraint_oid =
312 get_relation_constraint_oid(hypertable_oid,
313 NameStr(cc->fd.hypertable_constraint_name),
314 false);
315 HeapTuple tuple = SearchSysCache1(CONSTROID, hypertable_constraint_oid);
316
317 if (HeapTupleIsValid(tuple))
318 {
319 FormData_pg_constraint *constr = (FormData_pg_constraint *) GETSTRUCT(tuple);
320
321 if (OidIsValid(constr->conindid) && constr->contype != CONSTRAINT_FOREIGN)
322 ts_chunk_index_create_from_constraint(hypertable_id,
323 hypertable_constraint_oid,
324 chunk_id,
325 chunk_constraint_oid);
326
327 ReleaseSysCache(tuple);
328 }
329 }
330
331 return chunk_constraint_oid;
332 }
333
334 /*
335 * Create a set of constraints on a chunk table.
336 */
337 void
ts_chunk_constraints_create(const ChunkConstraints * ccs,Oid chunk_oid,int32 chunk_id,Oid hypertable_oid,int32 hypertable_id)338 ts_chunk_constraints_create(const ChunkConstraints *ccs, Oid chunk_oid, int32 chunk_id,
339 Oid hypertable_oid, int32 hypertable_id)
340 {
341 int i;
342
343 for (i = 0; i < ccs->num_constraints; i++)
344 chunk_constraint_create(&ccs->constraints[i],
345 chunk_oid,
346 chunk_id,
347 hypertable_oid,
348 hypertable_id);
349 }
350
351 static void
init_scan_by_chunk_id(ScanIterator * iterator,int32 chunk_id)352 init_scan_by_chunk_id(ScanIterator *iterator, int32 chunk_id)
353 {
354 iterator->ctx.index = catalog_get_index(ts_catalog_get(),
355 CHUNK_CONSTRAINT,
356 CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX);
357 ts_scan_iterator_scan_key_init(iterator,
358 Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_chunk_id,
359 BTEqualStrategyNumber,
360 F_INT4EQ,
361 Int32GetDatum(chunk_id));
362 }
363
364 static void
init_scan_by_dimension_slice_id(ScanIterator * iterator,int32 dimension_slice_id)365 init_scan_by_dimension_slice_id(ScanIterator *iterator, int32 dimension_slice_id)
366 {
367 iterator->ctx.index = catalog_get_index(ts_catalog_get(),
368 CHUNK_CONSTRAINT,
369 CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX);
370
371 ts_scan_iterator_scan_key_init(
372 iterator,
373 Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_dimension_slice_id,
374 BTEqualStrategyNumber,
375 F_INT4EQ,
376 Int32GetDatum(dimension_slice_id));
377 }
378
379 static void
init_scan_by_chunk_id_constraint_name(ScanIterator * iterator,int32 chunk_id,const char * constraint_name)380 init_scan_by_chunk_id_constraint_name(ScanIterator *iterator, int32 chunk_id,
381 const char *constraint_name)
382 {
383 iterator->ctx.index = catalog_get_index(ts_catalog_get(),
384 CHUNK_CONSTRAINT,
385 CHUNK_CONSTRAINT_CHUNK_ID_CONSTRAINT_NAME_IDX);
386
387 ts_scan_iterator_scan_key_init(iterator,
388 Anum_chunk_constraint_chunk_id_constraint_name_idx_chunk_id,
389 BTEqualStrategyNumber,
390 F_INT4EQ,
391 Int32GetDatum(chunk_id));
392
393 ts_scan_iterator_scan_key_init(
394 iterator,
395 Anum_chunk_constraint_chunk_id_constraint_name_idx_constraint_name,
396 BTEqualStrategyNumber,
397 F_NAMEEQ,
398 CStringGetDatum(constraint_name));
399 }
400
401 /*
402 * Scan all the chunk's constraints given its chunk ID.
403 *
404 * Returns a set of chunk constraints.
405 */
406 ChunkConstraints *
ts_chunk_constraint_scan_by_chunk_id(int32 chunk_id,Size num_constraints_hint,MemoryContext mctx)407 ts_chunk_constraint_scan_by_chunk_id(int32 chunk_id, Size num_constraints_hint, MemoryContext mctx)
408 {
409 ChunkConstraints *constraints = ts_chunk_constraints_alloc(num_constraints_hint, mctx);
410 ScanIterator iterator = ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, mctx);
411 int num_found = 0;
412
413 init_scan_by_chunk_id(&iterator, chunk_id);
414 ts_scanner_foreach(&iterator)
415 {
416 num_found++;
417 chunk_constraints_add_from_tuple(constraints, ts_scan_iterator_tuple_info(&iterator));
418 }
419
420 if (num_found != constraints->num_constraints)
421 elog(ERROR, "unexpected number of constraints found for chunk ID %d", chunk_id);
422
423 return constraints;
424 }
425
/*
 * Pairs a chunk scan context with a dimension slice.
 *
 * NOTE(review): nothing in the visible part of this file references this
 * type; confirm whether it is still needed.
 */
typedef struct ChunkConstraintScanData
{
	ChunkScanCtx *scanctx; /* chunk scan context */
	DimensionSlice *slice; /* dimension slice */
} ChunkConstraintScanData;
431
432 /*
433 * Scan for all chunk constraints that match the given slice ID. The chunk
434 * constraints are saved in the chunk scan context.
435 */
436 int
ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice * slice,ChunkScanCtx * ctx,MemoryContext mctx)437 ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice *slice, ChunkScanCtx *ctx,
438 MemoryContext mctx)
439 {
440 ScanIterator iterator = ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, mctx);
441 int count = 0;
442
443 init_scan_by_dimension_slice_id(&iterator, slice->fd.id);
444 ts_scanner_foreach(&iterator)
445 {
446 const Hyperspace *hs = ctx->space;
447 ChunkStub *stub;
448 ChunkScanEntry *entry;
449 bool found;
450 TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
451 Datum datum = slot_getattr(ti->slot, Anum_chunk_constraint_chunk_id, &found);
452 int32 chunk_id = DatumGetInt32(datum);
453
454 if (slot_attisnull(ts_scan_iterator_slot(&iterator),
455 Anum_chunk_constraint_dimension_slice_id))
456 continue;
457
458 count++;
459
460 Assert(!slot_attisnull(ti->slot, Anum_chunk_constraint_dimension_slice_id));
461
462 entry = hash_search(ctx->htab, &chunk_id, HASH_ENTER, &found);
463
464 if (!found)
465 {
466 stub = ts_chunk_stub_create(chunk_id, hs->num_dimensions);
467 stub->cube = ts_hypercube_alloc(hs->num_dimensions);
468 entry->stub = stub;
469 }
470 else
471 stub = entry->stub;
472
473 chunk_constraints_add_from_tuple(stub->constraints, ti);
474
475 ts_hypercube_add_slice(stub->cube, slice);
476
477 /* A stub is complete when we've added slices for all its dimensions,
478 * i.e., a complete hypercube */
479 if (chunk_stub_is_complete(stub, ctx->space))
480 {
481 ctx->num_complete_chunks++;
482
483 if (ctx->early_abort)
484 {
485 ts_scan_iterator_close(&iterator);
486 break;
487 }
488 }
489 }
490 return count;
491 }
492
493 /*
494 * Similar to chunk_constraint_scan_by_dimension_slice, but stores only chunk_ids
495 * in a list, which is easier to traverse and provides deterministic chunk selection.
496 */
497 int
ts_chunk_constraint_scan_by_dimension_slice_to_list(const DimensionSlice * slice,List ** list,MemoryContext mctx)498 ts_chunk_constraint_scan_by_dimension_slice_to_list(const DimensionSlice *slice, List **list,
499 MemoryContext mctx)
500 {
501 ScanIterator iterator = ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, mctx);
502 int count = 0;
503
504 init_scan_by_dimension_slice_id(&iterator, slice->fd.id);
505 ts_scanner_foreach(&iterator)
506 {
507 bool is_null;
508 TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
509 Datum chunk_id;
510
511 if (slot_attisnull(ti->slot, Anum_chunk_constraint_dimension_slice_id))
512 continue;
513
514 count++;
515 chunk_id = slot_getattr(ti->slot, Anum_chunk_constraint_chunk_id, &is_null);
516 Assert(!is_null);
517 *list = lappend_int(*list, DatumGetInt32(chunk_id));
518 }
519 return count;
520 }
521
522 /*
523 * Scan for chunk constraints given a dimension slice ID.
524 *
525 * Optionally, collect all chunk constraints if ChunkConstraints is non-NULL.
526 */
527 int
ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id,ChunkConstraints * ccs,MemoryContext mctx)528 ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id, ChunkConstraints *ccs,
529 MemoryContext mctx)
530 {
531 ScanIterator iterator = ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, mctx);
532 int count = 0;
533
534 init_scan_by_dimension_slice_id(&iterator, dimension_slice_id);
535 ts_scanner_foreach(&iterator)
536 {
537 if (slot_attisnull(ts_scan_iterator_slot(&iterator),
538 Anum_chunk_constraint_dimension_slice_id))
539 continue;
540
541 count++;
542 if (ccs != NULL)
543 chunk_constraints_add_from_tuple(ccs, ts_scan_iterator_tuple_info(&iterator));
544 }
545 return count;
546 }
547
548 static bool
chunk_constraint_need_on_chunk(const char chunk_relkind,Form_pg_constraint conform)549 chunk_constraint_need_on_chunk(const char chunk_relkind, Form_pg_constraint conform)
550 {
551 if (conform->contype == CONSTRAINT_CHECK)
552 {
553 /*
554 * check and not null constraints handled by regular inheritance (from
555 * docs): All check constraints and not-null constraints on a parent
556 * table are automatically inherited by its children, unless
557 * explicitly specified otherwise with NO INHERIT clauses. Other types
558 * of constraints (unique, primary key, and foreign key constraints)
559 * are not inherited."
560 */
561 return false;
562 }
563
564 /* Foreign tables do not support non-check constraints, so skip them */
565 if (chunk_relkind == RELKIND_FOREIGN_TABLE)
566 return false;
567
568 return true;
569 }
570
571 int
ts_chunk_constraints_add_dimension_constraints(ChunkConstraints * ccs,int32 chunk_id,const Hypercube * cube)572 ts_chunk_constraints_add_dimension_constraints(ChunkConstraints *ccs, int32 chunk_id,
573 const Hypercube *cube)
574 {
575 int i;
576
577 for (i = 0; i < cube->num_slices; i++)
578 chunk_constraints_add(ccs, chunk_id, cube->slices[i]->fd.id, NULL, NULL);
579
580 return cube->num_slices;
581 }
582
/*
 * State handed to chunk_constraint_add() through the void pointer argument
 * of ts_constraint_process().
 */
typedef struct ConstraintContext
{
	int num_added;		   /* NOTE(review): not written by any code visible in this file */
	char chunk_relkind;	   /* relkind of the chunk the constraints are added for */
	ChunkConstraints *ccs; /* set that receives the constraints */
	int32 chunk_id;		   /* ID of the chunk the constraints are added for */
} ConstraintContext;
590
591 static ConstraintProcessStatus
chunk_constraint_add(HeapTuple constraint_tuple,void * arg)592 chunk_constraint_add(HeapTuple constraint_tuple, void *arg)
593 {
594 ConstraintContext *cc = arg;
595 Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(constraint_tuple);
596
597 if (chunk_constraint_need_on_chunk(cc->chunk_relkind, constraint))
598 {
599 chunk_constraints_add(cc->ccs, cc->chunk_id, 0, NULL, NameStr(constraint->conname));
600 return CONSTR_PROCESSED;
601 }
602
603 return CONSTR_IGNORED;
604 }
605
606 int
ts_chunk_constraints_add_inheritable_constraints(ChunkConstraints * ccs,int32 chunk_id,const char chunk_relkind,Oid hypertable_oid)607 ts_chunk_constraints_add_inheritable_constraints(ChunkConstraints *ccs, int32 chunk_id,
608 const char chunk_relkind, Oid hypertable_oid)
609 {
610 ConstraintContext cc = {
611 .chunk_relkind = chunk_relkind,
612 .ccs = ccs,
613 .chunk_id = chunk_id,
614 };
615
616 return ts_constraint_process(hypertable_oid, chunk_constraint_add, &cc);
617 }
618
619 void
ts_chunk_constraint_create_on_chunk(const Chunk * chunk,Oid constraint_oid)620 ts_chunk_constraint_create_on_chunk(const Chunk *chunk, Oid constraint_oid)
621 {
622 HeapTuple tuple;
623 Form_pg_constraint con;
624
625 tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraint_oid));
626
627 if (!HeapTupleIsValid(tuple))
628 elog(ERROR, "cache lookup failed for constraint %u", constraint_oid);
629
630 con = (Form_pg_constraint) GETSTRUCT(tuple);
631
632 if (chunk_constraint_need_on_chunk(chunk->relkind, con))
633 {
634 ChunkConstraint *cc =
635 chunk_constraints_add(chunk->constraints, chunk->fd.id, 0, NULL, NameStr(con->conname));
636
637 chunk_constraint_insert(cc);
638
639 chunk_constraint_create(cc,
640 chunk->table_id,
641 chunk->fd.id,
642 chunk->hypertable_relid,
643 chunk->fd.hypertable_id);
644 }
645
646 ReleaseSysCache(tuple);
647 }
648
649 static bool
hypertable_constraint_matches_tuple(TupleInfo * ti,const char * hypertable_constraint_name)650 hypertable_constraint_matches_tuple(TupleInfo *ti, const char *hypertable_constraint_name)
651 {
652 bool isnull;
653 Datum name = slot_getattr(ti->slot, Anum_chunk_constraint_hypertable_constraint_name, &isnull);
654
655 return !isnull && namestrcmp(DatumGetName(name), hypertable_constraint_name) == 0;
656 }
657
658 static void
chunk_constraint_delete_metadata(TupleInfo * ti)659 chunk_constraint_delete_metadata(TupleInfo *ti)
660 {
661 bool isnull;
662 Datum constrname = slot_getattr(ti->slot, Anum_chunk_constraint_constraint_name, &isnull);
663 int32 chunk_id = DatumGetInt32(slot_getattr(ti->slot, Anum_chunk_constraint_chunk_id, &isnull));
664 /* Get the chunk relid. Note that, at this point, the chunk table can be
665 * deleted already */
666 Oid chunk_relid = ts_chunk_get_relid(chunk_id, true);
667
668 if (OidIsValid(chunk_relid))
669 {
670 Oid index_relid = get_constraint_index(
671 get_relation_constraint_oid(chunk_relid, NameStr(*DatumGetName(constrname)), true));
672
673 /*
674 * If this is an index constraint, we need to cleanup the index
675 * metadata. Don't drop the index though, since that will happend when
676 * the constraint is dropped.
677 */
678 if (OidIsValid(index_relid))
679 ts_chunk_index_delete(chunk_id, get_rel_name(index_relid), false);
680 }
681
682 ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
683 }
684
685 static void
chunk_constraint_drop_constraint(TupleInfo * ti)686 chunk_constraint_drop_constraint(TupleInfo *ti)
687 {
688 bool isnull;
689 Datum constrname = slot_getattr(ti->slot, Anum_chunk_constraint_constraint_name, &isnull);
690 int32 chunk_id = DatumGetInt32(slot_getattr(ti->slot, Anum_chunk_constraint_chunk_id, &isnull));
691 /* Get the chunk relid. Note that, at this point, the chunk table can be
692 * deleted already. */
693 Oid chunk_relid = ts_chunk_get_relid(chunk_id, true);
694
695 if (OidIsValid(chunk_relid))
696 {
697 ObjectAddress constrobj = {
698 .classId = ConstraintRelationId,
699 .objectId =
700 get_relation_constraint_oid(chunk_relid, NameStr(*DatumGetName(constrname)), true),
701 };
702
703 if (OidIsValid(constrobj.objectId))
704 performDeletion(&constrobj, DROP_RESTRICT, 0);
705 }
706 }
707
708 int
ts_chunk_constraint_delete_by_hypertable_constraint_name(int32 chunk_id,const char * hypertable_constraint_name,bool delete_metadata,bool drop_constraint)709 ts_chunk_constraint_delete_by_hypertable_constraint_name(int32 chunk_id,
710 const char *hypertable_constraint_name,
711 bool delete_metadata, bool drop_constraint)
712 {
713 ScanIterator iterator =
714 ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
715 int count = 0;
716
717 init_scan_by_chunk_id(&iterator, chunk_id);
718 ts_scanner_foreach(&iterator)
719 {
720 if (!hypertable_constraint_matches_tuple(ts_scan_iterator_tuple_info(&iterator),
721 hypertable_constraint_name))
722 continue;
723
724 count++;
725
726 if (delete_metadata)
727 chunk_constraint_delete_metadata(ts_scan_iterator_tuple_info(&iterator));
728
729 if (drop_constraint)
730 chunk_constraint_drop_constraint(ts_scan_iterator_tuple_info(&iterator));
731 }
732 return count;
733 }
734
735 int
ts_chunk_constraint_delete_by_constraint_name(int32 chunk_id,const char * constraint_name,bool delete_metadata,bool drop_constraint)736 ts_chunk_constraint_delete_by_constraint_name(int32 chunk_id, const char *constraint_name,
737 bool delete_metadata, bool drop_constraint)
738 {
739 ScanIterator iterator =
740 ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
741 int count = 0;
742
743 init_scan_by_chunk_id_constraint_name(&iterator, chunk_id, constraint_name);
744 ts_scanner_foreach(&iterator)
745 {
746 count++;
747 if (delete_metadata)
748 chunk_constraint_delete_metadata(ts_scan_iterator_tuple_info(&iterator));
749
750 if (drop_constraint)
751 chunk_constraint_drop_constraint(ts_scan_iterator_tuple_info(&iterator));
752 }
753 return count;
754 }
755
756 /*
757 * Delete all constraints for a chunk. Optionally, collect the deleted constraints.
758 */
759 int
ts_chunk_constraint_delete_by_chunk_id(int32 chunk_id,ChunkConstraints * ccs)760 ts_chunk_constraint_delete_by_chunk_id(int32 chunk_id, ChunkConstraints *ccs)
761 {
762 ScanIterator iterator =
763 ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
764 int count = 0;
765
766 init_scan_by_chunk_id(&iterator, chunk_id);
767 ts_scanner_foreach(&iterator)
768 {
769 count++;
770
771 chunk_constraints_add_from_tuple(ccs, ts_scan_iterator_tuple_info(&iterator));
772 chunk_constraint_delete_metadata(ts_scan_iterator_tuple_info(&iterator));
773 chunk_constraint_drop_constraint(ts_scan_iterator_tuple_info(&iterator));
774 }
775 return count;
776 }
777
778 int
ts_chunk_constraint_delete_by_dimension_slice_id(int32 dimension_slice_id)779 ts_chunk_constraint_delete_by_dimension_slice_id(int32 dimension_slice_id)
780 {
781 ScanIterator iterator =
782 ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
783 int count = 0;
784
785 init_scan_by_dimension_slice_id(&iterator, dimension_slice_id);
786 ts_scanner_foreach(&iterator)
787 {
788 count++;
789
790 chunk_constraint_delete_metadata(ts_scan_iterator_tuple_info(&iterator));
791 chunk_constraint_drop_constraint(ts_scan_iterator_tuple_info(&iterator));
792 }
793 return count;
794 }
795
796 void
ts_chunk_constraint_recreate(const ChunkConstraint * cc,Oid chunk_oid)797 ts_chunk_constraint_recreate(const ChunkConstraint *cc, Oid chunk_oid)
798 {
799 ObjectAddress constrobj = {
800 .classId = ConstraintRelationId,
801 .objectId = get_relation_constraint_oid(chunk_oid, NameStr(cc->fd.constraint_name), false),
802 };
803
804 performDeletion(&constrobj, DROP_RESTRICT, 0);
805 chunk_constraint_create_on_table(cc, chunk_oid);
806 }
807
808 static void
chunk_constraint_rename_on_chunk_table(int32 chunk_id,const char * old_name,const char * new_name)809 chunk_constraint_rename_on_chunk_table(int32 chunk_id, const char *old_name, const char *new_name)
810 {
811 Oid chunk_relid = ts_chunk_get_relid(chunk_id, false);
812 Oid nspid = get_rel_namespace(chunk_relid);
813 RenameStmt rename = {
814 .renameType = OBJECT_TABCONSTRAINT,
815 .relation = makeRangeVar(get_namespace_name(nspid), get_rel_name(chunk_relid), 0),
816 .subname = pstrdup(old_name),
817 .newname = pstrdup(new_name),
818 };
819
820 RenameConstraint(&rename);
821 }
822
823 static void
chunk_constraint_rename_hypertable_from_tuple(TupleInfo * ti,const char * newname)824 chunk_constraint_rename_hypertable_from_tuple(TupleInfo *ti, const char *newname)
825 {
826 bool nulls[Natts_chunk_constraint];
827 Datum values[Natts_chunk_constraint];
828 bool repl[Natts_chunk_constraint] = { false };
829 HeapTuple tuple, new_tuple;
830 TupleDesc tupdesc = ts_scanner_get_tupledesc(ti);
831 NameData new_hypertable_constraint_name;
832 NameData new_chunk_constraint_name;
833 Name old_chunk_constraint_name;
834 int32 chunk_id;
835 bool should_free;
836
837 tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
838 heap_deform_tuple(tuple, tupdesc, values, nulls);
839
840 chunk_id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_constraint_chunk_id)]);
841 namestrcpy(&new_hypertable_constraint_name, newname);
842 chunk_constraint_choose_name(&new_chunk_constraint_name, newname, chunk_id);
843
844 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] =
845 NameGetDatum(&new_hypertable_constraint_name);
846 repl[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] = true;
847 old_chunk_constraint_name =
848 DatumGetName(values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)]);
849 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] =
850 NameGetDatum(&new_chunk_constraint_name);
851 repl[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] = true;
852
853 chunk_constraint_rename_on_chunk_table(chunk_id,
854 NameStr(*old_chunk_constraint_name),
855 NameStr(new_chunk_constraint_name));
856
857 new_tuple = heap_modify_tuple(tuple, tupdesc, values, nulls, repl);
858
859 ts_chunk_index_adjust_meta(chunk_id,
860 newname,
861 NameStr(*old_chunk_constraint_name),
862 NameStr(new_chunk_constraint_name));
863
864 ts_catalog_update(ti->scanrel, new_tuple);
865 heap_freetuple(new_tuple);
866
867 if (should_free)
868 heap_freetuple(tuple);
869 }
870
871 /*
872 * Adjust internal metadata after index/constraint rename
873 */
874 int
ts_chunk_constraint_adjust_meta(int32 chunk_id,const char * ht_constraint_name,const char * oldname,const char * newname)875 ts_chunk_constraint_adjust_meta(int32 chunk_id, const char *ht_constraint_name, const char *oldname,
876 const char *newname)
877 {
878 ScanIterator iterator =
879 ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
880 int count = 0;
881
882 init_scan_by_chunk_id_constraint_name(&iterator, chunk_id, oldname);
883
884 ts_scanner_foreach(&iterator)
885 {
886 bool nulls[Natts_chunk_constraint];
887 bool repl[Natts_chunk_constraint] = { false };
888 Datum values[Natts_chunk_constraint];
889 bool should_free;
890 TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
891 HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
892 HeapTuple new_tuple;
893
894 heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);
895
896 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] =
897 CStringGetDatum(ht_constraint_name);
898 repl[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] = true;
899 values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] =
900 CStringGetDatum(newname);
901 repl[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] = true;
902
903 new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls, repl);
904
905 ts_catalog_update(ti->scanrel, new_tuple);
906 heap_freetuple(new_tuple);
907
908 if (should_free)
909 heap_freetuple(tuple);
910
911 count++;
912 }
913
914 return count;
915 }
916
917 int
ts_chunk_constraint_rename_hypertable_constraint(int32 chunk_id,const char * oldname,const char * newname)918 ts_chunk_constraint_rename_hypertable_constraint(int32 chunk_id, const char *oldname,
919 const char *newname)
920 {
921 ScanIterator iterator =
922 ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
923 int count = 0;
924
925 init_scan_by_chunk_id(&iterator, chunk_id);
926 ts_scanner_foreach(&iterator)
927 {
928 if (!hypertable_constraint_matches_tuple(ts_scan_iterator_tuple_info(&iterator), oldname))
929 continue;
930
931 count++;
932 chunk_constraint_rename_hypertable_from_tuple(ts_scan_iterator_tuple_info(&iterator),
933 newname);
934 }
935 return count;
936 }
937
938 char *
ts_chunk_constraint_get_name_from_hypertable_constraint(Oid chunk_relid,const char * hypertable_constraint_name)939 ts_chunk_constraint_get_name_from_hypertable_constraint(Oid chunk_relid,
940 const char *hypertable_constraint_name)
941 {
942 ScanIterator iterator =
943 ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
944 Datum chunk_id = DirectFunctionCall1(ts_chunk_id_from_relid, ObjectIdGetDatum(chunk_relid));
945
946 init_scan_by_chunk_id(&iterator, DatumGetInt32(chunk_id));
947 ts_scanner_foreach(&iterator)
948 {
949 TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
950 MemoryContext oldmctx;
951 bool isnull;
952 Datum datum;
953 char *name;
954
955 if (!hypertable_constraint_matches_tuple(ti, hypertable_constraint_name))
956 continue;
957
958 datum = slot_getattr(ti->slot, Anum_chunk_constraint_constraint_name, &isnull);
959 Assert(!isnull);
960
961 oldmctx = MemoryContextSwitchTo(ti->mctx);
962 name = pstrdup(NameStr(*DatumGetName(datum)));
963 MemoryContextSwitchTo(oldmctx);
964 ts_scan_iterator_close(&iterator);
965
966 return name;
967 }
968 return NULL;
969 }
970