1 /*
2  * This file and its contents are licensed under the Apache License 2.0.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-APACHE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <nodes/nodes.h>
8 #include <nodes/extensible.h>
9 #include <nodes/makefuncs.h>
10 #include <nodes/nodeFuncs.h>
11 #include <utils/rel.h>
12 #include <catalog/pg_type.h>
13 
14 #include "compat/compat.h"
15 #include "chunk_dispatch.h"
16 #include "chunk_insert_state.h"
17 #include "subspace_store.h"
18 #include "dimension.h"
19 #include "guc.h"
20 
21 ChunkDispatch *
ts_chunk_dispatch_create(Hypertable * ht,EState * estate,int eflags)22 ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags)
23 {
24 	ChunkDispatch *cd = palloc0(sizeof(ChunkDispatch));
25 
26 	cd->hypertable = ht;
27 	cd->estate = estate;
28 	cd->eflags = eflags;
29 	cd->hypertable_result_rel_info = NULL;
30 	cd->cache =
31 		ts_subspace_store_init(ht->space, estate->es_query_cxt, ts_guc_max_open_chunks_per_insert);
32 	cd->prev_cis = NULL;
33 	cd->prev_cis_oid = InvalidOid;
34 
35 	return cd;
36 }
37 
38 static inline ModifyTableState *
get_modifytable_state(const ChunkDispatch * dispatch)39 get_modifytable_state(const ChunkDispatch *dispatch)
40 {
41 	return dispatch->dispatch_state->mtstate;
42 }
43 
44 static inline ModifyTable *
get_modifytable(const ChunkDispatch * dispatch)45 get_modifytable(const ChunkDispatch *dispatch)
46 {
47 	return castNode(ModifyTable, get_modifytable_state(dispatch)->ps.plan);
48 }
49 
50 bool
ts_chunk_dispatch_has_returning(const ChunkDispatch * dispatch)51 ts_chunk_dispatch_has_returning(const ChunkDispatch *dispatch)
52 {
53 	if (!dispatch->dispatch_state)
54 		return false;
55 	return get_modifytable(dispatch)->returningLists != NIL;
56 }
57 
58 List *
ts_chunk_dispatch_get_returning_clauses(const ChunkDispatch * dispatch)59 ts_chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispatch)
60 {
61 #if PG14_LT
62 	ModifyTableState *mtstate = get_modifytable_state(dispatch);
63 	return list_nth(get_modifytable(dispatch)->returningLists, mtstate->mt_whichplan);
64 #else
65 	Assert(list_length(get_modifytable(dispatch)->returningLists) == 1);
66 	return linitial(get_modifytable(dispatch)->returningLists);
67 #endif
68 }
69 
70 List *
ts_chunk_dispatch_get_arbiter_indexes(const ChunkDispatch * dispatch)71 ts_chunk_dispatch_get_arbiter_indexes(const ChunkDispatch *dispatch)
72 {
73 	return dispatch->dispatch_state->arbiter_indexes;
74 }
75 
76 OnConflictAction
ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch * dispatch)77 ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch)
78 {
79 	if (!dispatch->dispatch_state)
80 		return ONCONFLICT_NONE;
81 	return get_modifytable(dispatch)->onConflictAction;
82 }
83 
84 List *
ts_chunk_dispatch_get_on_conflict_set(const ChunkDispatch * dispatch)85 ts_chunk_dispatch_get_on_conflict_set(const ChunkDispatch *dispatch)
86 {
87 	return get_modifytable(dispatch)->onConflictSet;
88 }
89 
90 CmdType
ts_chunk_dispatch_get_cmd_type(const ChunkDispatch * dispatch)91 ts_chunk_dispatch_get_cmd_type(const ChunkDispatch *dispatch)
92 {
93 	return dispatch->dispatch_state == NULL ? CMD_INSERT :
94 											  dispatch->dispatch_state->mtstate->operation;
95 }
96 
97 void
ts_chunk_dispatch_destroy(ChunkDispatch * cd)98 ts_chunk_dispatch_destroy(ChunkDispatch *cd)
99 {
100 	ts_subspace_store_free(cd->cache);
101 }
102 
103 static void
destroy_chunk_insert_state(void * cis)104 destroy_chunk_insert_state(void *cis)
105 {
106 	ts_chunk_insert_state_destroy((ChunkInsertState *) cis);
107 }
108 
109 /*
110  * Get the chunk insert state for the chunk that matches the given point in the
111  * partitioned hyperspace.
112  */
113 extern ChunkInsertState *
ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch * dispatch,Point * point,const on_chunk_changed_func on_chunk_changed,void * data)114 ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
115 										 const on_chunk_changed_func on_chunk_changed, void *data)
116 {
117 	ChunkInsertState *cis;
118 	bool cis_changed = true;
119 
120 	cis = ts_subspace_store_get(dispatch->cache, point);
121 
122 	if (NULL == cis)
123 	{
124 		Chunk *new_chunk;
125 
126 		new_chunk = ts_hypertable_get_or_create_chunk(dispatch->hypertable, point);
127 
128 		if (NULL == new_chunk)
129 			elog(ERROR, "no chunk found or created");
130 
131 		cis = ts_chunk_insert_state_create(new_chunk, dispatch);
132 		ts_subspace_store_add(dispatch->cache, new_chunk->cube, cis, destroy_chunk_insert_state);
133 	}
134 	else if (cis->rel->rd_id == dispatch->prev_cis_oid && cis == dispatch->prev_cis)
135 	{
136 		/* got the same item from cache as before */
137 		cis_changed = false;
138 	}
139 
140 	if (cis_changed && on_chunk_changed)
141 		on_chunk_changed(cis, data);
142 
143 	Assert(cis != NULL);
144 	dispatch->prev_cis = cis;
145 	dispatch->prev_cis_oid = cis->rel->rd_id;
146 	return cis;
147 }
148