/*
 * This file and its contents are licensed under the Apache License 2.0.
 * Please see the included NOTICE for copyright information and
 * LICENSE-APACHE for a copy of the license.
 */
#include <postgres.h>
#include <access/attnum.h>
#include <access/xact.h>
#include <catalog/pg_type.h>
#include <executor/tuptable.h>
#include <foreign/fdwapi.h>
#include <miscadmin.h>
#include <nodes/execnodes.h>
#include <nodes/makefuncs.h>
#include <nodes/nodes.h>
#include <nodes/plannodes.h>
#include <optimizer/optimizer.h>
#include <parser/parsetree.h>
#include <rewrite/rewriteManip.h>
#include <utils/builtins.h>
#include <utils/guc.h>
#include <utils/lsyscache.h>
#include <utils/memutils.h>
#include <utils/rel.h>
#include <utils/rls.h>
#include <continuous_agg.h>

#include "compat/compat.h"
#include "errors.h"
#include "chunk_insert_state.h"
#include "chunk_dispatch.h"
#include "chunk_data_node.h"
#include "chunk_dispatch_state.h"
#include "chunk_index.h"
#include "indexing.h"

/* Just like ExecPrepareExpr except that it doesn't switch to the query memory context */
static inline ExprState *
prepare_constr_expr(Expr *node)
{
	ExprState *result;

	node = expression_planner(node);
	result = ExecInitExpr(node, NULL);

	return result;
}

/*
 * Create the constraint exprs inside the current memory context. If this
 * is not done here, then ExecRelCheck will do it for you but put it into
 * the query memory context, which will cause a memory leak.
 *
 * See the comment in `ts_chunk_insert_state_destroy` for more information
 * on the implications of this.
 */
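/*
 * Note that the CHECK constraints built here are typically the chunk's
 * dimension-range constraints created by TimescaleDB, e.g. something like
 * CHECK ("time" >= '...' AND "time" < '...') for a time-partitioned chunk.
 */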
static inline void
create_chunk_rri_constraint_expr(ResultRelInfo *rri, Relation rel)
{
	int ncheck, i;
	ConstrCheck *check;

	Assert(rel->rd_att->constr != NULL && rri->ri_ConstraintExprs == NULL);

	ncheck = rel->rd_att->constr->num_check;
	check = rel->rd_att->constr->check;
	rri->ri_ConstraintExprs = (ExprState **) palloc(ncheck * sizeof(ExprState *));

	for (i = 0; i < ncheck; i++)
	{
		Expr *checkconstr = stringToNode(check[i].ccbin);

		rri->ri_ConstraintExprs[i] = prepare_constr_expr(checkconstr);
	}
}

/*
 * Create a new ResultRelInfo for a chunk.
 *
 * The ResultRelInfo holds the executor state (e.g., open relation, indexes, and
 * options) for the result relation where tuples will be stored.
 *
 * The Hypertable ResultRelInfo is used as a template for the chunk's new ResultRelInfo.
 */
static inline ResultRelInfo *
create_chunk_result_relation_info(ChunkDispatch *dispatch, Relation rel)
{
	ResultRelInfo *rri;
	ResultRelInfo *rri_orig = dispatch->hypertable_result_rel_info;
	Index hyper_rti = rri_orig->ri_RangeTableIndex;
	rri = makeNode(ResultRelInfo);

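	/*
	 * Chunks do not appear as separate entries in the query's range table, so
	 * the chunk's ResultRelInfo reuses the hypertable's range table index.
	 */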
	InitResultRelInfo(rri, rel, hyper_rti, NULL, dispatch->estate->es_instrument);

	/* Copy options from the main table's (hypertable's) result relation info */
	rri->ri_WithCheckOptions = rri_orig->ri_WithCheckOptions;
	rri->ri_WithCheckOptionExprs = rri_orig->ri_WithCheckOptionExprs;
#if PG14_LT
	rri->ri_junkFilter = rri_orig->ri_junkFilter;
#endif
	rri->ri_projectReturning = rri_orig->ri_projectReturning;

	rri->ri_FdwState = NULL;
	rri->ri_usesFdwDirectModify = rri_orig->ri_usesFdwDirectModify;

	if (RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE)
		rri->ri_FdwRoutine = GetFdwRoutineForRelation(rel, true);

	create_chunk_rri_constraint_expr(rri, rel);

	return rri;
}

static inline ResultRelInfo *
create_compress_chunk_result_relation_info(ChunkDispatch *dispatch, Relation compress_rel)
{
	ResultRelInfo *rri = makeNode(ResultRelInfo);
	ResultRelInfo *rri_orig = dispatch->hypertable_result_rel_info;
	Index hyper_rti = rri_orig->ri_RangeTableIndex;

	InitResultRelInfo(rri, compress_rel, hyper_rti, NULL, dispatch->estate->es_instrument);

	/* RLS policies are not supported if compression is enabled */
	Assert(rri_orig->ri_WithCheckOptions == NULL && rri_orig->ri_WithCheckOptionExprs == NULL);
	Assert(rri_orig->ri_projectReturning == NULL);
#if PG14_LT
	rri->ri_junkFilter = rri_orig->ri_junkFilter;
#endif

	/* The compressed chunk lives on the data node, so it does not need any FDW
	 * access on the access node */
	rri->ri_FdwState = NULL;
	rri->ri_usesFdwDirectModify = false;
	rri->ri_FdwRoutine = NULL;
	/* Constraints are checked on the original base chunk, so we do not
	 * call create_chunk_rri_constraint_expr here
	 */
	return rri;
}

static ProjectionInfo *
get_adjusted_projection_info_returning(ProjectionInfo *orig, List *returning_clauses,
									   TupleConversionMap *map, Index varno, Oid rowtype,
									   TupleDesc chunk_desc)
{
	bool found_whole_row;

	Assert(returning_clauses != NIL);

	/* map hypertable attnos -> chunk attnos */
	if (map != NULL)
		returning_clauses = castNode(List,
									 map_variable_attnos_compat((Node *) returning_clauses,
																varno,
																0,
																map->attrMap,
																map->outdesc->natts,
																rowtype,
																&found_whole_row));

	return ExecBuildProjectionInfo(returning_clauses,
								   orig->pi_exprContext,
								   orig->pi_state.resultslot,
								   NULL,
								   chunk_desc);
}

static List *
translate_clause(List *inclause, TupleConversionMap *chunk_map, Index varno, Relation hyper_rel,
				 Relation chunk_rel)
{
	List *clause = copyObject(inclause);
	bool found_whole_row;

	/* nothing to do here if the chunk_map is NULL */
	if (!chunk_map)
		return list_copy(clause);

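	/*
	 * The clause can reference both the target relation (by its range table
	 * index, varno) and the EXCLUDED pseudo-relation, which the planner
	 * represents with the special varno INNER_VAR. Both sets of Vars carry
	 * hypertable attribute numbers, so each is remapped in turn below.
	 */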
	/* map hypertable attnos -> chunk attnos for the "excluded" table */
	clause = castNode(List,
					  map_variable_attnos_compat((Node *) clause,
												 INNER_VAR,
												 0,
												 chunk_map->attrMap,
												 RelationGetDescr(hyper_rel)->natts,
												 RelationGetForm(chunk_rel)->reltype,
												 &found_whole_row));

	/* map hypertable attnos -> chunk attnos for the hypertable */
	clause = castNode(List,
					  map_variable_attnos_compat((Node *) clause,
												 varno,
												 0,
												 chunk_map->attrMap,
												 RelationGetDescr(hyper_rel)->natts,
												 RelationGetForm(chunk_rel)->reltype,
												 &found_whole_row));

	return clause;
}

#if PG14_LT
/*
 * adjust_hypertable_tlist - from Postgres source code `adjust_partition_tlist`
 *		Adjust the targetlist entries for a given chunk to account for
 *		attribute differences between the hypertable and the chunk
 *
 * The expressions have already been fixed, but here we fix the list to make
 * target resnos match the chunk's attribute numbers.  This results in a
 * copy of the original target list in which the entries appear in resno
 * order, including both the existing entries (that may have their resno
 * changed in-place) and the newly added entries for columns that don't exist
 * in the parent.
 *
 * Note that this scribbles on the resnos of the input tlist's entries.
 */
static List *
adjust_hypertable_tlist(List *tlist, TupleConversionMap *map)
{
	List *new_tlist = NIL;
	TupleDesc chunk_tupdesc = map->outdesc;
#if PG13_GE
	AttrNumber *attrMap = map->attrMap->attnums;
#else
	AttrNumber *attrMap = map->attrMap;
#endif
	AttrNumber chunk_attrno;

	for (chunk_attrno = 1; chunk_attrno <= chunk_tupdesc->natts; chunk_attrno++)
	{
		Form_pg_attribute att_tup = TupleDescAttr(chunk_tupdesc, chunk_attrno - 1);
		TargetEntry *tle;

		if (attrMap[chunk_attrno - 1] != InvalidAttrNumber)
		{
			Assert(!att_tup->attisdropped);

			/*
			 * Use the corresponding entry from the parent's tlist, adjusting
			 * the resno to match the chunk's attno.
			 */
			tle = (TargetEntry *) list_nth(tlist, attrMap[chunk_attrno - 1] - 1);
			if (namestrcmp(&att_tup->attname, tle->resname) != 0)
				elog(ERROR, "invalid translation of ON CONFLICT update statements");
			tle->resno = chunk_attrno;
		}
		else
		{
			Const *expr;

			/*
			 * For a dropped attribute in the chunk, generate a dummy
			 * entry with resno matching the chunk's attno.
			 */
			Assert(att_tup->attisdropped);
			expr = makeConst(INT4OID,
							 -1,
							 InvalidOid,
							 sizeof(int32),
							 (Datum) 0,
							 true, /* isnull */
							 true /* byval */);
			tle = makeTargetEntry((Expr *) expr,
								  chunk_attrno,
								  pstrdup(NameStr(att_tup->attname)),
								  false);
		}
		new_tlist = lappend(new_tlist, tle);
	}
	return new_tlist;
}
#endif

#if PG14_GE
/*
 * adjust_chunk_colnos
 *		Adjust the list of UPDATE target column numbers to account for
 *		attribute differences between the parent and the partition.
 *
 * adapted from postgres adjust_partition_colnos
 */
static List *
adjust_chunk_colnos(List *colnos, ResultRelInfo *chunk_rri)
{
	List *new_colnos = NIL;
	TupleConversionMap *map = ExecGetChildToRootMap(chunk_rri);
	AttrMap *attrMap;
	ListCell *lc;

	Assert(map != NULL); /* else we shouldn't be here */
	attrMap = map->attrMap;

	foreach (lc, colnos)
	{
		AttrNumber parentattrno = lfirst_int(lc);

		if (parentattrno <= 0 || parentattrno > attrMap->maplen ||
			attrMap->attnums[parentattrno - 1] == 0)
			elog(ERROR, "unexpected attno %d in target column list", parentattrno);
		new_colnos = lappend_int(new_colnos, attrMap->attnums[parentattrno - 1]);
	}

	return new_colnos;
}
#endif

/*
 * Setup ON CONFLICT state for a chunk.
 *
 * Mostly, this is about mapping attribute numbers from the hypertable root to
 * a chunk, accounting for differences in the tuple descriptors due to dropped
 * columns, etc.
 */
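/*
 * For example, given a hypothetical hypertable with a unique index on
 * (time, device), a statement like
 *
 *   INSERT INTO metrics VALUES (...)
 *   ON CONFLICT (time, device) DO UPDATE SET value = EXCLUDED.value;
 *
 * carries SET and WHERE expressions that reference hypertable attribute
 * numbers; they must be translated to the chunk's attribute numbers before
 * the projection and qual can be executed against chunk tuples.
 */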
static void
setup_on_conflict_state(ChunkInsertState *state, ChunkDispatch *dispatch,
						TupleConversionMap *chunk_map)
{
	TupleConversionMap *map = state->hyper_to_chunk_map;
	ResultRelInfo *chunk_rri = state->result_relation_info;
	ResultRelInfo *hyper_rri = dispatch->hypertable_result_rel_info;
	Relation chunk_rel = state->result_relation_info->ri_RelationDesc;
	Relation hyper_rel = hyper_rri->ri_RelationDesc;
	ModifyTableState *mtstate = castNode(ModifyTableState, dispatch->dispatch_state->mtstate);
	ModifyTable *mt = castNode(ModifyTable, mtstate->ps.plan);

	Assert(ts_chunk_dispatch_get_on_conflict_action(dispatch) == ONCONFLICT_UPDATE);

	OnConflictSetState *onconfl = makeNode(OnConflictSetState);
	memcpy(onconfl, hyper_rri->ri_onConflict, sizeof(OnConflictSetState));
	chunk_rri->ri_onConflict = onconfl;

#if PG14_GE
	chunk_rri->ri_RootToPartitionMap = map;
#endif

	Assert(mt->onConflictSet);
	Assert(hyper_rri->ri_onConflict != NULL);

	/*
	 * Need a separate existing slot for each partition, as the
	 * partition could be of a different AM, even if the tuple
	 * descriptors match.
	 */
	onconfl->oc_Existing = table_slot_create(chunk_rri->ri_RelationDesc, NULL);
	state->existing_slot = onconfl->oc_Existing;

	/*
	 * If the chunk's tuple descriptor matches exactly the hypertable
	 * (the common case), we can re-use most of the parent's ON
	 * CONFLICT SET state, skipping a bunch of work.  Otherwise, we
	 * need to create state specific to this partition.
	 */
	if (!map)
	{
		/*
		 * It's safe to reuse these from the hypertable: we only
		 * process one tuple at a time (so we won't overwrite needed
		 * data in slots), and the projections and WHERE clauses
		 * neither store state nor depend on the underlying storage.
		 */
		onconfl->oc_ProjSlot = hyper_rri->ri_onConflict->oc_ProjSlot;
		onconfl->oc_ProjInfo = hyper_rri->ri_onConflict->oc_ProjInfo;
		onconfl->oc_WhereClause = hyper_rri->ri_onConflict->oc_WhereClause;
		state->conflproj_slot = onconfl->oc_ProjSlot;
	}
	else
	{
		List *onconflset;
#if PG14_GE
		List *onconflcols;
#endif

		/*
		 * Translate expressions in onConflictSet to account for
		 * different attribute numbers.  For that, map partition
		 * varattnos twice: first to catch the EXCLUDED
		 * pseudo-relation (INNER_VAR), and second to handle the main
		 * target relation (firstVarno).
		 */
		onconflset = copyObject(mt->onConflictSet);

		Assert(map->outdesc == RelationGetDescr(chunk_rel));

		if (!chunk_map)
			chunk_map = convert_tuples_by_name_compat(RelationGetDescr(chunk_rel),
													  RelationGetDescr(hyper_rel),
													  gettext_noop("could not convert row type"));

		onconflset = translate_clause(onconflset,
									  chunk_map,
									  hyper_rri->ri_RangeTableIndex,
									  hyper_rel,
									  chunk_rel);

#if PG14_LT
		onconflset = adjust_hypertable_tlist(onconflset, state->hyper_to_chunk_map);
#else
		chunk_rri->ri_ChildToRootMap = chunk_map;
		chunk_rri->ri_ChildToRootMapValid = true;

		/* Finally, adjust the target colnos to match the chunk. */
		if (chunk_map)
			onconflcols = adjust_chunk_colnos(mt->onConflictCols, chunk_rri);
		else
			onconflcols = mt->onConflictCols;
#endif

		/* create the tuple slot for the UPDATE SET projection */
		onconfl->oc_ProjSlot = table_slot_create(chunk_rel, NULL);
		state->conflproj_slot = onconfl->oc_ProjSlot;

		/* build UPDATE SET projection state */
#if PG14_LT
		ExprContext *econtext = hyper_rri->ri_onConflict->oc_ProjInfo->pi_exprContext;
		onconfl->oc_ProjInfo = ExecBuildProjectionInfo(onconflset,
													   econtext,
													   state->conflproj_slot,
													   NULL,
													   RelationGetDescr(chunk_rel));
#else
		onconfl->oc_ProjInfo = ExecBuildUpdateProjection(onconflset,
														 true,
														 onconflcols,
														 RelationGetDescr(chunk_rel),
														 mtstate->ps.ps_ExprContext,
														 onconfl->oc_ProjSlot,
														 &mtstate->ps);
#endif

		Node *onconflict_where = mt->onConflictWhere;

		/*
		 * Map attribute numbers in the WHERE clause, if it exists.
		 */
		if (onconflict_where && chunk_map)
		{
			List *clause = translate_clause(castNode(List, onconflict_where),
											chunk_map,
											hyper_rri->ri_RangeTableIndex,
											hyper_rel,
											chunk_rel);

			chunk_rri->ri_onConflict->oc_WhereClause = ExecInitQual(clause, NULL);
		}
	}
}

static void
destroy_on_conflict_state(ChunkInsertState *state)
{
	/*
	 * Clean up per-chunk tuple table slots created for ON CONFLICT handling.
	 */
	if (NULL != state->existing_slot)
		ExecDropSingleTupleTableSlot(state->existing_slot);

	/* The ON CONFLICT projection slot is only chunk specific in case the
	 * tuple descriptor didn't match the hypertable */
	if (NULL != state->hyper_to_chunk_map && NULL != state->conflproj_slot)
		ExecDropSingleTupleTableSlot(state->conflproj_slot);
}

/* Translate hypertable indexes to chunk indexes in the arbiter clause */
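/*
 * The arbiter indexes inferred from the ON CONFLICT target (e.g.
 * "ON CONFLICT (time, device)") refer to unique indexes on the hypertable;
 * each one must be mapped to the corresponding index on the chunk before the
 * executor can use it for conflict detection.
 */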
static void
set_arbiter_indexes(ChunkInsertState *state, ChunkDispatch *dispatch)
{
	List *arbiter_indexes = ts_chunk_dispatch_get_arbiter_indexes(dispatch);
	ListCell *lc;

	state->arbiter_indexes = NIL;

	foreach (lc, arbiter_indexes)
	{
		Oid hypertable_index = lfirst_oid(lc);
		Chunk *chunk = ts_chunk_get_by_relid(RelationGetRelid(state->rel), true);
		ChunkIndexMapping cim;

		if (ts_chunk_index_get_by_hypertable_indexrelid(chunk, hypertable_index, &cim) < 1)
		{
			/*
			 * In case of distributed hypertables, we don't have information about the
			 * arbiter index on the remote side, so error out with a helpful hint
			 */
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("could not find arbiter index for hypertable index \"%s\" on chunk "
							"\"%s\"",
							get_rel_name(hypertable_index),
							get_rel_name(RelationGetRelid(state->rel))),
					 hypertable_is_distributed(dispatch->hypertable) ?
						 errhint(
							 "Omit the index inference specification for the distributed hypertable"
							 " in the ON CONFLICT clause.") :
						 0));
		}

		state->arbiter_indexes = lappend_oid(state->arbiter_indexes, cim.indexoid);
	}
	state->result_relation_info->ri_onConflictArbiterIndexes = state->arbiter_indexes;
}

/* Change the projections to work with chunks instead of hypertables */
static void
adjust_projections(ChunkInsertState *cis, ChunkDispatch *dispatch, Oid rowtype)
{
	ResultRelInfo *chunk_rri = cis->result_relation_info;
	Relation hyper_rel = dispatch->hypertable_result_rel_info->ri_RelationDesc;
	Relation chunk_rel = cis->rel;
	TupleConversionMap *chunk_map = NULL;
	OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);

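	/*
	 * A RETURNING list (e.g. "INSERT ... RETURNING time, value") is planned
	 * against the hypertable, so its Vars carry hypertable attribute numbers.
	 * If the chunk's attribute numbers differ (e.g., due to dropped columns),
	 * the returning projection must be rebuilt against the chunk's tuple
	 * descriptor.
	 */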
	if (ts_chunk_dispatch_has_returning(dispatch))
	{
		/*
		 * We need the opposite map from cis->hyper_to_chunk_map: the map must
		 * have the hypertable descriptor in the "out" spot for
		 * map_variable_attnos to correctly map hypertable attnos -> chunk
		 * attnos.
		 */
		chunk_map = convert_tuples_by_name_compat(RelationGetDescr(chunk_rel),
												  RelationGetDescr(hyper_rel),
												  gettext_noop("could not convert row type"));

		chunk_rri->ri_projectReturning =
			get_adjusted_projection_info_returning(chunk_rri->ri_projectReturning,
												   ts_chunk_dispatch_get_returning_clauses(
													   dispatch),
												   chunk_map,
												   dispatch->hypertable_result_rel_info
													   ->ri_RangeTableIndex,
												   rowtype,
												   RelationGetDescr(chunk_rel));
	}

	/* Set the chunk's arbiter indexes for ON CONFLICT statements */
	if (onconflict_action != ONCONFLICT_NONE)
	{
		set_arbiter_indexes(cis, dispatch);

		if (onconflict_action == ONCONFLICT_UPDATE)
			setup_on_conflict_state(cis, dispatch, chunk_map);
	}
}

/* Assumption: we have acquired a lock on the chunk table. This is
 *      important because the lock acquisition order is always the original
 *      chunk first, followed by the compressed chunk, to prevent deadlocks.
 * We now try to acquire a lock on the compressed chunk, if one exists.
 * Note that the insert could have been blocked by a recompress_chunk operation,
 * in which case the compressed chunk could have moved under us. We need to
 * refetch the chunk to get the correct compressed chunk id (github issue 3400).
 */
static Relation
lock_associated_compressed_chunk(int32 chunk_id, bool *has_compressed_chunk)
{
	Relation compress_rel = NULL;
	Chunk *orig_chunk = ts_chunk_get_by_id(chunk_id, true);
	Oid compress_chunk_relid = InvalidOid;
	*has_compressed_chunk = false;
	if (orig_chunk->fd.compressed_chunk_id)
		compress_chunk_relid = ts_chunk_get_relid(orig_chunk->fd.compressed_chunk_id, false);
	if (compress_chunk_relid != InvalidOid)
	{
		*has_compressed_chunk = true;
		compress_rel = table_open(compress_chunk_relid, RowExclusiveLock);
	}
	return compress_rel;
}

/*
 * Create a new chunk insert state.
 *
 * This is essentially a ResultRelInfo for a chunk. Initialization of the
 * ResultRelInfo should be similar to ExecInitModifyTable().
 */
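/*
 * Roughly, this:
 *   - opens and locks the chunk relation (and its compressed chunk, if any),
 *   - builds a ResultRelInfo for the relation that will actually receive
 *     the tuples,
 *   - opens indexes and validates triggers,
 *   - builds the hypertable-to-chunk tuple conversion map,
 *   - adjusts RETURNING / ON CONFLICT projections to chunk attribute numbers,
 *   - and sets up FDW state for distributed (foreign-table) chunks.
 */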
extern ChunkInsertState *
ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
{
	ChunkInsertState *state;
	Relation rel, parent_rel, compress_rel = NULL;
	MemoryContext old_mcxt;
	MemoryContext cis_context = AllocSetContextCreate(dispatch->estate->es_query_cxt,
													  "chunk insert state memory context",
													  ALLOCSET_DEFAULT_SIZES);
	OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);
	ResultRelInfo *resrelinfo, *relinfo;
	bool has_compressed_chunk = (chunk->fd.compressed_chunk_id != 0);

	/* permissions NOT checked here; were checked at hypertable level */
	if (check_enable_rls(chunk->table_id, InvalidOid, false) == RLS_ENABLED)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("hypertables do not support row-level security")));

	if (chunk->relkind != RELKIND_RELATION && chunk->relkind != RELKIND_FOREIGN_TABLE)
		elog(ERROR, "insert is not on a table");

	if (has_compressed_chunk &&
		(onconflict_action != ONCONFLICT_NONE || ts_chunk_dispatch_has_returning(dispatch)))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("insert with ON CONFLICT or RETURNING clause is not supported on "
						"compressed chunks")));

	/*
	 * We must allocate the range table entry on the executor's per-query
	 * context
	 */
	old_mcxt = MemoryContextSwitchTo(dispatch->estate->es_query_cxt);

	rel = table_open(chunk->table_id, RowExclusiveLock);
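	/*
	 * Uniqueness cannot be checked against rows that are already compressed,
	 * so inserting into a compressed chunk that carries a primary key or
	 * unique constraint is rejected outright.
	 */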
	if (has_compressed_chunk && ts_indexing_relation_has_primary_or_unique_index(rel))
	{
		table_close(rel, RowExclusiveLock);
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("insert into a compressed chunk that has primary or unique constraint is "
						"not supported")));
	}
	compress_rel = lock_associated_compressed_chunk(chunk->fd.id, &has_compressed_chunk);

	MemoryContextSwitchTo(cis_context);
	relinfo = create_chunk_result_relation_info(dispatch, rel);
	if (!has_compressed_chunk)
		resrelinfo = relinfo;
	else
	{
		/* insert the tuple into the compressed chunk instead */
		resrelinfo = create_compress_chunk_result_relation_info(dispatch, compress_rel);
	}
	CheckValidResultRel(resrelinfo, ts_chunk_dispatch_get_cmd_type(dispatch));

	state = palloc0(sizeof(ChunkInsertState));
	state->mctx = cis_context;
	state->rel = rel;
	state->result_relation_info = resrelinfo;
	state->estate = dispatch->estate;

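	/*
	 * Open the chunk's indexes. The second argument asks for speculative
	 * insertion support, which ON CONFLICT handling needs for its arbiter
	 * indexes.
	 */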
	if (resrelinfo->ri_RelationDesc->rd_rel->relhasindex &&
		resrelinfo->ri_IndexRelationDescs == NULL)
		ExecOpenIndices(resrelinfo, onconflict_action != ONCONFLICT_NONE);

	if (relinfo->ri_TrigDesc != NULL)
	{
		TriggerDesc *tg = relinfo->ri_TrigDesc;

		/* INSTEAD OF triggers can only be created on views */
		Assert(!tg->trig_insert_instead_row);

		/*
		 * A statement that targets a parent table in an inheritance or
		 * partitioning hierarchy does not cause the statement-level triggers
		 * of affected child tables to be fired; only the parent table's
		 * statement-level triggers are fired. However, row-level triggers
		 * of any affected child tables will be fired.
		 * During chunk creation we only copy row triggers to chunks, so
		 * statement triggers should not exist on chunks.
		 */
		if (tg->trig_insert_after_statement || tg->trig_insert_before_statement)
			elog(ERROR, "statement trigger on chunk table not supported");

		/* AFTER ROW triggers do not work since we redirect the insert
		 * to the compressed chunk. We still want continuous aggregate
		 * invalidation triggers to fire, so we call them directly, but
		 * raise an error if any other AFTER ROW triggers are present.
		 */
		if (has_compressed_chunk && tg->trig_insert_after_row)
		{
			StringInfo trigger_list = makeStringInfo();
			Assert(tg->numtriggers > 0);
			for (int i = 0; i < tg->numtriggers; i++)
			{
				if (strncmp(tg->triggers[i].tgname,
							CAGGINVAL_TRIGGER_NAME,
							strlen(CAGGINVAL_TRIGGER_NAME)) == 0)
					continue;
				if (i > 0)
					appendStringInfoString(trigger_list, ", ");
				appendStringInfoString(trigger_list, tg->triggers[i].tgname);
			}
			if (trigger_list->len != 0)
			{
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("after insert row trigger on compressed chunk not supported"),
						 errdetail("Triggers: %s", trigger_list->data),
						 errhint("Decompress the chunk first before inserting into it.")));
			}
		}
	}

	parent_rel = table_open(dispatch->hypertable->main_table_relid, AccessShareLock);

	/* Set the tuple conversion map, if the tuple needs conversion. We don't
	 * want to convert tuples going into foreign tables since these are
	 * actually sent to data nodes for insert on that node's local hypertable. */
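	/* convert_tuples_by_name returns NULL when no conversion is needed, i.e.
	 * when the hypertable and chunk tuple descriptors already match, so the
	 * map is NULL in the common case. */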
	if (chunk->relkind != RELKIND_FOREIGN_TABLE)
		state->hyper_to_chunk_map =
			convert_tuples_by_name_compat(RelationGetDescr(parent_rel),
										  RelationGetDescr(rel),
										  gettext_noop("could not convert row type"));

	// relinfo->ri_RootToPartitionMap = state->hyper_to_chunk_map;
	// relinfo->ri_PartitionTupleSlot = table_slot_create(relinfo->ri_RelationDesc,
	// &state->estate->es_tupleTable);

	adjust_projections(state, dispatch, RelationGetForm(rel)->reltype);

	if (has_compressed_chunk)
	{
		int32 htid = ts_hypertable_relid_to_id(chunk->hypertable_relid);
		/* this is true as compressed chunks are not created on the access node */
		Assert(chunk->relkind != RELKIND_FOREIGN_TABLE);
		Assert(compress_rel != NULL);
		state->compress_rel = compress_rel;
		Assert(ts_cm_functions->compress_row_init != NULL);
		/* need a way to convert from chunk tuple to compressed chunk tuple */
		state->compress_state = ts_cm_functions->compress_row_init(htid, rel, compress_rel);
		state->orig_result_relation_info = relinfo;
	}
	else
	{
		state->compress_state = NULL;
	}

	/* Need a tuple table slot to store tuples going into this chunk. We don't
	 * want this slot tied to the executor's tuple table, since that would tie
	 * the slot's lifetime to the entire length of the execution and we want
	 * to be able to dynamically create and destroy chunk insert
	 * state. Otherwise, memory might blow up when there are many chunks being
	 * inserted into. This also means that the slot needs to be destroyed with
	 * the chunk insert state. */
	state->slot = MakeSingleTupleTableSlot(RelationGetDescr(relinfo->ri_RelationDesc),
										   table_slot_callbacks(relinfo->ri_RelationDesc));
	table_close(parent_rel, AccessShareLock);

	state->chunk_id = chunk->fd.id;

	if (chunk->relkind == RELKIND_FOREIGN_TABLE)
	{
		RangeTblEntry *rte =
			rt_fetch(resrelinfo->ri_RangeTableIndex, dispatch->estate->es_range_table);

		Assert(rte != NULL);

		state->user_id = OidIsValid(rte->checkAsUser) ? rte->checkAsUser : GetUserId();
		state->chunk_data_nodes = ts_chunk_data_nodes_copy(chunk);
	}

	if (dispatch->hypertable_result_rel_info->ri_usesFdwDirectModify)
	{
		/* If the hypertable is set up for direct modify, we do not really use
		 * the FDW. Instead we exploit the FdwPrivate pointer to pass the
		 * chunk insert state on to DataNodeDispatch so that it knows which
		 * data nodes to insert into. */
		resrelinfo->ri_FdwState = state;
	}
	else if (resrelinfo->ri_FdwRoutine && !resrelinfo->ri_usesFdwDirectModify &&
			 resrelinfo->ri_FdwRoutine->BeginForeignModify)
	{
		/*
		 * If this is a chunk located on one or more data nodes, set up the
		 * foreign data wrapper state for the chunk. The private fdw data was
		 * created at the planning stage and contains, among other things, a
		 * deparsed insert statement for the hypertable.
		 */
		ModifyTableState *mtstate = dispatch->dispatch_state->mtstate;
		ModifyTable *mt = castNode(ModifyTable, mtstate->ps.plan);
		List *fdwprivate = linitial_node(List, mt->fdwPrivLists);

		Assert(NIL != fdwprivate);
		/*
		 * Since the fdwprivate data is part of the plan it must only
		 * consist of Node objects that can be copied. Therefore, we
		 * cannot directly append the non-Node ChunkInsertState to the
		 * private data. Instead, we make a copy of the private data
		 * before passing it on to the FDW handler function. In the
		 * FDW, the ChunkInsertState will be at the offset defined by
		 * the FdwModifyPrivateChunkInsertState (see
		 * tsl/src/fdw/timescaledb_fdw.c).
		 */
		fdwprivate = lappend(list_copy(fdwprivate), state);
		resrelinfo->ri_FdwRoutine->BeginForeignModify(mtstate,
													  resrelinfo,
													  fdwprivate,
													  0,
													  dispatch->eflags);
	}

	MemoryContextSwitchTo(old_mcxt);

	return state;
}

extern void
ts_chunk_insert_state_destroy(ChunkInsertState *state)
{
	ResultRelInfo *rri = state->result_relation_info;

	if (rri->ri_FdwRoutine && !rri->ri_usesFdwDirectModify && rri->ri_FdwRoutine->EndForeignModify)
		rri->ri_FdwRoutine->EndForeignModify(state->estate, rri);

	destroy_on_conflict_state(state);
	ExecCloseIndices(state->result_relation_info);

	if (state->compress_rel)
	{
		ResultRelInfo *orig_chunk_rri = state->orig_result_relation_info;
		Oid chunk_relid = RelationGetRelid(orig_chunk_rri->ri_RelationDesc);
		ts_cm_functions->compress_row_end(state->compress_state);
		ts_cm_functions->compress_row_destroy(state->compress_state);
		Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
		if (!ts_chunk_is_unordered(chunk))
			ts_chunk_set_unordered(chunk);
		table_close(state->compress_rel, NoLock);
	}
	else if (RelationGetForm(state->result_relation_info->ri_RelationDesc)->relkind ==
			 RELKIND_FOREIGN_TABLE)
	{
		/* If a distributed chunk shows compressed status on the access node,
		 * we mark it as unordered, because the insert now goes into
		 * a previously compressed chunk
		 */
		Oid chunk_relid = RelationGetRelid(state->result_relation_info->ri_RelationDesc);
		Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
		if (ts_chunk_is_compressed(chunk) && (!ts_chunk_is_unordered(chunk)))
			ts_chunk_set_unordered(chunk);
	}

	table_close(state->rel, NoLock);
	if (state->slot)
		ExecDropSingleTupleTableSlot(state->slot);

	/*
	 * Postgres stores cached row types from `get_cached_rowtype` in the
	 * constraint expression and tries to free this type via a callback from the
	 * `per_tuple_exprcontext`. Since we create constraint expressions within
	 * the chunk insert state memory context, this leads to a series of pointers
	 * structured like: `per_tuple_exprcontext -> constraint expr (in chunk
	 * insert state) -> cached row type`. If we try to free the chunk insert
	 * state MemoryContext while the `es_per_tuple_exprcontext` is live,
	 * postgres tries to dereference a dangling pointer in one of
	 * `es_per_tuple_exprcontext`'s callbacks. Normally postgres allocates the
	 * constraint expressions in a parent context of per_tuple_exprcontext so
	 * there is no issue, however we've run into excessive memory usage due to
	 * too many constraints, and want to allocate them for a shorter lifetime so
	 * we free them when the SubspaceStore gets too full. This leaves us with a
	 * memory context relationship like the following:
	 *
	 *     query_ctx
	 *       / \
	 *      /   \
	 *   CIS    per_tuple
	 *
	 *
	 * To ensure this doesn't create dangling pointers from the per-tuple
	 * context to the chunk insert state (CIS) when we destroy the CIS, we avoid
	 * freeing the CIS memory context immediately. Instead we change its parent
	 * to be the per-tuple context (if it is still alive) so that it is only
	 * freed once that context is freed:
	 *
	 *     query_ctx
	 *        \
	 *         \
	 *         per_tuple
	 *           \
	 *            \
	 *            CIS
	 *
	 * Note that a previous approach registered the chunk insert state (CIS) to
	 * be freed by a reset callback on the per-tuple context. That caused a
	 * subtle bug because both the per-tuple context and the CIS share the same
	 * parent. Thus, the callback on a child would trigger the deletion of a
	 * sibling, leading to a cyclic relationship:
	 *
	 *     query_ctx
	 *       / \
	 *      /   \
	 *   CIS <-- per_tuple
	 *
	 *
	 * With this cycle, a delete of the query_ctx could first trigger a delete
	 * of the CIS (if not already deleted), then the per_tuple context, followed
	 * by the CIS again (via the callback), and thus a crash.
	 */
	if (state->estate->es_per_tuple_exprcontext)
		MemoryContextSetParent(state->mctx,
							   state->estate->es_per_tuple_exprcontext->ecxt_per_tuple_memory);
	else
		MemoryContextDelete(state->mctx);
}