1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <parser/parsetree.h>
8 #include <access/sysattr.h>
9 #include <utils/rel.h>
10 
11 #include <chunk.h>
12 #include <chunk_data_node.h>
13 
14 #include "deparse.h"
15 #include "modify_plan.h"
16 
17 static List *
get_insert_attrs(Relation rel)18 get_insert_attrs(Relation rel)
19 {
20 	TupleDesc tupdesc = RelationGetDescr(rel);
21 	List *attrs = NIL;
22 	int i;
23 
24 	for (i = 0; i < tupdesc->natts; i++)
25 	{
26 		Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
27 
28 		if (!attr->attisdropped)
29 			attrs = lappend_int(attrs, AttrOffsetGetAttrNumber(i));
30 	}
31 
32 	return attrs;
33 }
34 
35 static List *
get_update_attrs(RangeTblEntry * rte)36 get_update_attrs(RangeTblEntry *rte)
37 {
38 	List *attrs = NIL;
39 	int col = -1;
40 
41 	while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
42 	{
43 		/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
44 		AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
45 
46 		if (attno <= InvalidAttrNumber) /* shouldn't happen */
47 			elog(ERROR, "system-column update is not supported");
48 
49 		attrs = lappend_int(attrs, attno);
50 	}
51 
52 	return attrs;
53 }
54 
55 static List *
get_chunk_data_nodes(Oid relid)56 get_chunk_data_nodes(Oid relid)
57 {
58 	Chunk *chunk = ts_chunk_get_by_relid(relid, false);
59 	List *serveroids = NIL;
60 	ListCell *lc;
61 
62 	if (NULL == chunk)
63 		return NIL;
64 
65 	foreach (lc, chunk->data_nodes)
66 	{
67 		ChunkDataNode *cs = lfirst(lc);
68 
69 		serveroids = lappend_oid(serveroids, cs->foreign_server_oid);
70 	}
71 
72 	return serveroids;
73 }
74 
75 /*
76  * Plan INSERT, UPDATE, and DELETE.
77  *
78  * The main task of this function is to generate (deparse) the SQL statement
79  * for the corresponding tables on data nodes.
80  *
81  * If the planning involves a hypertable, the function is called differently
82  * depending on the command:
83  *
84  * 1. INSERT - called only once during hypertable planning and the given
85  * result relation is the hypertable root relation. This is due to
86  * TimescaleDBs unique INSERT path. We'd like to plan the INSERT as if it
87  * would happen on the root of the hypertable. This is useful because INSERTs
88  * should occur via the top-level hypertables on the data nodes
89  * (preferrably batched), and not once per individual remote chunk
90  * (inefficient and won't go through the standard INSERT path on the data
91  * node).
92  *
93  * 2. UPDATE and DELETE - called once per chunk and the given result relation
94  * is the chunk relation.
95  *
96  * For non-hypertables, which are foreign tables using the timescaledb_fdw,
97  * this function is called the way it normally would be for the FDW API, i.e.,
98  * once during planning.
99  *
100  * For the TimescaleDB insert path, we actually call
101  * this function only once on the hypertable's root table instead of once per
102  * chunk. This is because we want to send INSERT statements to each remote
103  * hypertable rather than each remote chunk.
104  *
105  * UPDATEs and DELETEs work slightly different since we have no "optimized"
106  * path for such operations. Instead, they happen once per chunk.
107  */
108 List *
fdw_plan_foreign_modify(PlannerInfo * root,ModifyTable * plan,Index result_relation,int subplan_index)109 fdw_plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relation,
110 						int subplan_index)
111 {
112 	CmdType operation = plan->operation;
113 	RangeTblEntry *rte = planner_rt_fetch(result_relation, root);
114 	Relation rel;
115 	StringInfoData sql;
116 	List *returning_list = NIL;
117 	List *retrieved_attrs = NIL;
118 	List *target_attrs = NIL;
119 	List *data_nodes = NIL;
120 	bool do_nothing = false;
121 
122 	initStringInfo(&sql);
123 
124 	/*
125 	 * Extract the relevant RETURNING list if any.
126 	 */
127 	if (plan->returningLists)
128 		returning_list = (List *) list_nth(plan->returningLists, subplan_index);
129 
130 	/*
131 	 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
132 	 * should have already been rejected in the optimizer, as presently there
133 	 * is no way to recognize an arbiter index on a foreign table.  Only DO
134 	 * NOTHING is supported without an inference specification.
135 	 */
136 	if (plan->onConflictAction == ONCONFLICT_NOTHING)
137 		do_nothing = true;
138 	else if (plan->onConflictAction != ONCONFLICT_NONE)
139 		ereport(ERROR,
140 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
141 				 errmsg("ON CONFLICT DO UPDATE not supported"
142 						" on distributed hypertables")));
143 
144 	/*
145 	 * Core code already has some lock on each rel being planned, so we can
146 	 * use NoLock here.
147 	 */
148 	rel = table_open(rte->relid, NoLock);
149 
150 	/*
151 	 * Construct the SQL command string
152 	 *
153 	 * In an INSERT, we transmit all columns that are defined in the foreign
154 	 * table.  In an UPDATE, we transmit only columns that were explicitly
155 	 * targets of the UPDATE, so as to avoid unnecessary data transmission.
156 	 * (We can't do that for INSERT since we would miss sending default values
157 	 * for columns not listed in the source statement.)
158 	 */
159 	switch (operation)
160 	{
161 		case CMD_INSERT:
162 			target_attrs = get_insert_attrs(rel);
163 			deparseInsertSql(&sql,
164 							 rte,
165 							 result_relation,
166 							 rel,
167 							 target_attrs,
168 							 1,
169 							 do_nothing,
170 							 returning_list,
171 							 &retrieved_attrs);
172 			break;
173 		case CMD_UPDATE:
174 			target_attrs = get_update_attrs(rte);
175 			deparseUpdateSql(&sql,
176 							 rte,
177 							 result_relation,
178 							 rel,
179 							 target_attrs,
180 							 returning_list,
181 							 &retrieved_attrs);
182 			data_nodes = get_chunk_data_nodes(rel->rd_id);
183 			break;
184 		case CMD_DELETE:
185 			deparseDeleteSql(&sql, rte, result_relation, rel, returning_list, &retrieved_attrs);
186 			data_nodes = get_chunk_data_nodes(rel->rd_id);
187 			break;
188 		default:
189 			elog(ERROR, "unexpected operation: %d", (int) operation);
190 			break;
191 	}
192 
193 	table_close(rel, NoLock);
194 
195 	/*
196 	 * Build the fdw_private list that will be available to the executor.
197 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
198 	 */
199 	return list_make5(makeString(sql.data),
200 					  target_attrs,
201 					  makeInteger((retrieved_attrs != NIL)),
202 					  retrieved_attrs,
203 					  data_nodes);
204 }
205