1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <parser/parsetree.h>
8 #include <access/sysattr.h>
9 #include <utils/rel.h>
10
11 #include <chunk.h>
12 #include <chunk_data_node.h>
13
14 #include "deparse.h"
15 #include "modify_plan.h"
16
17 static List *
get_insert_attrs(Relation rel)18 get_insert_attrs(Relation rel)
19 {
20 TupleDesc tupdesc = RelationGetDescr(rel);
21 List *attrs = NIL;
22 int i;
23
24 for (i = 0; i < tupdesc->natts; i++)
25 {
26 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
27
28 if (!attr->attisdropped)
29 attrs = lappend_int(attrs, AttrOffsetGetAttrNumber(i));
30 }
31
32 return attrs;
33 }
34
35 static List *
get_update_attrs(RangeTblEntry * rte)36 get_update_attrs(RangeTblEntry *rte)
37 {
38 List *attrs = NIL;
39 int col = -1;
40
41 while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
42 {
43 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
44 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
45
46 if (attno <= InvalidAttrNumber) /* shouldn't happen */
47 elog(ERROR, "system-column update is not supported");
48
49 attrs = lappend_int(attrs, attno);
50 }
51
52 return attrs;
53 }
54
55 static List *
get_chunk_data_nodes(Oid relid)56 get_chunk_data_nodes(Oid relid)
57 {
58 Chunk *chunk = ts_chunk_get_by_relid(relid, false);
59 List *serveroids = NIL;
60 ListCell *lc;
61
62 if (NULL == chunk)
63 return NIL;
64
65 foreach (lc, chunk->data_nodes)
66 {
67 ChunkDataNode *cs = lfirst(lc);
68
69 serveroids = lappend_oid(serveroids, cs->foreign_server_oid);
70 }
71
72 return serveroids;
73 }
74
75 /*
76 * Plan INSERT, UPDATE, and DELETE.
77 *
78 * The main task of this function is to generate (deparse) the SQL statement
79 * for the corresponding tables on data nodes.
80 *
81 * If the planning involves a hypertable, the function is called differently
82 * depending on the command:
83 *
84 * 1. INSERT - called only once during hypertable planning and the given
85 * result relation is the hypertable root relation. This is due to
86 * TimescaleDBs unique INSERT path. We'd like to plan the INSERT as if it
87 * would happen on the root of the hypertable. This is useful because INSERTs
88 * should occur via the top-level hypertables on the data nodes
89 * (preferrably batched), and not once per individual remote chunk
90 * (inefficient and won't go through the standard INSERT path on the data
91 * node).
92 *
93 * 2. UPDATE and DELETE - called once per chunk and the given result relation
94 * is the chunk relation.
95 *
96 * For non-hypertables, which are foreign tables using the timescaledb_fdw,
97 * this function is called the way it normally would be for the FDW API, i.e.,
98 * once during planning.
99 *
100 * For the TimescaleDB insert path, we actually call
101 * this function only once on the hypertable's root table instead of once per
102 * chunk. This is because we want to send INSERT statements to each remote
103 * hypertable rather than each remote chunk.
104 *
105 * UPDATEs and DELETEs work slightly different since we have no "optimized"
106 * path for such operations. Instead, they happen once per chunk.
107 */
108 List *
fdw_plan_foreign_modify(PlannerInfo * root,ModifyTable * plan,Index result_relation,int subplan_index)109 fdw_plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relation,
110 int subplan_index)
111 {
112 CmdType operation = plan->operation;
113 RangeTblEntry *rte = planner_rt_fetch(result_relation, root);
114 Relation rel;
115 StringInfoData sql;
116 List *returning_list = NIL;
117 List *retrieved_attrs = NIL;
118 List *target_attrs = NIL;
119 List *data_nodes = NIL;
120 bool do_nothing = false;
121
122 initStringInfo(&sql);
123
124 /*
125 * Extract the relevant RETURNING list if any.
126 */
127 if (plan->returningLists)
128 returning_list = (List *) list_nth(plan->returningLists, subplan_index);
129
130 /*
131 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
132 * should have already been rejected in the optimizer, as presently there
133 * is no way to recognize an arbiter index on a foreign table. Only DO
134 * NOTHING is supported without an inference specification.
135 */
136 if (plan->onConflictAction == ONCONFLICT_NOTHING)
137 do_nothing = true;
138 else if (plan->onConflictAction != ONCONFLICT_NONE)
139 ereport(ERROR,
140 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
141 errmsg("ON CONFLICT DO UPDATE not supported"
142 " on distributed hypertables")));
143
144 /*
145 * Core code already has some lock on each rel being planned, so we can
146 * use NoLock here.
147 */
148 rel = table_open(rte->relid, NoLock);
149
150 /*
151 * Construct the SQL command string
152 *
153 * In an INSERT, we transmit all columns that are defined in the foreign
154 * table. In an UPDATE, we transmit only columns that were explicitly
155 * targets of the UPDATE, so as to avoid unnecessary data transmission.
156 * (We can't do that for INSERT since we would miss sending default values
157 * for columns not listed in the source statement.)
158 */
159 switch (operation)
160 {
161 case CMD_INSERT:
162 target_attrs = get_insert_attrs(rel);
163 deparseInsertSql(&sql,
164 rte,
165 result_relation,
166 rel,
167 target_attrs,
168 1,
169 do_nothing,
170 returning_list,
171 &retrieved_attrs);
172 break;
173 case CMD_UPDATE:
174 target_attrs = get_update_attrs(rte);
175 deparseUpdateSql(&sql,
176 rte,
177 result_relation,
178 rel,
179 target_attrs,
180 returning_list,
181 &retrieved_attrs);
182 data_nodes = get_chunk_data_nodes(rel->rd_id);
183 break;
184 case CMD_DELETE:
185 deparseDeleteSql(&sql, rte, result_relation, rel, returning_list, &retrieved_attrs);
186 data_nodes = get_chunk_data_nodes(rel->rd_id);
187 break;
188 default:
189 elog(ERROR, "unexpected operation: %d", (int) operation);
190 break;
191 }
192
193 table_close(rel, NoLock);
194
195 /*
196 * Build the fdw_private list that will be available to the executor.
197 * Items in the list must match enum FdwModifyPrivateIndex, above.
198 */
199 return list_make5(makeString(sql.data),
200 target_attrs,
201 makeInteger((retrieved_attrs != NIL)),
202 retrieved_attrs,
203 data_nodes);
204 }
205