1 /*
2 * This file and its contents are licensed under the Apache License 2.0.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-APACHE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <access/attnum.h>
8 #include <access/xact.h>
9 #include <catalog/pg_type.h>
10 #include <executor/tuptable.h>
11 #include <foreign/fdwapi.h>
12 #include <miscadmin.h>
13 #include <nodes/execnodes.h>
14 #include <nodes/makefuncs.h>
15 #include <nodes/nodes.h>
16 #include <nodes/plannodes.h>
17 #include <optimizer/optimizer.h>
18 #include <parser/parsetree.h>
19 #include <rewrite/rewriteManip.h>
20 #include <utils/builtins.h>
21 #include <utils/guc.h>
22 #include <utils/lsyscache.h>
23 #include <utils/memutils.h>
24 #include <utils/rel.h>
25 #include <utils/rls.h>
26 #include <continuous_agg.h>
27
28 #include "compat/compat.h"
29 #include "errors.h"
30 #include "chunk_insert_state.h"
31 #include "chunk_dispatch.h"
32 #include "chunk_data_node.h"
33 #include "chunk_dispatch_state.h"
34 #include "chunk_index.h"
35 #include "indexing.h"
36
37 /* Just like ExecPrepareExpr except that it doesn't switch to the query memory context */
38 static inline ExprState *
prepare_constr_expr(Expr * node)39 prepare_constr_expr(Expr *node)
40 {
41 ExprState *result;
42
43 node = expression_planner(node);
44 result = ExecInitExpr(node, NULL);
45
46 return result;
47 }
48
49 /*
50 * Create the constraint exprs inside the current memory context. If this
51 * is not done here, then ExecRelCheck will do it for you but put it into
52 * the query memory context, which will cause a memory leak.
53 *
54 * See the comment in `ts_chunk_insert_state_destroy` for more information
55 * on the implications of this.
56 */
57 static inline void
create_chunk_rri_constraint_expr(ResultRelInfo * rri,Relation rel)58 create_chunk_rri_constraint_expr(ResultRelInfo *rri, Relation rel)
59 {
60 int ncheck, i;
61 ConstrCheck *check;
62
63 Assert(rel->rd_att->constr != NULL && rri->ri_ConstraintExprs == NULL);
64
65 ncheck = rel->rd_att->constr->num_check;
66 check = rel->rd_att->constr->check;
67 rri->ri_ConstraintExprs = (ExprState **) palloc(ncheck * sizeof(ExprState *));
68
69 for (i = 0; i < ncheck; i++)
70 {
71 Expr *checkconstr = stringToNode(check[i].ccbin);
72
73 rri->ri_ConstraintExprs[i] = prepare_constr_expr(checkconstr);
74 }
75 }
76
77 /*
78 * Create a new ResultRelInfo for a chunk.
79 *
80 * The ResultRelInfo holds the executor state (e.g., open relation, indexes, and
81 * options) for the result relation where tuples will be stored.
82 *
83 * The Hypertable ResultRelInfo is used as a template for the chunk's new ResultRelInfo.
84 */
85 static inline ResultRelInfo *
create_chunk_result_relation_info(ChunkDispatch * dispatch,Relation rel)86 create_chunk_result_relation_info(ChunkDispatch *dispatch, Relation rel)
87 {
88 ResultRelInfo *rri;
89 ResultRelInfo *rri_orig = dispatch->hypertable_result_rel_info;
90 Index hyper_rti = rri_orig->ri_RangeTableIndex;
91 rri = makeNode(ResultRelInfo);
92
93 InitResultRelInfo(rri, rel, hyper_rti, NULL, dispatch->estate->es_instrument);
94
95 /* Copy options from the main table's (hypertable's) result relation info */
96 rri->ri_WithCheckOptions = rri_orig->ri_WithCheckOptions;
97 rri->ri_WithCheckOptionExprs = rri_orig->ri_WithCheckOptionExprs;
98 #if PG14_LT
99 rri->ri_junkFilter = rri_orig->ri_junkFilter;
100 #endif
101 rri->ri_projectReturning = rri_orig->ri_projectReturning;
102
103 rri->ri_FdwState = NULL;
104 rri->ri_usesFdwDirectModify = rri_orig->ri_usesFdwDirectModify;
105
106 if (RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE)
107 rri->ri_FdwRoutine = GetFdwRoutineForRelation(rel, true);
108
109 create_chunk_rri_constraint_expr(rri, rel);
110
111 return rri;
112 }
113
114 static inline ResultRelInfo *
create_compress_chunk_result_relation_info(ChunkDispatch * dispatch,Relation compress_rel)115 create_compress_chunk_result_relation_info(ChunkDispatch *dispatch, Relation compress_rel)
116 {
117 ResultRelInfo *rri = makeNode(ResultRelInfo);
118 ResultRelInfo *rri_orig = dispatch->hypertable_result_rel_info;
119 Index hyper_rti = rri_orig->ri_RangeTableIndex;
120
121 InitResultRelInfo(rri, compress_rel, hyper_rti, NULL, dispatch->estate->es_instrument);
122
123 /* RLS policies are not supported if compression is enabled */
124 Assert(rri_orig->ri_WithCheckOptions == NULL && rri_orig->ri_WithCheckOptionExprs == NULL);
125 Assert(rri_orig->ri_projectReturning == NULL);
126 #if PG14_LT
127 rri->ri_junkFilter = rri_orig->ri_junkFilter;
128 #endif
129
130 /* compressed rel chunk is on data node. Does not need any FDW access on AN */
131 rri->ri_FdwState = NULL;
132 rri->ri_usesFdwDirectModify = false;
133 rri->ri_FdwRoutine = NULL;
134 /* constraints are executed on the orig base chunk. So we do
135 * not call create_chunk_rri_constraint_expr here
136 */
137 return rri;
138 }
139
140 static ProjectionInfo *
get_adjusted_projection_info_returning(ProjectionInfo * orig,List * returning_clauses,TupleConversionMap * map,Index varno,Oid rowtype,TupleDesc chunk_desc)141 get_adjusted_projection_info_returning(ProjectionInfo *orig, List *returning_clauses,
142 TupleConversionMap *map, Index varno, Oid rowtype,
143 TupleDesc chunk_desc)
144 {
145 bool found_whole_row;
146
147 Assert(returning_clauses != NIL);
148
149 /* map hypertable attnos -> chunk attnos */
150 if (map != NULL)
151 returning_clauses = castNode(List,
152 map_variable_attnos_compat((Node *) returning_clauses,
153 varno,
154 0,
155 map->attrMap,
156 map->outdesc->natts,
157 rowtype,
158 &found_whole_row));
159
160 return ExecBuildProjectionInfo(returning_clauses,
161 orig->pi_exprContext,
162 orig->pi_state.resultslot,
163 NULL,
164 chunk_desc);
165 }
166
167 static List *
translate_clause(List * inclause,TupleConversionMap * chunk_map,Index varno,Relation hyper_rel,Relation chunk_rel)168 translate_clause(List *inclause, TupleConversionMap *chunk_map, Index varno, Relation hyper_rel,
169 Relation chunk_rel)
170 {
171 List *clause = copyObject(inclause);
172 bool found_whole_row;
173
174 /* nothing to do here if the chunk_map is NULL */
175 if (!chunk_map)
176 return list_copy(clause);
177
178 /* map hypertable attnos -> chunk attnos for the "excluded" table */
179 clause = castNode(List,
180 map_variable_attnos_compat((Node *) clause,
181 INNER_VAR,
182 0,
183 chunk_map->attrMap,
184 RelationGetDescr(hyper_rel)->natts,
185 RelationGetForm(chunk_rel)->reltype,
186 &found_whole_row));
187
188 /* map hypertable attnos -> chunk attnos for the hypertable */
189 clause = castNode(List,
190 map_variable_attnos_compat((Node *) clause,
191 varno,
192 0,
193 chunk_map->attrMap,
194 RelationGetDescr(hyper_rel)->natts,
195 RelationGetForm(chunk_rel)->reltype,
196 &found_whole_row));
197
198 return clause;
199 }
200
#if PG14_LT
/*
 * adjust_hypertable_tlist - from Postgres source code `adjust_partition_tlist`
 * Adjust the targetlist entries for a given chunk to account for
 * attribute differences between hypertable and the chunk
 *
 * The expressions have already been fixed, but here we fix the list to make
 * target resnos match the chunk's attribute numbers. This results in a
 * copy of the original target list in which the entries appear in resno
 * order, including both the existing entries (that may have their resno
 * changed in-place) and the newly added entries for columns that don't exist
 * in the parent.
 *
 * Scribbles on the input tlist's entries resno so be aware.
 */
