1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <executor/executor.h>
8 #include <parser/parsetree.h>
9 #include <nodes/plannodes.h>
10 #include <commands/explain.h>
11 #include <foreign/fdwapi.h>
12 #include <utils/rel.h>
13 #include <fmgr.h>
14 #include <miscadmin.h>
15 #include <guc.h>
16 
17 #include <remote/async.h>
18 #include <remote/stmt_params.h>
19 #include <remote/connection.h>
20 #include <remote/dist_txn.h>
21 #include <remote/utils.h>
22 #include <remote/tuplefactory.h>
23 #include <chunk_data_node.h>
24 #include <nodes/chunk_insert_state.h>
25 
26 #include "scan_plan.h"
27 #include "modify_exec.h"
28 
/*
 * Positions of the items kept in the fdw_private list of a ModifyTable node
 * that references a timescaledb_fdw foreign table. Each enumerator is the
 * zero-based list index of one item:
 *
 * 0) INSERT/UPDATE/DELETE statement text to be sent to the data node
 * 1) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) The data nodes for the current chunk
 * 5) The insert state for the current chunk
 *
 * Items 4 and 5 are only read when the list is long enough to hold them.
 */
enum FdwModifyPrivateIndex
{
	FdwModifyPrivateUpdateSql = 0,	  /* remote SQL statement (String node) */
	FdwModifyPrivateTargetAttnums,	  /* target attnums for INSERT/UPDATE (integer list) */
	FdwModifyPrivateHasReturning,	  /* has-returning flag (integer Value node) */
	FdwModifyPrivateRetrievedAttrs,	  /* attnums retrieved by RETURNING (integer list) */
	FdwModifyPrivateDataNodes,		  /* data nodes for the current chunk */
	FdwModifyPrivateChunkInsertState, /* insert state for the current chunk */
};
54 
/*
 * Per-data-node state of a foreign modify operation. One entry exists for
 * each data node that the remote statement is sent to.
 */
typedef struct TsFdwDataNodeState
{
	/* connection ID (built from the foreign server OID and the user OID) */
	TSConnectionId id;
	/* for remote query execution */
	TSConnection *conn;   /* connection used to send the modify statement */
	PreparedStmt *p_stmt; /* prepared statement handle, if created; NULL
						   * until the statement has been prepared */
} TsFdwDataNodeState;
62 
/*
 * Execution state of a foreign insert/update/delete operation.
 *
 * The struct ends in a flexible array with one entry per data node, so it is
 * allocated with TS_FDW_MODIFY_STATE_SIZE(num_data_nodes) bytes.
 */
typedef struct TsFdwModifyState
{
	Relation rel;						  /* relcache entry for the foreign table */
	AttConvInMetadata *att_conv_metadata; /* attribute datatype conversion metadata for converting
											 result to tuples; only set when there is a
											 RETURNING clause */

	/* extracted fdw_private data */
	char *query;		/* text of INSERT/UPDATE/DELETE command */
	List *target_attrs; /* list of target attribute numbers */
	bool has_returning; /* is there a RETURNING clause? */
	TupleFactory *tupfactory; /* builds the tuples for RETURNING results */

	AttrNumber ctid_attno; /* attnum of input resjunk ctid column (only
							* looked up for UPDATE and DELETE) */

	bool prepared;			 /* has the statement been prepared on the data nodes? */
	int num_data_nodes;		 /* number of entries in data_nodes */
	StmtParams *stmt_params; /* prepared statement parameters */
	TsFdwDataNodeState data_nodes[FLEXIBLE_ARRAY_MEMBER]; /* per-data-node state */
} TsFdwModifyState;
85 
/*
 * Number of bytes needed for a TsFdwModifyState whose trailing data_nodes
 * array holds num_data_nodes entries.
 *
 * The argument is parenthesized so that an expression argument (e.g.,
 * "n + 1") is multiplied by the element size as a whole.
 */
