1 /*
2  * This file and its contents are licensed under the Apache License 2.0.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-APACHE for a copy of the license.
5  */
6 #include "scanner.h"
7 #include <postgres.h>
8 #include <access/heapam.h>
9 #include <access/xact.h>
10 #include <catalog/dependency.h>
11 #include <catalog/indexing.h>
12 #include <catalog/objectaddress.h>
13 #include <catalog/pg_constraint.h>
14 #include <commands/tablecmds.h>
15 #include <funcapi.h>
16 #include <nodes/makefuncs.h>
17 #include <utils/builtins.h>
18 #include <utils/hsearch.h>
19 #include <utils/lsyscache.h>
20 #include <utils/relcache.h>
21 #include <utils/rel.h>
22 #include <utils/syscache.h>
23 
24 #include "compat/compat.h"
25 #include "export.h"
26 #include "scan_iterator.h"
27 #include "chunk_constraint.h"
28 #include "chunk_index.h"
29 #include "constraint.h"
30 #include "dimension_vector.h"
31 #include "dimension_slice.h"
32 #include "hypercube.h"
33 #include "chunk.h"
34 #include "hypertable.h"
35 #include "errors.h"
36 #include "process_utility.h"
37 
38 #define DEFAULT_EXTRA_CONSTRAINTS_SIZE 4
39 
40 #define CHUNK_CONSTRAINTS_SIZE(num_constraints) (sizeof(ChunkConstraint) * (num_constraints))
41 
42 ChunkConstraints *
ts_chunk_constraints_alloc(int size_hint,MemoryContext mctx)43 ts_chunk_constraints_alloc(int size_hint, MemoryContext mctx)
44 {
45 	ChunkConstraints *ccs = MemoryContextAlloc(mctx, sizeof(ChunkConstraints));
46 
47 	ccs->mctx = mctx;
48 	ccs->capacity = size_hint + DEFAULT_EXTRA_CONSTRAINTS_SIZE;
49 	ccs->num_constraints = 0;
50 	ccs->num_dimension_constraints = 0;
51 	ccs->constraints = MemoryContextAllocZero(mctx, CHUNK_CONSTRAINTS_SIZE(ccs->capacity));
52 
53 	return ccs;
54 }
55 
56 ChunkConstraints *
ts_chunk_constraints_copy(ChunkConstraints * ccs)57 ts_chunk_constraints_copy(ChunkConstraints *ccs)
58 {
59 	ChunkConstraints *copy = palloc(sizeof(ChunkConstraints));
60 
61 	memcpy(copy, ccs, sizeof(ChunkConstraints));
62 	copy->constraints = palloc0(CHUNK_CONSTRAINTS_SIZE(ccs->capacity));
63 	memcpy(copy->constraints, ccs->constraints, CHUNK_CONSTRAINTS_SIZE(ccs->num_constraints));
64 
65 	return copy;
66 }
67 
68 static void
chunk_constraints_expand(ChunkConstraints * ccs,int16 new_capacity)69 chunk_constraints_expand(ChunkConstraints *ccs, int16 new_capacity)
70 {
71 	MemoryContext old;
72 
73 	if (new_capacity <= ccs->capacity)
74 		return;
75 
76 	old = MemoryContextSwitchTo(ccs->mctx);
77 	ccs->capacity = new_capacity;
78 	Assert(ccs->constraints); /* repalloc() does not work with NULL argument */
79 	ccs->constraints = repalloc(ccs->constraints, CHUNK_CONSTRAINTS_SIZE(new_capacity));
80 	MemoryContextSwitchTo(old);
81 }
82 
83 static void
chunk_constraint_dimension_choose_name(Name dst,int32 dimension_slice_id)84 chunk_constraint_dimension_choose_name(Name dst, int32 dimension_slice_id)
85 {
86 	snprintf(NameStr(*dst), NAMEDATALEN, "constraint_%d", dimension_slice_id);
87 }
88 
89 static void
chunk_constraint_choose_name(Name dst,const char * hypertable_constraint_name,int32 chunk_id)90 chunk_constraint_choose_name(Name dst, const char *hypertable_constraint_name, int32 chunk_id)
91 {
92 	char constrname[NAMEDATALEN];
93 	CatalogSecurityContext sec_ctx;
94 
95 	Assert(hypertable_constraint_name != NULL);
96 
97 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
98 	snprintf(constrname,
99 			 NAMEDATALEN,
100 			 "%d_" INT64_FORMAT "_%s",
101 			 chunk_id,
102 			 ts_catalog_table_next_seq_id(ts_catalog_get(), CHUNK_CONSTRAINT),
103 			 hypertable_constraint_name);
104 	ts_catalog_restore_user(&sec_ctx);
105 
106 	namestrcpy(dst, constrname);
107 }
108 
109 static ChunkConstraint *
chunk_constraints_add(ChunkConstraints * ccs,int32 chunk_id,int32 dimension_slice_id,const char * constraint_name,const char * hypertable_constraint_name)110 chunk_constraints_add(ChunkConstraints *ccs, int32 chunk_id, int32 dimension_slice_id,
111 					  const char *constraint_name, const char *hypertable_constraint_name)
112 {
113 	ChunkConstraint *cc;
114 
115 	chunk_constraints_expand(ccs, ccs->num_constraints + 1);
116 	cc = &ccs->constraints[ccs->num_constraints++];
117 	cc->fd.chunk_id = chunk_id;
118 	cc->fd.dimension_slice_id = dimension_slice_id;
119 
120 	if (NULL == constraint_name)
121 	{
122 		if (is_dimension_constraint(cc))
123 		{
124 			chunk_constraint_dimension_choose_name(&cc->fd.constraint_name,
125 												   cc->fd.dimension_slice_id);
126 			namestrcpy(&cc->fd.hypertable_constraint_name, "");
127 		}
128 		else
129 			chunk_constraint_choose_name(&cc->fd.constraint_name,
130 										 hypertable_constraint_name,
131 										 cc->fd.chunk_id);
132 	}
133 	else
134 		namestrcpy(&cc->fd.constraint_name, constraint_name);
135 
136 	if (NULL != hypertable_constraint_name)
137 		namestrcpy(&cc->fd.hypertable_constraint_name, hypertable_constraint_name);
138 
139 	if (is_dimension_constraint(cc))
140 		ccs->num_dimension_constraints++;
141 
142 	return cc;
143 }
144 
145 static void
chunk_constraint_fill_tuple_values(const ChunkConstraint * cc,Datum values[Natts_chunk_constraint],bool nulls[Natts_chunk_constraint])146 chunk_constraint_fill_tuple_values(const ChunkConstraint *cc, Datum values[Natts_chunk_constraint],
147 								   bool nulls[Natts_chunk_constraint])
148 {
149 	memset(values, 0, sizeof(Datum) * Natts_chunk_constraint);
150 	values[AttrNumberGetAttrOffset(Anum_chunk_constraint_chunk_id)] =
151 		Int32GetDatum(cc->fd.chunk_id);
152 	values[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)] =
153 		Int32GetDatum(cc->fd.dimension_slice_id);
154 	values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] =
155 		NameGetDatum(&cc->fd.constraint_name);
156 	values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] =
157 		NameGetDatum(&cc->fd.hypertable_constraint_name);
158 
159 	if (is_dimension_constraint(cc))
160 		nulls[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] = true;
161 	else
162 		nulls[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)] = true;
163 }
164 
165 static void
chunk_constraint_insert_relation(const Relation rel,const ChunkConstraint * cc)166 chunk_constraint_insert_relation(const Relation rel, const ChunkConstraint *cc)
167 {
168 	TupleDesc desc = RelationGetDescr(rel);
169 	Datum values[Natts_chunk_constraint];
170 	bool nulls[Natts_chunk_constraint] = { false };
171 
172 	chunk_constraint_fill_tuple_values(cc, values, nulls);
173 	ts_catalog_insert_values(rel, desc, values, nulls);
174 }
175 
176 /*
177  * Insert multiple chunk constraints into the metadata catalog.
178  */
179 void
ts_chunk_constraints_insert_metadata(const ChunkConstraints * ccs)180 ts_chunk_constraints_insert_metadata(const ChunkConstraints *ccs)
181 {
182 	Catalog *catalog = ts_catalog_get();
183 	CatalogSecurityContext sec_ctx;
184 	Relation rel;
185 	int i;
186 
187 	rel = table_open(catalog_get_table_id(catalog, CHUNK_CONSTRAINT), RowExclusiveLock);
188 
189 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
190 
191 	for (i = 0; i < ccs->num_constraints; i++)
192 		chunk_constraint_insert_relation(rel, &ccs->constraints[i]);
193 
194 	ts_catalog_restore_user(&sec_ctx);
195 	table_close(rel, RowExclusiveLock);
196 }
197 
198 /*
199  * Insert a single chunk constraints into the metadata catalog.
200  */
201 static void
chunk_constraint_insert(ChunkConstraint * constraint)202 chunk_constraint_insert(ChunkConstraint *constraint)
203 {
204 	Catalog *catalog = ts_catalog_get();
205 	CatalogSecurityContext sec_ctx;
206 	Relation rel;
207 
208 	rel = table_open(catalog_get_table_id(catalog, CHUNK_CONSTRAINT), RowExclusiveLock);
209 
210 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
211 	chunk_constraint_insert_relation(rel, constraint);
212 	ts_catalog_restore_user(&sec_ctx);
213 	table_close(rel, RowExclusiveLock);
214 }
215 
216 static ChunkConstraint *
chunk_constraints_add_from_tuple(ChunkConstraints * ccs,TupleInfo * ti)217 chunk_constraints_add_from_tuple(ChunkConstraints *ccs, TupleInfo *ti)
218 {
219 	bool nulls[Natts_chunk_constraint];
220 	Datum values[Natts_chunk_constraint];
221 	ChunkConstraint *constraints;
222 	int32 dimension_slice_id;
223 	Name constraint_name;
224 	Name hypertable_constraint_name;
225 	bool should_free;
226 	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
227 
228 	heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);
229 
230 	constraint_name =
231 		DatumGetName(values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)]);
232 
233 	if (nulls[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)])
234 	{
235 		dimension_slice_id = 0;
236 		hypertable_constraint_name = DatumGetName(
237 			values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)]);
238 	}
239 	else
240 	{
241 		dimension_slice_id = DatumGetInt32(
242 			values[AttrNumberGetAttrOffset(Anum_chunk_constraint_dimension_slice_id)]);
243 		hypertable_constraint_name = DatumGetName(DirectFunctionCall1(namein, CStringGetDatum("")));
244 	}
245 
246 	constraints =
247 		chunk_constraints_add(ccs,
248 							  DatumGetInt32(
249 								  values[AttrNumberGetAttrOffset(Anum_chunk_constraint_chunk_id)]),
250 							  dimension_slice_id,
251 							  NameStr(*constraint_name),
252 							  NameStr(*hypertable_constraint_name));
253 
254 	if (should_free)
255 		heap_freetuple(tuple);
256 
257 	return constraints;
258 }
259 
260 /*
261  * Add a constraint to a chunk table.
262  */
263 static Oid
chunk_constraint_create_on_table(const ChunkConstraint * cc,Oid chunk_oid)264 chunk_constraint_create_on_table(const ChunkConstraint *cc, Oid chunk_oid)
265 {
266 	HeapTuple tuple;
267 	Datum values[Natts_chunk_constraint];
268 	bool nulls[Natts_chunk_constraint] = { false };
269 	CatalogSecurityContext sec_ctx;
270 	Relation rel;
271 
272 	chunk_constraint_fill_tuple_values(cc, values, nulls);
273 
274 	rel = RelationIdGetRelation(catalog_get_table_id(ts_catalog_get(), CHUNK_CONSTRAINT));
275 	tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
276 	RelationClose(rel);
277 
278 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
279 	CatalogInternalCall1(DDL_ADD_CHUNK_CONSTRAINT, HeapTupleGetDatum(tuple));
280 	ts_catalog_restore_user(&sec_ctx);
281 	heap_freetuple(tuple);
282 
283 	return get_relation_constraint_oid(chunk_oid, NameStr(cc->fd.constraint_name), true);
284 }
285 
286 /*
287  * Create a constraint on a chunk table, including adding relevant metadata to
288  * the catalog.
289  */
290 static Oid
chunk_constraint_create(const ChunkConstraint * cc,Oid chunk_oid,int32 chunk_id,Oid hypertable_oid,int32 hypertable_id)291 chunk_constraint_create(const ChunkConstraint *cc, Oid chunk_oid, int32 chunk_id,
292 						Oid hypertable_oid, int32 hypertable_id)
293 {
294 	Oid chunk_constraint_oid;
295 
296 	ts_process_utility_set_expect_chunk_modification(true);
297 	chunk_constraint_oid = chunk_constraint_create_on_table(cc, chunk_oid);
298 	ts_process_utility_set_expect_chunk_modification(false);
299 
300 	/*
301 	 * The table constraint might not have been created if this constraint
302 	 * corresponds to a dimension slice that covers the entire range of values
303 	 * in the particular dimension. In that case, there is no need to add a
304 	 * table constraint.
305 	 */
306 	if (!OidIsValid(chunk_constraint_oid))
307 		return InvalidOid;
308 
309 	if (!is_dimension_constraint(cc))
310 	{
311 		Oid hypertable_constraint_oid =
312 			get_relation_constraint_oid(hypertable_oid,
313 										NameStr(cc->fd.hypertable_constraint_name),
314 										false);
315 		HeapTuple tuple = SearchSysCache1(CONSTROID, hypertable_constraint_oid);
316 
317 		if (HeapTupleIsValid(tuple))
318 		{
319 			FormData_pg_constraint *constr = (FormData_pg_constraint *) GETSTRUCT(tuple);
320 
321 			if (OidIsValid(constr->conindid) && constr->contype != CONSTRAINT_FOREIGN)
322 				ts_chunk_index_create_from_constraint(hypertable_id,
323 													  hypertable_constraint_oid,
324 													  chunk_id,
325 													  chunk_constraint_oid);
326 
327 			ReleaseSysCache(tuple);
328 		}
329 	}
330 
331 	return chunk_constraint_oid;
332 }
333 
334 /*
335  * Create a set of constraints on a chunk table.
336  */
337 void
ts_chunk_constraints_create(const ChunkConstraints * ccs,Oid chunk_oid,int32 chunk_id,Oid hypertable_oid,int32 hypertable_id)338 ts_chunk_constraints_create(const ChunkConstraints *ccs, Oid chunk_oid, int32 chunk_id,
339 							Oid hypertable_oid, int32 hypertable_id)
340 {
341 	int i;
342 
343 	for (i = 0; i < ccs->num_constraints; i++)
344 		chunk_constraint_create(&ccs->constraints[i],
345 								chunk_oid,
346 								chunk_id,
347 								hypertable_oid,
348 								hypertable_id);
349 }
350 
351 static void
init_scan_by_chunk_id(ScanIterator * iterator,int32 chunk_id)352 init_scan_by_chunk_id(ScanIterator *iterator, int32 chunk_id)
353 {
354 	iterator->ctx.index = catalog_get_index(ts_catalog_get(),
355 											CHUNK_CONSTRAINT,
356 											CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX);
357 	ts_scan_iterator_scan_key_init(iterator,
358 								   Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_chunk_id,
359 								   BTEqualStrategyNumber,
360 								   F_INT4EQ,
361 								   Int32GetDatum(chunk_id));
362 }
363 
364 static void
init_scan_by_dimension_slice_id(ScanIterator * iterator,int32 dimension_slice_id)365 init_scan_by_dimension_slice_id(ScanIterator *iterator, int32 dimension_slice_id)
366 {
367 	iterator->ctx.index = catalog_get_index(ts_catalog_get(),
368 											CHUNK_CONSTRAINT,
369 											CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX);
370 
371 	ts_scan_iterator_scan_key_init(
372 		iterator,
373 		Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_dimension_slice_id,
374 		BTEqualStrategyNumber,
375 		F_INT4EQ,
376 		Int32GetDatum(dimension_slice_id));
377 }
378 
379 static void
init_scan_by_chunk_id_constraint_name(ScanIterator * iterator,int32 chunk_id,const char * constraint_name)380 init_scan_by_chunk_id_constraint_name(ScanIterator *iterator, int32 chunk_id,
381 									  const char *constraint_name)
382 {
383 	iterator->ctx.index = catalog_get_index(ts_catalog_get(),
384 											CHUNK_CONSTRAINT,
385 											CHUNK_CONSTRAINT_CHUNK_ID_CONSTRAINT_NAME_IDX);
386 
387 	ts_scan_iterator_scan_key_init(iterator,
388 								   Anum_chunk_constraint_chunk_id_constraint_name_idx_chunk_id,
389 								   BTEqualStrategyNumber,
390 								   F_INT4EQ,
391 								   Int32GetDatum(chunk_id));
392 
393 	ts_scan_iterator_scan_key_init(
394 		iterator,
395 		Anum_chunk_constraint_chunk_id_constraint_name_idx_constraint_name,
396 		BTEqualStrategyNumber,
397 		F_NAMEEQ,
398 		CStringGetDatum(constraint_name));
399 }
400 
401 /*
402  * Scan all the chunk's constraints given its chunk ID.
403  *
404  * Returns a set of chunk constraints.
405  */
406 ChunkConstraints *
ts_chunk_constraint_scan_by_chunk_id(int32 chunk_id,Size num_constraints_hint,MemoryContext mctx)407 ts_chunk_constraint_scan_by_chunk_id(int32 chunk_id, Size num_constraints_hint, MemoryContext mctx)
408 {
409 	ChunkConstraints *constraints = ts_chunk_constraints_alloc(num_constraints_hint, mctx);
410 	ScanIterator iterator = ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, mctx);
411 	int num_found = 0;
412 
413 	init_scan_by_chunk_id(&iterator, chunk_id);
414 	ts_scanner_foreach(&iterator)
415 	{
416 		num_found++;
417 		chunk_constraints_add_from_tuple(constraints, ts_scan_iterator_tuple_info(&iterator));
418 	}
419 
420 	if (num_found != constraints->num_constraints)
421 		elog(ERROR, "unexpected number of constraints found for chunk ID %d", chunk_id);
422 
423 	return constraints;
424 }
425 
426 typedef struct ChunkConstraintScanData
427 {
428 	ChunkScanCtx *scanctx;
429 	DimensionSlice *slice;
430 } ChunkConstraintScanData;
431 
432 /*
433  * Scan for all chunk constraints that match the given slice ID. The chunk
434  * constraints are saved in the chunk scan context.
435  */
436 int
ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice * slice,ChunkScanCtx * ctx,MemoryContext mctx)437 ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice *slice, ChunkScanCtx *ctx,
438 											MemoryContext mctx)
439 {
440 	ScanIterator iterator = ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, mctx);
441 	int count = 0;
442 
443 	init_scan_by_dimension_slice_id(&iterator, slice->fd.id);
444 	ts_scanner_foreach(&iterator)
445 	{
446 		const Hyperspace *hs = ctx->space;
447 		ChunkStub *stub;
448 		ChunkScanEntry *entry;
449 		bool found;
450 		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
451 		Datum datum = slot_getattr(ti->slot, Anum_chunk_constraint_chunk_id, &found);
452 		int32 chunk_id = DatumGetInt32(datum);
453 
454 		if (slot_attisnull(ts_scan_iterator_slot(&iterator),
455 						   Anum_chunk_constraint_dimension_slice_id))
456 			continue;
457 
458 		count++;
459 
460 		Assert(!slot_attisnull(ti->slot, Anum_chunk_constraint_dimension_slice_id));
461 
462 		entry = hash_search(ctx->htab, &chunk_id, HASH_ENTER, &found);
463 
464 		if (!found)
465 		{
466 			stub = ts_chunk_stub_create(chunk_id, hs->num_dimensions);
467 			stub->cube = ts_hypercube_alloc(hs->num_dimensions);
468 			entry->stub = stub;
469 		}
470 		else
471 			stub = entry->stub;
472 
473 		chunk_constraints_add_from_tuple(stub->constraints, ti);
474 
475 		ts_hypercube_add_slice(stub->cube, slice);
476 
477 		/* A stub is complete when we've added slices for all its dimensions,
478 		 * i.e., a complete hypercube */
479 		if (chunk_stub_is_complete(stub, ctx->space))
480 		{
481 			ctx->num_complete_chunks++;
482 
483 			if (ctx->early_abort)
484 			{
485 				ts_scan_iterator_close(&iterator);
486 				break;
487 			}
488 		}
489 	}
490 	return count;
491 }
492 
493 /*
494  * Similar to chunk_constraint_scan_by_dimension_slice, but stores only chunk_ids
495  * in a list, which is easier to traverse and provides deterministic chunk selection.
496  */
497 int
ts_chunk_constraint_scan_by_dimension_slice_to_list(const DimensionSlice * slice,List ** list,MemoryContext mctx)498 ts_chunk_constraint_scan_by_dimension_slice_to_list(const DimensionSlice *slice, List **list,
499 													MemoryContext mctx)
500 {
501 	ScanIterator iterator = ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, mctx);
502 	int count = 0;
503 
504 	init_scan_by_dimension_slice_id(&iterator, slice->fd.id);
505 	ts_scanner_foreach(&iterator)
506 	{
507 		bool is_null;
508 		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
509 		Datum chunk_id;
510 
511 		if (slot_attisnull(ti->slot, Anum_chunk_constraint_dimension_slice_id))
512 			continue;
513 
514 		count++;
515 		chunk_id = slot_getattr(ti->slot, Anum_chunk_constraint_chunk_id, &is_null);
516 		Assert(!is_null);
517 		*list = lappend_int(*list, DatumGetInt32(chunk_id));
518 	}
519 	return count;
520 }
521 
522 /*
523  * Scan for chunk constraints given a dimension slice ID.
524  *
525  * Optionally, collect all chunk constraints if ChunkConstraints is non-NULL.
526  */
527 int
ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id,ChunkConstraints * ccs,MemoryContext mctx)528 ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id, ChunkConstraints *ccs,
529 											   MemoryContext mctx)
530 {
531 	ScanIterator iterator = ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, mctx);
532 	int count = 0;
533 
534 	init_scan_by_dimension_slice_id(&iterator, dimension_slice_id);
535 	ts_scanner_foreach(&iterator)
536 	{
537 		if (slot_attisnull(ts_scan_iterator_slot(&iterator),
538 						   Anum_chunk_constraint_dimension_slice_id))
539 			continue;
540 
541 		count++;
542 		if (ccs != NULL)
543 			chunk_constraints_add_from_tuple(ccs, ts_scan_iterator_tuple_info(&iterator));
544 	}
545 	return count;
546 }
547 
548 static bool
chunk_constraint_need_on_chunk(const char chunk_relkind,Form_pg_constraint conform)549 chunk_constraint_need_on_chunk(const char chunk_relkind, Form_pg_constraint conform)
550 {
551 	if (conform->contype == CONSTRAINT_CHECK)
552 	{
553 		/*
554 		 * check and not null constraints handled by regular inheritance (from
555 		 * docs): All check constraints and not-null constraints on a parent
556 		 * table are automatically inherited by its children, unless
557 		 * explicitly specified otherwise with NO INHERIT clauses. Other types
558 		 * of constraints (unique, primary key, and foreign key constraints)
559 		 * are not inherited."
560 		 */
561 		return false;
562 	}
563 
564 	/* Foreign tables do not support non-check constraints, so skip them */
565 	if (chunk_relkind == RELKIND_FOREIGN_TABLE)
566 		return false;
567 
568 	return true;
569 }
570 
571 int
ts_chunk_constraints_add_dimension_constraints(ChunkConstraints * ccs,int32 chunk_id,const Hypercube * cube)572 ts_chunk_constraints_add_dimension_constraints(ChunkConstraints *ccs, int32 chunk_id,
573 											   const Hypercube *cube)
574 {
575 	int i;
576 
577 	for (i = 0; i < cube->num_slices; i++)
578 		chunk_constraints_add(ccs, chunk_id, cube->slices[i]->fd.id, NULL, NULL);
579 
580 	return cube->num_slices;
581 }
582 
583 typedef struct ConstraintContext
584 {
585 	int num_added;
586 	char chunk_relkind;
587 	ChunkConstraints *ccs;
588 	int32 chunk_id;
589 } ConstraintContext;
590 
591 static ConstraintProcessStatus
chunk_constraint_add(HeapTuple constraint_tuple,void * arg)592 chunk_constraint_add(HeapTuple constraint_tuple, void *arg)
593 {
594 	ConstraintContext *cc = arg;
595 	Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(constraint_tuple);
596 
597 	if (chunk_constraint_need_on_chunk(cc->chunk_relkind, constraint))
598 	{
599 		chunk_constraints_add(cc->ccs, cc->chunk_id, 0, NULL, NameStr(constraint->conname));
600 		return CONSTR_PROCESSED;
601 	}
602 
603 	return CONSTR_IGNORED;
604 }
605 
606 int
ts_chunk_constraints_add_inheritable_constraints(ChunkConstraints * ccs,int32 chunk_id,const char chunk_relkind,Oid hypertable_oid)607 ts_chunk_constraints_add_inheritable_constraints(ChunkConstraints *ccs, int32 chunk_id,
608 												 const char chunk_relkind, Oid hypertable_oid)
609 {
610 	ConstraintContext cc = {
611 		.chunk_relkind = chunk_relkind,
612 		.ccs = ccs,
613 		.chunk_id = chunk_id,
614 	};
615 
616 	return ts_constraint_process(hypertable_oid, chunk_constraint_add, &cc);
617 }
618 
619 void
ts_chunk_constraint_create_on_chunk(const Chunk * chunk,Oid constraint_oid)620 ts_chunk_constraint_create_on_chunk(const Chunk *chunk, Oid constraint_oid)
621 {
622 	HeapTuple tuple;
623 	Form_pg_constraint con;
624 
625 	tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraint_oid));
626 
627 	if (!HeapTupleIsValid(tuple))
628 		elog(ERROR, "cache lookup failed for constraint %u", constraint_oid);
629 
630 	con = (Form_pg_constraint) GETSTRUCT(tuple);
631 
632 	if (chunk_constraint_need_on_chunk(chunk->relkind, con))
633 	{
634 		ChunkConstraint *cc =
635 			chunk_constraints_add(chunk->constraints, chunk->fd.id, 0, NULL, NameStr(con->conname));
636 
637 		chunk_constraint_insert(cc);
638 
639 		chunk_constraint_create(cc,
640 								chunk->table_id,
641 								chunk->fd.id,
642 								chunk->hypertable_relid,
643 								chunk->fd.hypertable_id);
644 	}
645 
646 	ReleaseSysCache(tuple);
647 }
648 
649 static bool
hypertable_constraint_matches_tuple(TupleInfo * ti,const char * hypertable_constraint_name)650 hypertable_constraint_matches_tuple(TupleInfo *ti, const char *hypertable_constraint_name)
651 {
652 	bool isnull;
653 	Datum name = slot_getattr(ti->slot, Anum_chunk_constraint_hypertable_constraint_name, &isnull);
654 
655 	return !isnull && namestrcmp(DatumGetName(name), hypertable_constraint_name) == 0;
656 }
657 
658 static void
chunk_constraint_delete_metadata(TupleInfo * ti)659 chunk_constraint_delete_metadata(TupleInfo *ti)
660 {
661 	bool isnull;
662 	Datum constrname = slot_getattr(ti->slot, Anum_chunk_constraint_constraint_name, &isnull);
663 	int32 chunk_id = DatumGetInt32(slot_getattr(ti->slot, Anum_chunk_constraint_chunk_id, &isnull));
664 	/* Get the chunk relid. Note that, at this point, the chunk table can be
665 	 * deleted already */
666 	Oid chunk_relid = ts_chunk_get_relid(chunk_id, true);
667 
668 	if (OidIsValid(chunk_relid))
669 	{
670 		Oid index_relid = get_constraint_index(
671 			get_relation_constraint_oid(chunk_relid, NameStr(*DatumGetName(constrname)), true));
672 
673 		/*
674 		 * If this is an index constraint, we need to cleanup the index
675 		 * metadata. Don't drop the index though, since that will happend when
676 		 * the constraint is dropped.
677 		 */
678 		if (OidIsValid(index_relid))
679 			ts_chunk_index_delete(chunk_id, get_rel_name(index_relid), false);
680 	}
681 
682 	ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
683 }
684 
685 static void
chunk_constraint_drop_constraint(TupleInfo * ti)686 chunk_constraint_drop_constraint(TupleInfo *ti)
687 {
688 	bool isnull;
689 	Datum constrname = slot_getattr(ti->slot, Anum_chunk_constraint_constraint_name, &isnull);
690 	int32 chunk_id = DatumGetInt32(slot_getattr(ti->slot, Anum_chunk_constraint_chunk_id, &isnull));
691 	/* Get the chunk relid. Note that, at this point, the chunk table can be
692 	 * deleted already. */
693 	Oid chunk_relid = ts_chunk_get_relid(chunk_id, true);
694 
695 	if (OidIsValid(chunk_relid))
696 	{
697 		ObjectAddress constrobj = {
698 			.classId = ConstraintRelationId,
699 			.objectId =
700 				get_relation_constraint_oid(chunk_relid, NameStr(*DatumGetName(constrname)), true),
701 		};
702 
703 		if (OidIsValid(constrobj.objectId))
704 			performDeletion(&constrobj, DROP_RESTRICT, 0);
705 	}
706 }
707 
708 int
ts_chunk_constraint_delete_by_hypertable_constraint_name(int32 chunk_id,const char * hypertable_constraint_name,bool delete_metadata,bool drop_constraint)709 ts_chunk_constraint_delete_by_hypertable_constraint_name(int32 chunk_id,
710 														 const char *hypertable_constraint_name,
711 														 bool delete_metadata, bool drop_constraint)
712 {
713 	ScanIterator iterator =
714 		ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
715 	int count = 0;
716 
717 	init_scan_by_chunk_id(&iterator, chunk_id);
718 	ts_scanner_foreach(&iterator)
719 	{
720 		if (!hypertable_constraint_matches_tuple(ts_scan_iterator_tuple_info(&iterator),
721 												 hypertable_constraint_name))
722 			continue;
723 
724 		count++;
725 
726 		if (delete_metadata)
727 			chunk_constraint_delete_metadata(ts_scan_iterator_tuple_info(&iterator));
728 
729 		if (drop_constraint)
730 			chunk_constraint_drop_constraint(ts_scan_iterator_tuple_info(&iterator));
731 	}
732 	return count;
733 }
734 
735 int
ts_chunk_constraint_delete_by_constraint_name(int32 chunk_id,const char * constraint_name,bool delete_metadata,bool drop_constraint)736 ts_chunk_constraint_delete_by_constraint_name(int32 chunk_id, const char *constraint_name,
737 											  bool delete_metadata, bool drop_constraint)
738 {
739 	ScanIterator iterator =
740 		ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
741 	int count = 0;
742 
743 	init_scan_by_chunk_id_constraint_name(&iterator, chunk_id, constraint_name);
744 	ts_scanner_foreach(&iterator)
745 	{
746 		count++;
747 		if (delete_metadata)
748 			chunk_constraint_delete_metadata(ts_scan_iterator_tuple_info(&iterator));
749 
750 		if (drop_constraint)
751 			chunk_constraint_drop_constraint(ts_scan_iterator_tuple_info(&iterator));
752 	}
753 	return count;
754 }
755 
756 /*
757  * Delete all constraints for a chunk. Optionally, collect the deleted constraints.
758  */
759 int
ts_chunk_constraint_delete_by_chunk_id(int32 chunk_id,ChunkConstraints * ccs)760 ts_chunk_constraint_delete_by_chunk_id(int32 chunk_id, ChunkConstraints *ccs)
761 {
762 	ScanIterator iterator =
763 		ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
764 	int count = 0;
765 
766 	init_scan_by_chunk_id(&iterator, chunk_id);
767 	ts_scanner_foreach(&iterator)
768 	{
769 		count++;
770 
771 		chunk_constraints_add_from_tuple(ccs, ts_scan_iterator_tuple_info(&iterator));
772 		chunk_constraint_delete_metadata(ts_scan_iterator_tuple_info(&iterator));
773 		chunk_constraint_drop_constraint(ts_scan_iterator_tuple_info(&iterator));
774 	}
775 	return count;
776 }
777 
778 int
ts_chunk_constraint_delete_by_dimension_slice_id(int32 dimension_slice_id)779 ts_chunk_constraint_delete_by_dimension_slice_id(int32 dimension_slice_id)
780 {
781 	ScanIterator iterator =
782 		ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
783 	int count = 0;
784 
785 	init_scan_by_dimension_slice_id(&iterator, dimension_slice_id);
786 	ts_scanner_foreach(&iterator)
787 	{
788 		count++;
789 
790 		chunk_constraint_delete_metadata(ts_scan_iterator_tuple_info(&iterator));
791 		chunk_constraint_drop_constraint(ts_scan_iterator_tuple_info(&iterator));
792 	}
793 	return count;
794 }
795 
796 void
ts_chunk_constraint_recreate(const ChunkConstraint * cc,Oid chunk_oid)797 ts_chunk_constraint_recreate(const ChunkConstraint *cc, Oid chunk_oid)
798 {
799 	ObjectAddress constrobj = {
800 		.classId = ConstraintRelationId,
801 		.objectId = get_relation_constraint_oid(chunk_oid, NameStr(cc->fd.constraint_name), false),
802 	};
803 
804 	performDeletion(&constrobj, DROP_RESTRICT, 0);
805 	chunk_constraint_create_on_table(cc, chunk_oid);
806 }
807 
808 static void
chunk_constraint_rename_on_chunk_table(int32 chunk_id,const char * old_name,const char * new_name)809 chunk_constraint_rename_on_chunk_table(int32 chunk_id, const char *old_name, const char *new_name)
810 {
811 	Oid chunk_relid = ts_chunk_get_relid(chunk_id, false);
812 	Oid nspid = get_rel_namespace(chunk_relid);
813 	RenameStmt rename = {
814 		.renameType = OBJECT_TABCONSTRAINT,
815 		.relation = makeRangeVar(get_namespace_name(nspid), get_rel_name(chunk_relid), 0),
816 		.subname = pstrdup(old_name),
817 		.newname = pstrdup(new_name),
818 	};
819 
820 	RenameConstraint(&rename);
821 }
822 
823 static void
chunk_constraint_rename_hypertable_from_tuple(TupleInfo * ti,const char * newname)824 chunk_constraint_rename_hypertable_from_tuple(TupleInfo *ti, const char *newname)
825 {
826 	bool nulls[Natts_chunk_constraint];
827 	Datum values[Natts_chunk_constraint];
828 	bool repl[Natts_chunk_constraint] = { false };
829 	HeapTuple tuple, new_tuple;
830 	TupleDesc tupdesc = ts_scanner_get_tupledesc(ti);
831 	NameData new_hypertable_constraint_name;
832 	NameData new_chunk_constraint_name;
833 	Name old_chunk_constraint_name;
834 	int32 chunk_id;
835 	bool should_free;
836 
837 	tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
838 	heap_deform_tuple(tuple, tupdesc, values, nulls);
839 
840 	chunk_id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_constraint_chunk_id)]);
841 	namestrcpy(&new_hypertable_constraint_name, newname);
842 	chunk_constraint_choose_name(&new_chunk_constraint_name, newname, chunk_id);
843 
844 	values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] =
845 		NameGetDatum(&new_hypertable_constraint_name);
846 	repl[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] = true;
847 	old_chunk_constraint_name =
848 		DatumGetName(values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)]);
849 	values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] =
850 		NameGetDatum(&new_chunk_constraint_name);
851 	repl[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] = true;
852 
853 	chunk_constraint_rename_on_chunk_table(chunk_id,
854 										   NameStr(*old_chunk_constraint_name),
855 										   NameStr(new_chunk_constraint_name));
856 
857 	new_tuple = heap_modify_tuple(tuple, tupdesc, values, nulls, repl);
858 
859 	ts_chunk_index_adjust_meta(chunk_id,
860 							   newname,
861 							   NameStr(*old_chunk_constraint_name),
862 							   NameStr(new_chunk_constraint_name));
863 
864 	ts_catalog_update(ti->scanrel, new_tuple);
865 	heap_freetuple(new_tuple);
866 
867 	if (should_free)
868 		heap_freetuple(tuple);
869 }
870 
871 /*
872  * Adjust internal metadata after index/constraint rename
873  */
874 int
ts_chunk_constraint_adjust_meta(int32 chunk_id,const char * ht_constraint_name,const char * oldname,const char * newname)875 ts_chunk_constraint_adjust_meta(int32 chunk_id, const char *ht_constraint_name, const char *oldname,
876 								const char *newname)
877 {
878 	ScanIterator iterator =
879 		ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
880 	int count = 0;
881 
882 	init_scan_by_chunk_id_constraint_name(&iterator, chunk_id, oldname);
883 
884 	ts_scanner_foreach(&iterator)
885 	{
886 		bool nulls[Natts_chunk_constraint];
887 		bool repl[Natts_chunk_constraint] = { false };
888 		Datum values[Natts_chunk_constraint];
889 		bool should_free;
890 		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
891 		HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
892 		HeapTuple new_tuple;
893 
894 		heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);
895 
896 		values[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] =
897 			CStringGetDatum(ht_constraint_name);
898 		repl[AttrNumberGetAttrOffset(Anum_chunk_constraint_hypertable_constraint_name)] = true;
899 		values[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] =
900 			CStringGetDatum(newname);
901 		repl[AttrNumberGetAttrOffset(Anum_chunk_constraint_constraint_name)] = true;
902 
903 		new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls, repl);
904 
905 		ts_catalog_update(ti->scanrel, new_tuple);
906 		heap_freetuple(new_tuple);
907 
908 		if (should_free)
909 			heap_freetuple(tuple);
910 
911 		count++;
912 	}
913 
914 	return count;
915 }
916 
917 int
ts_chunk_constraint_rename_hypertable_constraint(int32 chunk_id,const char * oldname,const char * newname)918 ts_chunk_constraint_rename_hypertable_constraint(int32 chunk_id, const char *oldname,
919 												 const char *newname)
920 {
921 	ScanIterator iterator =
922 		ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
923 	int count = 0;
924 
925 	init_scan_by_chunk_id(&iterator, chunk_id);
926 	ts_scanner_foreach(&iterator)
927 	{
928 		if (!hypertable_constraint_matches_tuple(ts_scan_iterator_tuple_info(&iterator), oldname))
929 			continue;
930 
931 		count++;
932 		chunk_constraint_rename_hypertable_from_tuple(ts_scan_iterator_tuple_info(&iterator),
933 													  newname);
934 	}
935 	return count;
936 }
937 
938 char *
ts_chunk_constraint_get_name_from_hypertable_constraint(Oid chunk_relid,const char * hypertable_constraint_name)939 ts_chunk_constraint_get_name_from_hypertable_constraint(Oid chunk_relid,
940 														const char *hypertable_constraint_name)
941 {
942 	ScanIterator iterator =
943 		ts_scan_iterator_create(CHUNK_CONSTRAINT, RowExclusiveLock, CurrentMemoryContext);
944 	Datum chunk_id = DirectFunctionCall1(ts_chunk_id_from_relid, ObjectIdGetDatum(chunk_relid));
945 
946 	init_scan_by_chunk_id(&iterator, DatumGetInt32(chunk_id));
947 	ts_scanner_foreach(&iterator)
948 	{
949 		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
950 		MemoryContext oldmctx;
951 		bool isnull;
952 		Datum datum;
953 		char *name;
954 
955 		if (!hypertable_constraint_matches_tuple(ti, hypertable_constraint_name))
956 			continue;
957 
958 		datum = slot_getattr(ti->slot, Anum_chunk_constraint_constraint_name, &isnull);
959 		Assert(!isnull);
960 
961 		oldmctx = MemoryContextSwitchTo(ti->mctx);
962 		name = pstrdup(NameStr(*DatumGetName(datum)));
963 		MemoryContextSwitchTo(oldmctx);
964 		ts_scan_iterator_close(&iterator);
965 
966 		return name;
967 	}
968 	return NULL;
969 }
970