static List *
adjust_hypertable_tlist(List *tlist, TupleConversionMap *map)
{
	List *new_tlist = NIL;
	TupleDesc chunk_tupdesc = map->outdesc;
	/* attrMap[i] is the parent (hypertable) attno for chunk attno i+1, or
	 * InvalidAttrNumber when the chunk column has no parent counterpart */
#if PG13_GE
	AttrNumber *attrMap = map->attrMap->attnums;
#else
	AttrNumber *attrMap = map->attrMap;
#endif
	AttrNumber chunk_attrno;

	/* Walk the chunk's attributes in order so the output tlist is in chunk
	 * resno order */
	for (chunk_attrno = 1; chunk_attrno <= chunk_tupdesc->natts; chunk_attrno++)
	{
		Form_pg_attribute att_tup = TupleDescAttr(chunk_tupdesc, chunk_attrno - 1);
		TargetEntry *tle;

		if (attrMap[chunk_attrno - 1] != InvalidAttrNumber)
		{
			Assert(!att_tup->attisdropped);

			/*
			 * Use the corresponding entry from the parent's tlist, adjusting
			 * the resno to match the partition's attno (modified in place).
			 */
			tle = (TargetEntry *) list_nth(tlist, attrMap[chunk_attrno - 1] - 1);
			/* sanity check: mapped entry must name the same column */
			if (namestrcmp(&att_tup->attname, tle->resname) != 0)
				elog(ERROR, "invalid translation of ON CONFLICT update statements");
			tle->resno = chunk_attrno;
		}
		else
		{
			Const *expr;

			/*
			 * For a dropped attribute in the partition, generate a dummy
			 * entry (a NULL int4 constant) with resno matching the
			 * partition's attno.
			 */
			Assert(att_tup->attisdropped);
			expr = makeConst(INT4OID,
							 -1,
							 InvalidOid,
							 sizeof(int32),
							 (Datum) 0,
							 true, /* isnull */
							 true /* byval */);
			tle = makeTargetEntry((Expr *) expr,
								  chunk_attrno,
								  pstrdup(NameStr(att_tup->attname)),
								  false);
		}
		new_tlist = lappend(new_tlist, tle);
	}
	return new_tlist;
}
#endif
272
#if PG14_GE
/*
 * adjust_chunk_colnos
 *		Adjust the list of UPDATE target column numbers to account for
 *		attribute differences between the parent and the partition.
 *
 * adapted from postgres adjust_partition_colnos
 */