#define TS_FDW_MODIFY_STATE_SIZE(num_data_nodes)                                                   \
	(sizeof(TsFdwModifyState) + (sizeof(TsFdwDataNodeState) * (num_data_nodes)))
88 
89 static void
initialize_fdw_data_node_state(TsFdwDataNodeState * fdw_data_node,TSConnectionId id)90 initialize_fdw_data_node_state(TsFdwDataNodeState *fdw_data_node, TSConnectionId id)
91 {
92 	fdw_data_node->id = id;
93 	fdw_data_node->conn = remote_dist_txn_get_connection(id, REMOTE_TXN_USE_PREP_STMT);
94 	fdw_data_node->p_stmt = NULL;
95 }
96 
97 /*
98  * create_foreign_modify
99  *		Construct an execution state of a foreign insert/update/delete
100  *		operation
101  */
102 static TsFdwModifyState *
create_foreign_modify(EState * estate,Relation rel,CmdType operation,Oid check_as_user,Plan * subplan,char * query,List * target_attrs,bool has_returning,List * retrieved_attrs,List * server_id_list)103 create_foreign_modify(EState *estate, Relation rel, CmdType operation, Oid check_as_user,
104 					  Plan *subplan, char *query, List *target_attrs, bool has_returning,
105 					  List *retrieved_attrs, List *server_id_list)
106 {
107 	TsFdwModifyState *fmstate;
108 	TupleDesc tupdesc = RelationGetDescr(rel);
109 	ListCell *lc;
110 	Oid user_id = OidIsValid(check_as_user) ? check_as_user : GetUserId();
111 	int i = 0;
112 	int num_data_nodes = server_id_list == NIL ? 1 : list_length(server_id_list);
113 
114 	/* Begin constructing TsFdwModifyState. */
115 	fmstate = (TsFdwModifyState *) palloc0(TS_FDW_MODIFY_STATE_SIZE(num_data_nodes));
116 	fmstate->rel = rel;
117 
118 	/*
119 	 * Identify which user to do the remote access as.  This should match what
120 	 * ExecCheckRTEPerms() does.
121 	 */
122 
123 	if (NIL != server_id_list)
124 	{
125 		/*
126 		 * This is either (1) an INSERT on a hypertable chunk, or (2) an
127 		 * UPDATE or DELETE on a chunk. In the former case (1), the data nodes
128 		 * were passed on from the INSERT path via the chunk insert state, and
129 		 * in the latter case (2), the data nodes were resolved at planning time
130 		 * in the FDW planning callback.
131 		 */
132 
133 		foreach (lc, server_id_list)
134 		{
135 			Oid server_id = lfirst_oid(lc);
136 			TSConnectionId id = remote_connection_id(server_id, user_id);
137 
138 			initialize_fdw_data_node_state(&fmstate->data_nodes[i++], id);
139 		}
140 	}
141 	else
142 	{
143 		/*
144 		 * If there is no chunk insert state and no data nodes from planning,
145 		 * this is an INSERT, UPDATE, or DELETE on a standalone foreign table.
146 		 * We must get the data node from the foreign table's metadata.
147 		 */
148 		ForeignTable *table = GetForeignTable(rel->rd_id);
149 		TSConnectionId id = remote_connection_id(table->serverid, user_id);
150 
151 		initialize_fdw_data_node_state(&fmstate->data_nodes[0], id);
152 	}
153 
154 	/* Set up remote query information. */
155 	fmstate->query = query;
156 	fmstate->target_attrs = target_attrs;
157 	fmstate->has_returning = has_returning;
158 	fmstate->prepared = false; /* PREPARE will happen later */
159 	fmstate->num_data_nodes = num_data_nodes;
160 
161 	/* Prepare for input conversion of RETURNING results. */
162 	if (fmstate->has_returning)
163 		fmstate->att_conv_metadata = data_format_create_att_conv_in_metadata(tupdesc, false);
164 
165 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
166 	{
167 		Assert(subplan != NULL);
168 
169 		/* Find the ctid resjunk column in the subplan's result */
170 		fmstate->ctid_attno = ExecFindJunkAttributeInTlist(subplan->targetlist, "ctid");
171 		if (!AttributeNumberIsValid(fmstate->ctid_attno))
172 			elog(ERROR, "could not find junk ctid column");
173 	}
174 
175 	fmstate->stmt_params = stmt_params_create(fmstate->target_attrs,
176 											  operation == CMD_UPDATE || operation == CMD_DELETE,
177 											  tupdesc,
178 											  1);
179 
180 	fmstate->tupfactory = tuplefactory_create_for_rel(rel, retrieved_attrs);
181 
182 	return fmstate;
183 }
184 
185 /*
186  * Convert a relation's attribute numbers to the corresponding numbers for
187  * another relation.
188  *
189  * Conversions are necessary when, e.g., a (new) chunk's attribute numbers do
190  * not match the root table's numbers after a column has been removed.
191  */
192 static List *
convert_attrs(TupleConversionMap * map,List * attrs)193 convert_attrs(TupleConversionMap *map, List *attrs)
194 {
195 	List *new_attrs = NIL;
196 	ListCell *lc;
197 
198 	foreach (lc, attrs)
199 	{
200 		AttrNumber attnum = lfirst_int(lc);
201 		int i;
202 
203 		for (i = 0; i < map->outdesc->natts; i++)
204 		{
205 #if PG13_GE
206 			if (map->attrMap->attnums[i] == attnum)
207 #else
208 			if (map->attrMap[i] == attnum)
209 #endif
210 			{
211 				new_attrs = lappend_int(new_attrs, AttrOffsetGetAttrNumber(i));
212 				break;
213 			}
214 		}
215 
216 		/* Assert that we found the attribute */
217 		Assert(i != map->outdesc->natts);
218 	}
219 
220 	Assert(list_length(attrs) == list_length(new_attrs));
221 
222 	return new_attrs;
223 }
224 
225 static List *
get_chunk_server_id_list(const List * chunk_data_nodes)226 get_chunk_server_id_list(const List *chunk_data_nodes)
227 {
228 	List *list = NIL;
229 	ListCell *lc;
230 
231 	foreach (lc, chunk_data_nodes)
232 	{
233 		ChunkDataNode *cdn = lfirst(lc);
234 
235 		list = lappend_oid(list, cdn->foreign_server_oid);
236 	}
237 
238 	return list;
239 }
240 
241 void
fdw_begin_foreign_modify(PlanState * pstate,ResultRelInfo * rri,CmdType operation,List * fdw_private,Plan * subplan)242 fdw_begin_foreign_modify(PlanState *pstate, ResultRelInfo *rri, CmdType operation,
243 						 List *fdw_private, Plan *subplan)
244 {
245 	TsFdwModifyState *fmstate;
246 	EState *estate = pstate->state;
247 	char *query;
248 	List *target_attrs;
249 	bool has_returning;
250 	List *retrieved_attrs;
251 	List *server_id_list = NIL;
252 	ChunkInsertState *cis = NULL;
253 	RangeTblEntry *rte;
254 
255 	/* Deconstruct fdw_private data. */
256 	query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql));
257 	target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums);
258 	has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning));
259 	retrieved_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateRetrievedAttrs);
260 
261 	/* Find RTE. */
262 	rte = rt_fetch(rri->ri_RangeTableIndex, estate->es_range_table);
263 
264 	Assert(NULL != rte);
265 
266 	if (list_length(fdw_private) > FdwModifyPrivateDataNodes)
267 	{
268 		List *data_nodes = (List *) list_nth(fdw_private, FdwModifyPrivateDataNodes);
269 		ListCell *lc;
270 
271 		foreach (lc, data_nodes)
272 			server_id_list = lappend_oid(server_id_list, lfirst_oid(lc));
273 	}
274 
275 	if (list_length(fdw_private) > FdwModifyPrivateChunkInsertState)
276 	{
277 		cis = (ChunkInsertState *) list_nth(fdw_private, FdwModifyPrivateChunkInsertState);
278 
279 		/*
280 		 * A chunk may have different attribute numbers than the root relation
281 		 * that we planned the attribute lists for
282 		 */
283 		if (NULL != cis->hyper_to_chunk_map)
284 		{
285 			/*
286 			 * Convert the target attributes (the inserted or updated
287 			 * attributes)
288 			 */
289 			target_attrs = convert_attrs(cis->hyper_to_chunk_map, target_attrs);
290 
291 			/*
292 			 * Convert the retrieved attributes, if there is a RETURNING
293 			 * statement
294 			 */
295 			if (NIL != retrieved_attrs)
296 				retrieved_attrs = convert_attrs(cis->hyper_to_chunk_map, retrieved_attrs);
297 		}
298 
299 		/*
300 		 * If there's a chunk insert state, then it has the authoritative
301 		 * data node list.
302 		 */
303 		server_id_list = get_chunk_server_id_list(cis->chunk_data_nodes);
304 	}
305 
306 	/* Construct an execution state. */
307 	fmstate = create_foreign_modify(estate,
308 									rri->ri_RelationDesc,
309 									operation,
310 									rte->checkAsUser,
311 									subplan,
312 									query,
313 									target_attrs,
314 									has_returning,
315 									retrieved_attrs,
316 									server_id_list);
317 
318 	rri->ri_FdwState = fmstate;
319 }
320 
321 static PreparedStmt *
prepare_foreign_modify_data_node(TsFdwModifyState * fmstate,TsFdwDataNodeState * fdw_data_node)322 prepare_foreign_modify_data_node(TsFdwModifyState *fmstate, TsFdwDataNodeState *fdw_data_node)
323 {
324 	AsyncRequest *req;
325 
326 	Assert(NULL == fdw_data_node->p_stmt);
327 
328 	req = async_request_send_prepare(fdw_data_node->conn,
329 									 fmstate->query,
330 									 stmt_params_num_params(fmstate->stmt_params));
331 
332 	Assert(NULL != req);
333 
334 	/*
335 	 * Async request interface doesn't seem to allow waiting for multiple
336 	 * prepared statements in an AsyncRequestSet. Should fix async API
337 	 */
338 	return async_request_wait_prepared_statement(req);
339 }
340 
341 /*
342  * prepare_foreign_modify
343  *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
344  */
345 static void
prepare_foreign_modify(TsFdwModifyState * fmstate)346 prepare_foreign_modify(TsFdwModifyState *fmstate)
347 {
348 	int i;
349 
350 	for (i = 0; i < fmstate->num_data_nodes; i++)
351 	{
352 		TsFdwDataNodeState *fdw_data_node = &fmstate->data_nodes[i];
353 
354 		fdw_data_node->p_stmt = prepare_foreign_modify_data_node(fmstate, fdw_data_node);
355 	}
356 
357 	fmstate->prepared = true;
358 }
359 
360 /*
361  * store_returning_result
362  *		Store the result of a RETURNING clause
363  *
364  * On error, be sure to release the PGresult on the way out.  Callers do not
365  * have PG_TRY blocks to ensure this happens.
366  */
367 static void
store_returning_result(TsFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)368 store_returning_result(TsFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
369 {
370 	PG_TRY();
371 	{
372 		HeapTuple newtup =
373 			tuplefactory_make_tuple(fmstate->tupfactory, res, 0, PQbinaryTuples(res));
374 
375 		/* tuple will be deleted when it is cleared from the slot */
376 		ExecStoreHeapTuple(newtup, slot, true);
377 	}
378 	PG_CATCH();
379 	{
380 		if (res)
381 			PQclear(res);
382 		PG_RE_THROW();
383 	}
384 	PG_END_TRY();
385 }
386 
387 static int
response_type(AttConvInMetadata * att_conv_metadata)388 response_type(AttConvInMetadata *att_conv_metadata)
389 {
390 	if (!ts_guc_enable_connection_binary_data)
391 		return FORMAT_TEXT;
392 	return att_conv_metadata == NULL || att_conv_metadata->binary ? FORMAT_BINARY : FORMAT_TEXT;
393 }
394 
395 TupleTableSlot *
fdw_exec_foreign_insert(TsFdwModifyState * fmstate,EState * estate,TupleTableSlot * slot,TupleTableSlot * planslot)396 fdw_exec_foreign_insert(TsFdwModifyState *fmstate, EState *estate, TupleTableSlot *slot,
397 						TupleTableSlot *planslot)
398 {
399 	StmtParams *params = fmstate->stmt_params;
400 	AsyncRequestSet *reqset;
401 	AsyncResponseResult *rsp;
402 	int n_rows = -1;
403 	int i;
404 
405 	if (!fmstate->prepared)
406 		prepare_foreign_modify(fmstate);
407 
408 	reqset = async_request_set_create();
409 
410 	stmt_params_convert_values(params, slot, NULL);
411 
412 	for (i = 0; i < fmstate->num_data_nodes; i++)
413 	{
414 		TsFdwDataNodeState *fdw_data_node = &fmstate->data_nodes[i];
415 		AsyncRequest *req = NULL;
416 		int type = response_type(fmstate->att_conv_metadata);
417 		req = async_request_send_prepared_stmt_with_params(fdw_data_node->p_stmt, params, type);
418 		Assert(NULL != req);
419 		async_request_set_add(reqset, req);
420 	}
421 
422 	while ((rsp = async_request_set_wait_any_result(reqset)))
423 	{
424 		PGresult *res = async_response_result_get_pg_result(rsp);
425 
426 		if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
427 			async_response_report_error((AsyncResponse *) rsp, ERROR);
428 
429 		/*
430 		 * If we insert into multiple replica chunks, we should only return
431 		 * the results from the first one
432 		 */
433 		if (n_rows == -1)
434 		{
435 			/* Check number of rows affected, and fetch RETURNING tuple if any */
436 			if (fmstate->has_returning)
437 			{
438 				n_rows = PQntuples(res);
439 
440 				if (n_rows > 0)
441 					store_returning_result(fmstate, slot, res);
442 			}
443 			else
444 				n_rows = atoi(PQcmdTuples(res));
445 		}
446 
447 		/* And clean up */
448 		async_response_result_close(rsp);
449 		stmt_params_reset(params);
450 	}
451 
452 	/*
453 	 * Currently no way to do a deep cleanup of all request in the request
454 	 * set. The worry here is that since this runs in a per-chunk insert state
455 	 * memory context, the async API will accumulate a lot of cruft during
456 	 * inserts
457 	 */
458 	pfree(reqset);
459 
460 	/* Return NULL if nothing was inserted on the remote end */
461 	return (n_rows > 0) ? slot : NULL;
462 }
463 
464 /*
465  * Execute either an UPDATE or DELETE.
466  */
467 TupleTableSlot *
fdw_exec_foreign_update_or_delete(TsFdwModifyState * fmstate,EState * estate,TupleTableSlot * slot,TupleTableSlot * planslot,ModifyCommand cmd)468 fdw_exec_foreign_update_or_delete(TsFdwModifyState *fmstate, EState *estate, TupleTableSlot *slot,
469 								  TupleTableSlot *planslot, ModifyCommand cmd)
470 {
471 	StmtParams *params = fmstate->stmt_params;
472 	AsyncRequestSet *reqset;
473 	AsyncResponseResult *rsp;
474 	Datum datum;
475 	bool is_null;
476 	int n_rows = -1;
477 	int i;
478 
479 	/* Set up the prepared statement on the data node, if we didn't yet */
480 	if (!fmstate->prepared)
481 		prepare_foreign_modify(fmstate);
482 
483 	/* Get the ctid that was passed up as a resjunk column */
484 	datum = ExecGetJunkAttribute(planslot, fmstate->ctid_attno, &is_null);
485 
486 	/* shouldn't ever get a null result... */
487 	if (is_null)
488 		elog(ERROR, "ctid is NULL");
489 
490 	stmt_params_convert_values(params,
491 							   (cmd == UPDATE_CMD ? slot : NULL),
492 							   (ItemPointer) DatumGetPointer(datum));
493 	reqset = async_request_set_create();
494 
495 	for (i = 0; i < fmstate->num_data_nodes; i++)
496 	{
497 		AsyncRequest *req = NULL;
498 		TsFdwDataNodeState *fdw_data_node = &fmstate->data_nodes[i];
499 		int type = response_type(fmstate->att_conv_metadata);
500 		req = async_request_send_prepared_stmt_with_params(fdw_data_node->p_stmt, params, type);
501 
502 		Assert(NULL != req);
503 
504 		async_request_attach_user_data(req, fdw_data_node);
505 		async_request_set_add(reqset, req);
506 	}
507 
508 	while ((rsp = async_request_set_wait_any_result(reqset)))
509 	{
510 		PGresult *res = async_response_result_get_pg_result(rsp);
511 
512 		if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
513 			remote_result_elog(res, ERROR);
514 
515 		/*
516 		 * If we update multiple replica chunks, we should only return the
517 		 * results from the first one.
518 		 */
519 		if (n_rows == -1)
520 		{
521 			/* Check number of rows affected, and fetch RETURNING tuple if any */
522 			if (fmstate->has_returning)
523 			{
524 				n_rows = PQntuples(res);
525 
526 				if (n_rows > 0)
527 					store_returning_result(fmstate, slot, res);
528 			}
529 			else
530 				n_rows = atoi(PQcmdTuples(res));
531 		}
532 
533 		/* And clean up */
534 		async_response_result_close(rsp);
535 	}
536 
537 	/*
538 	 * Currently no way to do a deep cleanup of all request in the request
539 	 * set. The worry here is that since this runs in a per-chunk insert state
540 	 * memory context, the async API will accumulate a lot of cruft during
541 	 * inserts
542 	 */
543 	pfree(reqset);
544 	stmt_params_reset(params);
545 
546 	/* Return NULL if nothing was updated on the remote end */
547 	return (n_rows > 0) ? slot : NULL;
548 }
549 
550 /*
551  * finish_foreign_modify
552  *		Release resources for a foreign insert/update/delete operation
553  */
554 void
fdw_finish_foreign_modify(TsFdwModifyState * fmstate)555 fdw_finish_foreign_modify(TsFdwModifyState *fmstate)
556 {
557 	int i;
558 
559 	Assert(fmstate != NULL);
560 
561 	for (i = 0; i < fmstate->num_data_nodes; i++)
562 	{
563 		TsFdwDataNodeState *fdw_data_node = &fmstate->data_nodes[i];
564 
565 		/* If we created a prepared statement, destroy it */
566 		if (NULL != fdw_data_node->p_stmt)
567 		{
568 			prepared_stmt_close(fdw_data_node->p_stmt);
569 			fdw_data_node->p_stmt = NULL;
570 		}
571 
572 		fdw_data_node->conn = NULL;
573 	}
574 
575 	stmt_params_free(fmstate->stmt_params);
576 }
577 
578 void
fdw_explain_modify(PlanState * ps,ResultRelInfo * rri,List * fdw_private,int subplan_index,ExplainState * es)579 fdw_explain_modify(PlanState *ps, ResultRelInfo *rri, List *fdw_private, int subplan_index,
580 				   ExplainState *es)
581 {
582 	if (es->verbose)
583 	{
584 		const char *sql = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql));
585 
586 		ExplainPropertyText("Remote SQL", sql, es);
587 	}
588 }
589