1 /*
2 * This file and its contents are licensed under the Apache License 2.0.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-APACHE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <nodes/nodes.h>
8 #include <nodes/extensible.h>
9 #include <nodes/makefuncs.h>
10 #include <nodes/nodeFuncs.h>
11 #include <utils/rel.h>
12 #include <catalog/pg_type.h>
13
14 #include "compat/compat.h"
15 #include "chunk_dispatch.h"
16 #include "chunk_insert_state.h"
17 #include "subspace_store.h"
18 #include "dimension.h"
19 #include "guc.h"
20
21 ChunkDispatch *
ts_chunk_dispatch_create(Hypertable * ht,EState * estate,int eflags)22 ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags)
23 {
24 ChunkDispatch *cd = palloc0(sizeof(ChunkDispatch));
25
26 cd->hypertable = ht;
27 cd->estate = estate;
28 cd->eflags = eflags;
29 cd->hypertable_result_rel_info = NULL;
30 cd->cache =
31 ts_subspace_store_init(ht->space, estate->es_query_cxt, ts_guc_max_open_chunks_per_insert);
32 cd->prev_cis = NULL;
33 cd->prev_cis_oid = InvalidOid;
34
35 return cd;
36 }
37
38 static inline ModifyTableState *
get_modifytable_state(const ChunkDispatch * dispatch)39 get_modifytable_state(const ChunkDispatch *dispatch)
40 {
41 return dispatch->dispatch_state->mtstate;
42 }
43
44 static inline ModifyTable *
get_modifytable(const ChunkDispatch * dispatch)45 get_modifytable(const ChunkDispatch *dispatch)
46 {
47 return castNode(ModifyTable, get_modifytable_state(dispatch)->ps.plan);
48 }
49
50 bool
ts_chunk_dispatch_has_returning(const ChunkDispatch * dispatch)51 ts_chunk_dispatch_has_returning(const ChunkDispatch *dispatch)
52 {
53 if (!dispatch->dispatch_state)
54 return false;
55 return get_modifytable(dispatch)->returningLists != NIL;
56 }
57
58 List *
ts_chunk_dispatch_get_returning_clauses(const ChunkDispatch * dispatch)59 ts_chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispatch)
60 {
61 #if PG14_LT
62 ModifyTableState *mtstate = get_modifytable_state(dispatch);
63 return list_nth(get_modifytable(dispatch)->returningLists, mtstate->mt_whichplan);
64 #else
65 Assert(list_length(get_modifytable(dispatch)->returningLists) == 1);
66 return linitial(get_modifytable(dispatch)->returningLists);
67 #endif
68 }
69
70 List *
ts_chunk_dispatch_get_arbiter_indexes(const ChunkDispatch * dispatch)71 ts_chunk_dispatch_get_arbiter_indexes(const ChunkDispatch *dispatch)
72 {
73 return dispatch->dispatch_state->arbiter_indexes;
74 }
75
76 OnConflictAction
ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch * dispatch)77 ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch)
78 {
79 if (!dispatch->dispatch_state)
80 return ONCONFLICT_NONE;
81 return get_modifytable(dispatch)->onConflictAction;
82 }
83
84 List *
ts_chunk_dispatch_get_on_conflict_set(const ChunkDispatch * dispatch)85 ts_chunk_dispatch_get_on_conflict_set(const ChunkDispatch *dispatch)
86 {
87 return get_modifytable(dispatch)->onConflictSet;
88 }
89
90 CmdType
ts_chunk_dispatch_get_cmd_type(const ChunkDispatch * dispatch)91 ts_chunk_dispatch_get_cmd_type(const ChunkDispatch *dispatch)
92 {
93 return dispatch->dispatch_state == NULL ? CMD_INSERT :
94 dispatch->dispatch_state->mtstate->operation;
95 }
96
97 void
ts_chunk_dispatch_destroy(ChunkDispatch * cd)98 ts_chunk_dispatch_destroy(ChunkDispatch *cd)
99 {
100 ts_subspace_store_free(cd->cache);
101 }
102
103 static void
destroy_chunk_insert_state(void * cis)104 destroy_chunk_insert_state(void *cis)
105 {
106 ts_chunk_insert_state_destroy((ChunkInsertState *) cis);
107 }
108
109 /*
110 * Get the chunk insert state for the chunk that matches the given point in the
111 * partitioned hyperspace.
112 */
113 extern ChunkInsertState *
ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch * dispatch,Point * point,const on_chunk_changed_func on_chunk_changed,void * data)114 ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
115 const on_chunk_changed_func on_chunk_changed, void *data)
116 {
117 ChunkInsertState *cis;
118 bool cis_changed = true;
119
120 cis = ts_subspace_store_get(dispatch->cache, point);
121
122 if (NULL == cis)
123 {
124 Chunk *new_chunk;
125
126 new_chunk = ts_hypertable_get_or_create_chunk(dispatch->hypertable, point);
127
128 if (NULL == new_chunk)
129 elog(ERROR, "no chunk found or created");
130
131 cis = ts_chunk_insert_state_create(new_chunk, dispatch);
132 ts_subspace_store_add(dispatch->cache, new_chunk->cube, cis, destroy_chunk_insert_state);
133 }
134 else if (cis->rel->rd_id == dispatch->prev_cis_oid && cis == dispatch->prev_cis)
135 {
136 /* got the same item from cache as before */
137 cis_changed = false;
138 }
139
140 if (cis_changed && on_chunk_changed)
141 on_chunk_changed(cis, data);
142
143 Assert(cis != NULL);
144 dispatch->prev_cis = cis;
145 dispatch->prev_cis_oid = cis->rel->rd_id;
146 return cis;
147 }
148