static List *
adjust_chunk_colnos(List *colnos, ResultRelInfo *chunk_rri)
{
	TupleConversionMap *map = ExecGetChildToRootMap(chunk_rri);
	List *result = NIL;
	ListCell *cell;

	Assert(map != NULL); /* else we shouldn't be here */

	AttrMap *attr_map = map->attrMap;

	foreach (cell, colnos)
	{
		AttrNumber parent_attno = lfirst_int(cell);

		/* Reject out-of-range attnos and columns with no chunk counterpart */
		if (parent_attno <= 0 || parent_attno > attr_map->maplen ||
			attr_map->attnums[parent_attno - 1] == 0)
			elog(ERROR, "unexpected attno %d in target column list", parent_attno);

		result = lappend_int(result, attr_map->attnums[parent_attno - 1]);
	}

	return result;
}
#endif
305
306 /*
307 * Setup ON CONFLICT state for a chunk.
308 *
309 * Mostly, this is about mapping attribute numbers from the hypertable root to
310 * a chunk, accounting for differences in the tuple descriptors due to dropped
311 * columns, etc.
312 */
313 static void
setup_on_conflict_state(ChunkInsertState * state,ChunkDispatch * dispatch,TupleConversionMap * chunk_map)314 setup_on_conflict_state(ChunkInsertState *state, ChunkDispatch *dispatch,
315 TupleConversionMap *chunk_map)
316 {
317 TupleConversionMap *map = state->hyper_to_chunk_map;
318 ResultRelInfo *chunk_rri = state->result_relation_info;
319 ResultRelInfo *hyper_rri = dispatch->hypertable_result_rel_info;
320 Relation chunk_rel = state->result_relation_info->ri_RelationDesc;
321 Relation hyper_rel = hyper_rri->ri_RelationDesc;
322 ModifyTableState *mtstate = castNode(ModifyTableState, dispatch->dispatch_state->mtstate);
323 ModifyTable *mt = castNode(ModifyTable, mtstate->ps.plan);
324
325 Assert(ts_chunk_dispatch_get_on_conflict_action(dispatch) == ONCONFLICT_UPDATE);
326
327 OnConflictSetState *onconfl = makeNode(OnConflictSetState);
328 memcpy(onconfl, hyper_rri->ri_onConflict, sizeof(OnConflictSetState));
329 chunk_rri->ri_onConflict = onconfl;
330
331 #if PG14_GE
332 chunk_rri->ri_RootToPartitionMap = map;
333 #endif
334
335 Assert(mt->onConflictSet);
336 Assert(hyper_rri->ri_onConflict != NULL);
337
338 /*
339 * Need a separate existing slot for each partition, as the
340 * partition could be of a different AM, even if the tuple
341 * descriptors match.
342 */
343 onconfl->oc_Existing = table_slot_create(chunk_rri->ri_RelationDesc, NULL);
344 state->existing_slot = onconfl->oc_Existing;
345
346 /*
347 * If the chunk's tuple descriptor matches exactly the hypertable
348 * (the common case), we can re-use most of the parent's ON
349 * CONFLICT SET state, skipping a bunch of work. Otherwise, we
350 * need to create state specific to this partition.
351 */
352 if (!map)
353 {
354 /*
355 * It's safe to reuse these from the hypertable, as we
356 * only process one tuple at a time (therefore we won't
357 * overwrite needed data in slots), and the results of
358 * projections are independent of the underlying storage.
359 * Projections and where clauses themselves don't store state
360 * / are independent of the underlying storage.
361 */
362 onconfl->oc_ProjSlot = hyper_rri->ri_onConflict->oc_ProjSlot;
363 onconfl->oc_ProjInfo = hyper_rri->ri_onConflict->oc_ProjInfo;
364 onconfl->oc_WhereClause = hyper_rri->ri_onConflict->oc_WhereClause;
365 state->conflproj_slot = onconfl->oc_ProjSlot;
366 }
367 else
368 {
369 List *onconflset;
370 #if PG14_GE
371 List *onconflcols;
372 #endif
373
374 /*
375 * Translate expressions in onConflictSet to account for
376 * different attribute numbers. For that, map partition
377 * varattnos twice: first to catch the EXCLUDED
378 * pseudo-relation (INNER_VAR), and second to handle the main
379 * target relation (firstVarno).
380 */
381 onconflset = copyObject(mt->onConflictSet);
382
383 Assert(map->outdesc == RelationGetDescr(chunk_rel));
384
385 if (!chunk_map)
386 chunk_map = convert_tuples_by_name_compat(RelationGetDescr(chunk_rel),
387 RelationGetDescr(hyper_rel),
388 gettext_noop("could not convert row type"));
389
390 onconflset = translate_clause(onconflset,
391 chunk_map,
392 hyper_rri->ri_RangeTableIndex,
393 hyper_rel,
394 chunk_rel);
395
396 #if PG14_LT
397 onconflset = adjust_hypertable_tlist(onconflset, state->hyper_to_chunk_map);
398 #else
399 chunk_rri->ri_ChildToRootMap = chunk_map;
400 chunk_rri->ri_ChildToRootMapValid = true;
401
402 /* Finally, adjust the target colnos to match the chunk. */
403 if (chunk_map)
404 onconflcols = adjust_chunk_colnos(mt->onConflictCols, chunk_rri);
405 else
406 onconflcols = mt->onConflictCols;
407 #endif
408
409 /* create the tuple slot for the UPDATE SET projection */
410 onconfl->oc_ProjSlot = table_slot_create(chunk_rel, NULL);
411 state->conflproj_slot = onconfl->oc_ProjSlot;
412
413 /* build UPDATE SET projection state */
414 #if PG14_LT
415 ExprContext *econtext = hyper_rri->ri_onConflict->oc_ProjInfo->pi_exprContext;
416 onconfl->oc_ProjInfo = ExecBuildProjectionInfo(onconflset,
417 econtext,
418 state->conflproj_slot,
419 NULL,
420 RelationGetDescr(chunk_rel));
421 #else
422 onconfl->oc_ProjInfo = ExecBuildUpdateProjection(onconflset,
423 true,
424 onconflcols,
425 RelationGetDescr(chunk_rel),
426 mtstate->ps.ps_ExprContext,
427 onconfl->oc_ProjSlot,
428 &mtstate->ps);
429 #endif
430
431 Node *onconflict_where = mt->onConflictWhere;
432
433 /*
434 * Map attribute numbers in the WHERE clause, if it exists.
435 */
436 if (onconflict_where && chunk_map)
437 {
438 List *clause = translate_clause(castNode(List, onconflict_where),
439 chunk_map,
440 hyper_rri->ri_RangeTableIndex,
441 hyper_rel,
442 chunk_rel);
443
444 chunk_rri->ri_onConflict->oc_WhereClause = ExecInitQual(clause, NULL);
445 }
446 }
447 }
448
449 static void
destroy_on_conflict_state(ChunkInsertState * state)450 destroy_on_conflict_state(ChunkInsertState *state)
451 {
452 /*
453 * Clean up per-chunk tuple table slots created for ON CONFLICT handling.
454 */
455 if (NULL != state->existing_slot)
456 ExecDropSingleTupleTableSlot(state->existing_slot);
457
458 /* The ON CONFLICT projection slot is only chunk specific in case the
459 * tuple descriptor didn't match the hypertable */
460 if (NULL != state->hyper_to_chunk_map && NULL != state->conflproj_slot)
461 ExecDropSingleTupleTableSlot(state->conflproj_slot);
462 }
463
464 /* Translate hypertable indexes to chunk indexes in the arbiter clause */
465 static void
set_arbiter_indexes(ChunkInsertState * state,ChunkDispatch * dispatch)466 set_arbiter_indexes(ChunkInsertState *state, ChunkDispatch *dispatch)
467 {
468 List *arbiter_indexes = ts_chunk_dispatch_get_arbiter_indexes(dispatch);
469 ListCell *lc;
470
471 state->arbiter_indexes = NIL;
472
473 foreach (lc, arbiter_indexes)
474 {
475 Oid hypertable_index = lfirst_oid(lc);
476 Chunk *chunk = ts_chunk_get_by_relid(RelationGetRelid(state->rel), true);
477 ChunkIndexMapping cim;
478
479 if (ts_chunk_index_get_by_hypertable_indexrelid(chunk, hypertable_index, &cim) < 1)
480 {
481 /*
482 * In case of distributed hypertables, we don't have information about the
483 * arbiter index on the remote side, so error out with a helpful hint
484 */
485 ereport(ERROR,
486 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
487 errmsg("could not find arbiter index for hypertable index \"%s\" on chunk "
488 "\"%s\"",
489 get_rel_name(hypertable_index),
490 get_rel_name(RelationGetRelid(state->rel))),
491 hypertable_is_distributed(dispatch->hypertable) ?
492 errhint(
493 "Omit the index inference specification for the distributed hypertable"
494 " in the ON CONFLICT clause.") :
495 0));
496 }
497
498 state->arbiter_indexes = lappend_oid(state->arbiter_indexes, cim.indexoid);
499 }
500 state->result_relation_info->ri_onConflictArbiterIndexes = state->arbiter_indexes;
501 }
502
503 /* Change the projections to work with chunks instead of hypertables */
504 static void
adjust_projections(ChunkInsertState * cis,ChunkDispatch * dispatch,Oid rowtype)505 adjust_projections(ChunkInsertState *cis, ChunkDispatch *dispatch, Oid rowtype)
506 {
507 ResultRelInfo *chunk_rri = cis->result_relation_info;
508 Relation hyper_rel = dispatch->hypertable_result_rel_info->ri_RelationDesc;
509 Relation chunk_rel = cis->rel;
510 TupleConversionMap *chunk_map = NULL;
511 OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);
512
513 if (ts_chunk_dispatch_has_returning(dispatch))
514 {
515 /*
516 * We need the opposite map from cis->hyper_to_chunk_map. The map needs
517 * to have the hypertable_desc in the out spot for map_variable_attnos
518 * to work correctly in mapping hypertable attnos->chunk attnos.
519 */
520 chunk_map = convert_tuples_by_name_compat(RelationGetDescr(chunk_rel),
521 RelationGetDescr(hyper_rel),
522 gettext_noop("could not convert row type"));
523
524 chunk_rri->ri_projectReturning =
525 get_adjusted_projection_info_returning(chunk_rri->ri_projectReturning,
526 ts_chunk_dispatch_get_returning_clauses(
527 dispatch),
528 chunk_map,
529 dispatch->hypertable_result_rel_info
530 ->ri_RangeTableIndex,
531 rowtype,
532 RelationGetDescr(chunk_rel));
533 }
534
535 /* Set the chunk's arbiter indexes for ON CONFLICT statements */
536 if (onconflict_action != ONCONFLICT_NONE)
537 {
538 set_arbiter_indexes(cis, dispatch);
539
540 if (onconflict_action == ONCONFLICT_UPDATE)
541 setup_on_conflict_state(cis, dispatch, chunk_map);
542 }
543 }
544
545 /* Assumption: we have acquired a lock on the chunk table. This is
546 * important because lock acquisition order is always orignal chunk,
547 * followed by compressed chunk to prevent deadlocks.
548 * We now try to acquire a lock on the compressed chunk, if one exists.
549 * Note that the insert could have been blocked by a recompress_chunk operation.
550 * So the compressed chunk could have moved under us. We need to refetch the chunk
551 * to get the correct compressed chunk id (github issue 3400)
552 */
553 static Relation
lock_associated_compressed_chunk(int32 chunk_id,bool * has_compressed_chunk)554 lock_associated_compressed_chunk(int32 chunk_id, bool *has_compressed_chunk)
555 {
556 Relation compress_rel = NULL;
557 Chunk *orig_chunk = ts_chunk_get_by_id(chunk_id, true);
558 Oid compress_chunk_relid = InvalidOid;
559 *has_compressed_chunk = false;
560 if (orig_chunk->fd.compressed_chunk_id)
561 compress_chunk_relid = ts_chunk_get_relid(orig_chunk->fd.compressed_chunk_id, false);
562 if (compress_chunk_relid != InvalidOid)
563 {
564 *has_compressed_chunk = true;
565 compress_rel = table_open(compress_chunk_relid, RowExclusiveLock);
566 }
567 return compress_rel;
568 }
569
570 /*
571 * Create new insert chunk state.
572 *
573 * This is essentially a ResultRelInfo for a chunk. Initialization of the
574 * ResultRelInfo should be similar to ExecInitModifyTable().
575 */
576 extern ChunkInsertState *
ts_chunk_insert_state_create(const Chunk * chunk,ChunkDispatch * dispatch)577 ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
578 {
579 ChunkInsertState *state;
580 Relation rel, parent_rel, compress_rel = NULL;
581 MemoryContext old_mcxt;
582 MemoryContext cis_context = AllocSetContextCreate(dispatch->estate->es_query_cxt,
583 "chunk insert state memory context",
584 ALLOCSET_DEFAULT_SIZES);
585 OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);
586 ResultRelInfo *resrelinfo, *relinfo;
587 bool has_compressed_chunk = (chunk->fd.compressed_chunk_id != 0);
588
589 /* permissions NOT checked here; were checked at hypertable level */
590 if (check_enable_rls(chunk->table_id, InvalidOid, false) == RLS_ENABLED)
591 ereport(ERROR,
592 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
593 errmsg("hypertables do not support row-level security")));
594
595 if (chunk->relkind != RELKIND_RELATION && chunk->relkind != RELKIND_FOREIGN_TABLE)
596 elog(ERROR, "insert is not on a table");
597
598 if (has_compressed_chunk &&
599 (onconflict_action != ONCONFLICT_NONE || ts_chunk_dispatch_has_returning(dispatch)))
600 ereport(ERROR,
601 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
602 errmsg("insert with ON CONFLICT or RETURNING clause is not supported on "
603 "compressed chunks")));
604
605 /*
606 * We must allocate the range table entry on the executor's per-query
607 * context
608 */
609 old_mcxt = MemoryContextSwitchTo(dispatch->estate->es_query_cxt);
610
611 rel = table_open(chunk->table_id, RowExclusiveLock);
612 if (has_compressed_chunk && ts_indexing_relation_has_primary_or_unique_index(rel))
613 {
614 table_close(rel, RowExclusiveLock);
615 ereport(ERROR,
616 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
617 errmsg("insert into a compressed chunk that has primary or unique constraint is "
618 "not supported")));
619 }
620 compress_rel = lock_associated_compressed_chunk(chunk->fd.id, &has_compressed_chunk);
621
622 MemoryContextSwitchTo(cis_context);
623 relinfo = create_chunk_result_relation_info(dispatch, rel);
624 if (!has_compressed_chunk)
625 resrelinfo = relinfo;
626 else
627 {
628 /* insert the tuple into the compressed chunk instead */
629 resrelinfo = create_compress_chunk_result_relation_info(dispatch, compress_rel);
630 }
631 CheckValidResultRel(resrelinfo, ts_chunk_dispatch_get_cmd_type(dispatch));
632
633 state = palloc0(sizeof(ChunkInsertState));
634 state->mctx = cis_context;
635 state->rel = rel;
636 state->result_relation_info = resrelinfo;
637 state->estate = dispatch->estate;
638
639 if (resrelinfo->ri_RelationDesc->rd_rel->relhasindex &&
640 resrelinfo->ri_IndexRelationDescs == NULL)
641 ExecOpenIndices(resrelinfo, onconflict_action != ONCONFLICT_NONE);
642
643 if (relinfo->ri_TrigDesc != NULL)
644 {
645 TriggerDesc *tg = relinfo->ri_TrigDesc;
646
647 /* instead of triggers can only be created on VIEWs */
648 Assert(!tg->trig_insert_instead_row);
649
650 /*
651 * A statement that targets a parent table in an inheritance or
652 * partitioning hierarchy does not cause the statement-level triggers
653 * of affected child tables to be fired; only the parent table's
654 * statement-level triggers are fired. However, row-level triggers
655 * of any affected child tables will be fired.
656 * During chunk creation we only copy ROW trigger to chunks so
657 * statement triggers should not exist on chunks.
658 */
659 if (tg->trig_insert_after_statement || tg->trig_insert_before_statement)
660 elog(ERROR, "statement trigger on chunk table not supported");
661
662 /* AFTER ROW triggers do not work since we redirect the insert
663 * to the compressed chunk. We still want cagg triggers to fire.
664 * We'll call them directly. But raise an error if there are
665 * other triggers
666 */
667 if (has_compressed_chunk && tg->trig_insert_after_row)
668 {
669 StringInfo trigger_list = makeStringInfo();
670 Assert(tg->numtriggers > 0);
671 for (int i = 0; i < tg->numtriggers; i++)
672 {
673 if (strncmp(tg->triggers[i].tgname,
674 CAGGINVAL_TRIGGER_NAME,
675 strlen(CAGGINVAL_TRIGGER_NAME)) == 0)
676 continue;
677 if (i > 0)
678 appendStringInfoString(trigger_list, ", ");
679 appendStringInfoString(trigger_list, tg->triggers[i].tgname);
680 }
681 if (trigger_list->len != 0)
682 {
683 ereport(ERROR,
684 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
685 errmsg("after insert row trigger on compressed chunk not supported"),
686 errdetail("Triggers: %s", trigger_list->data),
687 errhint("Decompress the chunk first before inserting into it.")));
688 }
689 }
690 }
691
692 parent_rel = table_open(dispatch->hypertable->main_table_relid, AccessShareLock);
693
694 /* Set tuple conversion map, if tuple needs conversion. We don't want to
695 * convert tuples going into foreign tables since these are actually sent to
696 * data nodes for insert on that node's local hypertable. */
697 if (chunk->relkind != RELKIND_FOREIGN_TABLE)
698 state->hyper_to_chunk_map =
699 convert_tuples_by_name_compat(RelationGetDescr(parent_rel),
700 RelationGetDescr(rel),
701 gettext_noop("could not convert row type"));
702
703 // relinfo->ri_RootToPartitionMap = state->hyper_to_chunk_map;
704 // relinfo->ri_PartitionTupleSlot = table_slot_create(relinfo->ri_RelationDesc,
705 // &state->estate->es_tupleTable);
706
707 adjust_projections(state, dispatch, RelationGetForm(rel)->reltype);
708
709 if (has_compressed_chunk)
710 {
711 int32 htid = ts_hypertable_relid_to_id(chunk->hypertable_relid);
712 /* this is true as compressed chunks are not created on access node */
713 Assert(chunk->relkind != RELKIND_FOREIGN_TABLE);
714 Assert(compress_rel != NULL);
715 state->compress_rel = compress_rel;
716 Assert(ts_cm_functions->compress_row_init != NULL);
717 /* need a way to convert from chunk tuple to compressed chunk tuple */
718 state->compress_state = ts_cm_functions->compress_row_init(htid, rel, compress_rel);
719 state->orig_result_relation_info = relinfo;
720 }
721 else
722 {
723 state->compress_state = NULL;
724 }
725
726 /* Need a tuple table slot to store tuples going into this chunk. We don't
727 * want this slot tied to the executor's tuple table, since that would tie
728 * the slot's lifetime to the entire length of the execution and we want
729 * to be able to dynamically create and destroy chunk insert
730 * state. Otherwise, memory might blow up when there are many chunks being
731 * inserted into. This also means that the slot needs to be destroyed with
732 * the chunk insert state. */
733 state->slot = MakeSingleTupleTableSlot(RelationGetDescr(relinfo->ri_RelationDesc),
734 table_slot_callbacks(relinfo->ri_RelationDesc));
735 table_close(parent_rel, AccessShareLock);
736
737 state->chunk_id = chunk->fd.id;
738
739 if (chunk->relkind == RELKIND_FOREIGN_TABLE)
740 {
741 RangeTblEntry *rte =
742 rt_fetch(resrelinfo->ri_RangeTableIndex, dispatch->estate->es_range_table);
743
744 Assert(rte != NULL);
745
746 state->user_id = OidIsValid(rte->checkAsUser) ? rte->checkAsUser : GetUserId();
747 state->chunk_data_nodes = ts_chunk_data_nodes_copy(chunk);
748 }
749
750 if (dispatch->hypertable_result_rel_info->ri_usesFdwDirectModify)
751 {
752 /* If the hypertable is setup for direct modify, we do not really use
753 * the FDW. Instead exploit the FdwPrivate pointer to pass on the
754 * chunk insert state to DataNodeDispatch so that it knows which data nodes
755 * to insert into. */
756 resrelinfo->ri_FdwState = state;
757 }
758 else if (resrelinfo->ri_FdwRoutine && !resrelinfo->ri_usesFdwDirectModify &&
759 resrelinfo->ri_FdwRoutine->BeginForeignModify)
760 {
761 /*
762 * If this is a chunk located one or more data nodes, setup the
763 * foreign data wrapper state for the chunk. The private fdw data was
764 * created at the planning stage and contains, among other things, a
765 * deparsed insert statement for the hypertable.
766 */
767 ModifyTableState *mtstate = dispatch->dispatch_state->mtstate;
768 ModifyTable *mt = castNode(ModifyTable, mtstate->ps.plan);
769 List *fdwprivate = linitial_node(List, mt->fdwPrivLists);
770
771 Assert(NIL != fdwprivate);
772 /*
773 * Since the fdwprivate data is part of the plan it must only
774 * consist of Node objects that can be copied. Therefore, we
775 * cannot directly append the non-Node ChunkInsertState to the
776 * private data. Instead, we make a copy of the private data
777 * before passing it on to the FDW handler function. In the
778 * FDW, the ChunkInsertState will be at the offset defined by
779 * the FdwModifyPrivateChunkInsertState (see
780 * tsl/src/fdw/timescaledb_fdw.c).
781 */
782 fdwprivate = lappend(list_copy(fdwprivate), state);
783 resrelinfo->ri_FdwRoutine->BeginForeignModify(mtstate,
784 resrelinfo,
785 fdwprivate,
786 0,
787 dispatch->eflags);
788 }
789
790 MemoryContextSwitchTo(old_mcxt);
791
792 return state;
793 }
794
/*
 * Tear down a chunk insert state: end FDW modification, release ON CONFLICT
 * slots and index state, close relations, and dispose of the state's memory
 * context (see the long comment below for why the context may be reparented
 * instead of deleted).
 */
extern void
ts_chunk_insert_state_destroy(ChunkInsertState *state)
{
	ResultRelInfo *rri = state->result_relation_info;

	/* Mirror of BeginForeignModify in ts_chunk_insert_state_create */
	if (rri->ri_FdwRoutine && !rri->ri_usesFdwDirectModify && rri->ri_FdwRoutine->EndForeignModify)
		rri->ri_FdwRoutine->EndForeignModify(state->estate, rri);

	destroy_on_conflict_state(state);
	ExecCloseIndices(state->result_relation_info);

	if (state->compress_rel)
	{
		/* Inserts were redirected into the compressed chunk: flush and tear
		 * down the compressor, and mark the chunk unordered since new rows
		 * were appended to previously-compressed data */
		ResultRelInfo *orig_chunk_rri = state->orig_result_relation_info;
		Oid chunk_relid = RelationGetRelid(orig_chunk_rri->ri_RelationDesc);
		ts_cm_functions->compress_row_end(state->compress_state);
		ts_cm_functions->compress_row_destroy(state->compress_state);
		Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
		if (!ts_chunk_is_unordered(chunk))
			ts_chunk_set_unordered(chunk);
		/* NoLock: hold the lock until transaction end */
		table_close(state->compress_rel, NoLock);
	}
	else if (RelationGetForm(state->result_relation_info->ri_RelationDesc)->relkind ==
			 RELKIND_FOREIGN_TABLE)
	{
		/* If a distributed chunk shows compressed status on AN,
		 * we mark it as unordered , because the insert now goes into
		 * a previously compressed chunk
		 */
		Oid chunk_relid = RelationGetRelid(state->result_relation_info->ri_RelationDesc);
		Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
		if (ts_chunk_is_compressed(chunk) && (!ts_chunk_is_unordered(chunk)))
			ts_chunk_set_unordered(chunk);
	}

	/* NoLock: hold the chunk lock until transaction end */
	table_close(state->rel, NoLock);
	if (state->slot)
		ExecDropSingleTupleTableSlot(state->slot);

	/*
	 * Postgres stores cached row types from `get_cached_rowtype` in the
	 * constraint expression and tries to free this type via a callback from the
	 * `per_tuple_exprcontext`. Since we create constraint expressions within
	 * the chunk insert state memory context, this leads to a series of pointers
	 * structured like: `per_tuple_exprcontext -> constraint expr (in chunk
	 * insert state) -> cached row type` if we try to free the chunk insert
	 * state MemoryContext while the `es_per_tuple_exprcontext` is live,
	 * postgres tries to dereference a dangling pointer in one of
	 * `es_per_tuple_exprcontext`'s callbacks. Normally postgres allocates the
	 * constraint expressions in a parent context of per_tuple_exprcontext so
	 * there is no issue, however we've run into excessive memory usage due to
	 * too many constraints, and want to allocate them for a shorter lifetime so
	 * we free them when SubspaceStore gets too full. This leaves us with a
	 * memory context relationship like the following:
	 *
	 *     query_ctx
	 *       /  \
	 *      /    \
	 *    CIS    per_tuple
	 *
	 *
	 * To ensure this doesn't create dangling pointers from the per-tuple
	 * context to the chunk insert state (CIS) when we destroy the CIS, we avoid
	 * freeing the CIS memory context immediately. Instead we change its parent
	 * to be the per-tuple context (if it is still alive) so that it is only
	 * freed once that context is freed:
	 *
	 *     query_ctx
	 *        \
	 *         \
	 *         per_tuple
	 *            \
	 *             \
	 *             CIS
	 *
	 * Note that a previous approach registered the chunk insert state (CIS) to
	 * be freed by a reset callback on the per-tuple context. That caused a
	 * subtle bug because both the per-tuple context and the CIS share the same
	 * parent. Thus, the callback on a child would trigger the deletion of a
	 * sibling, leading to a cyclic relationship:
	 *
	 *     query_ctx
	 *       /  \
	 *      /    \
	 *    CIS <-- per_tuple
	 *
	 *
	 * With this cycle, a delete of the query_ctx could first trigger a delete
	 * of the CIS (if not already deleted), then the per_tuple context, followed
	 * by the CIS again (via the callback), and thus a crash.
	 */
	if (state->estate->es_per_tuple_exprcontext)
		MemoryContextSetParent(state->mctx,
							   state->estate->es_per_tuple_exprcontext->ecxt_per_tuple_memory);
	else
		MemoryContextDelete(state->mctx);
}
892