1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <executor/executor.h>
8 #include <parser/parsetree.h>
9 #include <nodes/plannodes.h>
10 #include <commands/explain.h>
11 #include <foreign/fdwapi.h>
12 #include <utils/rel.h>
13 #include <fmgr.h>
14 #include <miscadmin.h>
15 #include <guc.h>
16
17 #include <remote/async.h>
18 #include <remote/stmt_params.h>
19 #include <remote/connection.h>
20 #include <remote/dist_txn.h>
21 #include <remote/utils.h>
22 #include <remote/tuplefactory.h>
23 #include <chunk_data_node.h>
24 #include <nodes/chunk_insert_state.h>
25
26 #include "scan_plan.h"
27 #include "modify_exec.h"
28
/*
 * Positions of the items kept in the fdw_private list of a ModifyTable node
 * that references a timescaledb_fdw foreign table. In list order:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the data node
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *    (NIL for a DELETE)
 * 3) Boolean flag showing if the remote query has a RETURNING clause
 * 4) Integer list of attribute numbers retrieved by RETURNING, if any
 * 5) OID list of the data nodes (foreign servers) for the current chunk
 * 6) The chunk insert state for the current chunk
 *
 * The last two items are optional; readers check the length of the list
 * before fetching them.
 */
enum FdwModifyPrivateIndex
{
	/* Remote SQL statement text (a String node) */
	FdwModifyPrivateUpdateSql = 0,
	/* Target attribute numbers for INSERT/UPDATE (integer list) */
	FdwModifyPrivateTargetAttnums,
	/* Whether the statement has a RETURNING clause (an integer Value node) */
	FdwModifyPrivateHasReturning,
	/* Attribute numbers fetched by RETURNING (integer list) */
	FdwModifyPrivateRetrievedAttrs,
	/* Data nodes of the current chunk (OID list) */
	FdwModifyPrivateDataNodes,
	/* Chunk insert state of the current chunk */
	FdwModifyPrivateChunkInsertState,
};
54
55 typedef struct TsFdwDataNodeState
56 {
57 TSConnectionId id;
58 /* for remote query execution */
59 TSConnection *conn; /* connection for the scan */
60 PreparedStmt *p_stmt; /* prepared statement handle, if created */
61 } TsFdwDataNodeState;
62
63 /*
64 * Execution state of a foreign insert/update/delete operation.
65 */
66 typedef struct TsFdwModifyState
67 {
68 Relation rel; /* relcache entry for the foreign table */
69 AttConvInMetadata *att_conv_metadata; /* attribute datatype conversion metadata for converting
70 result to tuples */
71
72 /* extracted fdw_private data */
73 char *query; /* text of INSERT/UPDATE/DELETE command */
74 List *target_attrs; /* list of target attribute numbers */
75 bool has_returning; /* is there a RETURNING clause? */
76 TupleFactory *tupfactory;
77
78 AttrNumber ctid_attno; /* attnum of input resjunk ctid column */
79
80 bool prepared;
81 int num_data_nodes;
82 StmtParams *stmt_params; /* prepared statement paremeters */
83 TsFdwDataNodeState data_nodes[FLEXIBLE_ARRAY_MEMBER];
84 } TsFdwModifyState;
85
/*
 * Number of bytes to allocate for a TsFdwModifyState whose trailing
 * data_nodes array holds num_data_nodes entries.
 *
 * The argument is parenthesized so that an expression argument (e.g.,
 * "a + b") is multiplied as a whole instead of only its first operand.
 */
#define TS_FDW_MODIFY_STATE_SIZE(num_data_nodes)                                                   \
	(sizeof(TsFdwModifyState) + (sizeof(TsFdwDataNodeState) * (num_data_nodes)))
88
89 static void
initialize_fdw_data_node_state(TsFdwDataNodeState * fdw_data_node,TSConnectionId id)90 initialize_fdw_data_node_state(TsFdwDataNodeState *fdw_data_node, TSConnectionId id)
91 {
92 fdw_data_node->id = id;
93 fdw_data_node->conn = remote_dist_txn_get_connection(id, REMOTE_TXN_USE_PREP_STMT);
94 fdw_data_node->p_stmt = NULL;
95 }
96
97 /*
98 * create_foreign_modify
99 * Construct an execution state of a foreign insert/update/delete
100 * operation
101 */
102 static TsFdwModifyState *
create_foreign_modify(EState * estate,Relation rel,CmdType operation,Oid check_as_user,Plan * subplan,char * query,List * target_attrs,bool has_returning,List * retrieved_attrs,List * server_id_list)103 create_foreign_modify(EState *estate, Relation rel, CmdType operation, Oid check_as_user,
104 Plan *subplan, char *query, List *target_attrs, bool has_returning,
105 List *retrieved_attrs, List *server_id_list)
106 {
107 TsFdwModifyState *fmstate;
108 TupleDesc tupdesc = RelationGetDescr(rel);
109 ListCell *lc;
110 Oid user_id = OidIsValid(check_as_user) ? check_as_user : GetUserId();
111 int i = 0;
112 int num_data_nodes = server_id_list == NIL ? 1 : list_length(server_id_list);
113
114 /* Begin constructing TsFdwModifyState. */
115 fmstate = (TsFdwModifyState *) palloc0(TS_FDW_MODIFY_STATE_SIZE(num_data_nodes));
116 fmstate->rel = rel;
117
118 /*
119 * Identify which user to do the remote access as. This should match what
120 * ExecCheckRTEPerms() does.
121 */
122
123 if (NIL != server_id_list)
124 {
125 /*
126 * This is either (1) an INSERT on a hypertable chunk, or (2) an
127 * UPDATE or DELETE on a chunk. In the former case (1), the data nodes
128 * were passed on from the INSERT path via the chunk insert state, and
129 * in the latter case (2), the data nodes were resolved at planning time
130 * in the FDW planning callback.
131 */
132
133 foreach (lc, server_id_list)
134 {
135 Oid server_id = lfirst_oid(lc);
136 TSConnectionId id = remote_connection_id(server_id, user_id);
137
138 initialize_fdw_data_node_state(&fmstate->data_nodes[i++], id);
139 }
140 }
141 else
142 {
143 /*
144 * If there is no chunk insert state and no data nodes from planning,
145 * this is an INSERT, UPDATE, or DELETE on a standalone foreign table.
146 * We must get the data node from the foreign table's metadata.
147 */
148 ForeignTable *table = GetForeignTable(rel->rd_id);
149 TSConnectionId id = remote_connection_id(table->serverid, user_id);
150
151 initialize_fdw_data_node_state(&fmstate->data_nodes[0], id);
152 }
153
154 /* Set up remote query information. */
155 fmstate->query = query;
156 fmstate->target_attrs = target_attrs;
157 fmstate->has_returning = has_returning;
158 fmstate->prepared = false; /* PREPARE will happen later */
159 fmstate->num_data_nodes = num_data_nodes;
160
161 /* Prepare for input conversion of RETURNING results. */
162 if (fmstate->has_returning)
163 fmstate->att_conv_metadata = data_format_create_att_conv_in_metadata(tupdesc, false);
164
165 if (operation == CMD_UPDATE || operation == CMD_DELETE)
166 {
167 Assert(subplan != NULL);
168
169 /* Find the ctid resjunk column in the subplan's result */
170 fmstate->ctid_attno = ExecFindJunkAttributeInTlist(subplan->targetlist, "ctid");
171 if (!AttributeNumberIsValid(fmstate->ctid_attno))
172 elog(ERROR, "could not find junk ctid column");
173 }
174
175 fmstate->stmt_params = stmt_params_create(fmstate->target_attrs,
176 operation == CMD_UPDATE || operation == CMD_DELETE,
177 tupdesc,
178 1);
179
180 fmstate->tupfactory = tuplefactory_create_for_rel(rel, retrieved_attrs);
181
182 return fmstate;
183 }
184
185 /*
186 * Convert a relation's attribute numbers to the corresponding numbers for
187 * another relation.
188 *
189 * Conversions are necessary when, e.g., a (new) chunk's attribute numbers do
190 * not match the root table's numbers after a column has been removed.
191 */
192 static List *
convert_attrs(TupleConversionMap * map,List * attrs)193 convert_attrs(TupleConversionMap *map, List *attrs)
194 {
195 List *new_attrs = NIL;
196 ListCell *lc;
197
198 foreach (lc, attrs)
199 {
200 AttrNumber attnum = lfirst_int(lc);
201 int i;
202
203 for (i = 0; i < map->outdesc->natts; i++)
204 {
205 #if PG13_GE
206 if (map->attrMap->attnums[i] == attnum)
207 #else
208 if (map->attrMap[i] == attnum)
209 #endif
210 {
211 new_attrs = lappend_int(new_attrs, AttrOffsetGetAttrNumber(i));
212 break;
213 }
214 }
215
216 /* Assert that we found the attribute */
217 Assert(i != map->outdesc->natts);
218 }
219
220 Assert(list_length(attrs) == list_length(new_attrs));
221
222 return new_attrs;
223 }
224
225 static List *
get_chunk_server_id_list(const List * chunk_data_nodes)226 get_chunk_server_id_list(const List *chunk_data_nodes)
227 {
228 List *list = NIL;
229 ListCell *lc;
230
231 foreach (lc, chunk_data_nodes)
232 {
233 ChunkDataNode *cdn = lfirst(lc);
234
235 list = lappend_oid(list, cdn->foreign_server_oid);
236 }
237
238 return list;
239 }
240
241 void
fdw_begin_foreign_modify(PlanState * pstate,ResultRelInfo * rri,CmdType operation,List * fdw_private,Plan * subplan)242 fdw_begin_foreign_modify(PlanState *pstate, ResultRelInfo *rri, CmdType operation,
243 List *fdw_private, Plan *subplan)
244 {
245 TsFdwModifyState *fmstate;
246 EState *estate = pstate->state;
247 char *query;
248 List *target_attrs;
249 bool has_returning;
250 List *retrieved_attrs;
251 List *server_id_list = NIL;
252 ChunkInsertState *cis = NULL;
253 RangeTblEntry *rte;
254
255 /* Deconstruct fdw_private data. */
256 query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql));
257 target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums);
258 has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning));
259 retrieved_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateRetrievedAttrs);
260
261 /* Find RTE. */
262 rte = rt_fetch(rri->ri_RangeTableIndex, estate->es_range_table);
263
264 Assert(NULL != rte);
265
266 if (list_length(fdw_private) > FdwModifyPrivateDataNodes)
267 {
268 List *data_nodes = (List *) list_nth(fdw_private, FdwModifyPrivateDataNodes);
269 ListCell *lc;
270
271 foreach (lc, data_nodes)
272 server_id_list = lappend_oid(server_id_list, lfirst_oid(lc));
273 }
274
275 if (list_length(fdw_private) > FdwModifyPrivateChunkInsertState)
276 {
277 cis = (ChunkInsertState *) list_nth(fdw_private, FdwModifyPrivateChunkInsertState);
278
279 /*
280 * A chunk may have different attribute numbers than the root relation
281 * that we planned the attribute lists for
282 */
283 if (NULL != cis->hyper_to_chunk_map)
284 {
285 /*
286 * Convert the target attributes (the inserted or updated
287 * attributes)
288 */
289 target_attrs = convert_attrs(cis->hyper_to_chunk_map, target_attrs);
290
291 /*
292 * Convert the retrieved attributes, if there is a RETURNING
293 * statement
294 */
295 if (NIL != retrieved_attrs)
296 retrieved_attrs = convert_attrs(cis->hyper_to_chunk_map, retrieved_attrs);
297 }
298
299 /*
300 * If there's a chunk insert state, then it has the authoritative
301 * data node list.
302 */
303 server_id_list = get_chunk_server_id_list(cis->chunk_data_nodes);
304 }
305
306 /* Construct an execution state. */
307 fmstate = create_foreign_modify(estate,
308 rri->ri_RelationDesc,
309 operation,
310 rte->checkAsUser,
311 subplan,
312 query,
313 target_attrs,
314 has_returning,
315 retrieved_attrs,
316 server_id_list);
317
318 rri->ri_FdwState = fmstate;
319 }
320
321 static PreparedStmt *
prepare_foreign_modify_data_node(TsFdwModifyState * fmstate,TsFdwDataNodeState * fdw_data_node)322 prepare_foreign_modify_data_node(TsFdwModifyState *fmstate, TsFdwDataNodeState *fdw_data_node)
323 {
324 AsyncRequest *req;
325
326 Assert(NULL == fdw_data_node->p_stmt);
327
328 req = async_request_send_prepare(fdw_data_node->conn,
329 fmstate->query,
330 stmt_params_num_params(fmstate->stmt_params));
331
332 Assert(NULL != req);
333
334 /*
335 * Async request interface doesn't seem to allow waiting for multiple
336 * prepared statements in an AsyncRequestSet. Should fix async API
337 */
338 return async_request_wait_prepared_statement(req);
339 }
340
341 /*
342 * prepare_foreign_modify
343 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
344 */
345 static void
prepare_foreign_modify(TsFdwModifyState * fmstate)346 prepare_foreign_modify(TsFdwModifyState *fmstate)
347 {
348 int i;
349
350 for (i = 0; i < fmstate->num_data_nodes; i++)
351 {
352 TsFdwDataNodeState *fdw_data_node = &fmstate->data_nodes[i];
353
354 fdw_data_node->p_stmt = prepare_foreign_modify_data_node(fmstate, fdw_data_node);
355 }
356
357 fmstate->prepared = true;
358 }
359
360 /*
361 * store_returning_result
362 * Store the result of a RETURNING clause
363 *
364 * On error, be sure to release the PGresult on the way out. Callers do not
365 * have PG_TRY blocks to ensure this happens.
366 */
367 static void
store_returning_result(TsFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)368 store_returning_result(TsFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
369 {
370 PG_TRY();
371 {
372 HeapTuple newtup =
373 tuplefactory_make_tuple(fmstate->tupfactory, res, 0, PQbinaryTuples(res));
374
375 /* tuple will be deleted when it is cleared from the slot */
376 ExecStoreHeapTuple(newtup, slot, true);
377 }
378 PG_CATCH();
379 {
380 if (res)
381 PQclear(res);
382 PG_RE_THROW();
383 }
384 PG_END_TRY();
385 }
386
387 static int
response_type(AttConvInMetadata * att_conv_metadata)388 response_type(AttConvInMetadata *att_conv_metadata)
389 {
390 if (!ts_guc_enable_connection_binary_data)
391 return FORMAT_TEXT;
392 return att_conv_metadata == NULL || att_conv_metadata->binary ? FORMAT_BINARY : FORMAT_TEXT;
393 }
394
395 TupleTableSlot *
fdw_exec_foreign_insert(TsFdwModifyState * fmstate,EState * estate,TupleTableSlot * slot,TupleTableSlot * planslot)396 fdw_exec_foreign_insert(TsFdwModifyState *fmstate, EState *estate, TupleTableSlot *slot,
397 TupleTableSlot *planslot)
398 {
399 StmtParams *params = fmstate->stmt_params;
400 AsyncRequestSet *reqset;
401 AsyncResponseResult *rsp;
402 int n_rows = -1;
403 int i;
404
405 if (!fmstate->prepared)
406 prepare_foreign_modify(fmstate);
407
408 reqset = async_request_set_create();
409
410 stmt_params_convert_values(params, slot, NULL);
411
412 for (i = 0; i < fmstate->num_data_nodes; i++)
413 {
414 TsFdwDataNodeState *fdw_data_node = &fmstate->data_nodes[i];
415 AsyncRequest *req = NULL;
416 int type = response_type(fmstate->att_conv_metadata);
417 req = async_request_send_prepared_stmt_with_params(fdw_data_node->p_stmt, params, type);
418 Assert(NULL != req);
419 async_request_set_add(reqset, req);
420 }
421
422 while ((rsp = async_request_set_wait_any_result(reqset)))
423 {
424 PGresult *res = async_response_result_get_pg_result(rsp);
425
426 if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
427 async_response_report_error((AsyncResponse *) rsp, ERROR);
428
429 /*
430 * If we insert into multiple replica chunks, we should only return
431 * the results from the first one
432 */
433 if (n_rows == -1)
434 {
435 /* Check number of rows affected, and fetch RETURNING tuple if any */
436 if (fmstate->has_returning)
437 {
438 n_rows = PQntuples(res);
439
440 if (n_rows > 0)
441 store_returning_result(fmstate, slot, res);
442 }
443 else
444 n_rows = atoi(PQcmdTuples(res));
445 }
446
447 /* And clean up */
448 async_response_result_close(rsp);
449 stmt_params_reset(params);
450 }
451
452 /*
453 * Currently no way to do a deep cleanup of all request in the request
454 * set. The worry here is that since this runs in a per-chunk insert state
455 * memory context, the async API will accumulate a lot of cruft during
456 * inserts
457 */
458 pfree(reqset);
459
460 /* Return NULL if nothing was inserted on the remote end */
461 return (n_rows > 0) ? slot : NULL;
462 }
463
464 /*
465 * Execute either an UPDATE or DELETE.
466 */
467 TupleTableSlot *
fdw_exec_foreign_update_or_delete(TsFdwModifyState * fmstate,EState * estate,TupleTableSlot * slot,TupleTableSlot * planslot,ModifyCommand cmd)468 fdw_exec_foreign_update_or_delete(TsFdwModifyState *fmstate, EState *estate, TupleTableSlot *slot,
469 TupleTableSlot *planslot, ModifyCommand cmd)
470 {
471 StmtParams *params = fmstate->stmt_params;
472 AsyncRequestSet *reqset;
473 AsyncResponseResult *rsp;
474 Datum datum;
475 bool is_null;
476 int n_rows = -1;
477 int i;
478
479 /* Set up the prepared statement on the data node, if we didn't yet */
480 if (!fmstate->prepared)
481 prepare_foreign_modify(fmstate);
482
483 /* Get the ctid that was passed up as a resjunk column */
484 datum = ExecGetJunkAttribute(planslot, fmstate->ctid_attno, &is_null);
485
486 /* shouldn't ever get a null result... */
487 if (is_null)
488 elog(ERROR, "ctid is NULL");
489
490 stmt_params_convert_values(params,
491 (cmd == UPDATE_CMD ? slot : NULL),
492 (ItemPointer) DatumGetPointer(datum));
493 reqset = async_request_set_create();
494
495 for (i = 0; i < fmstate->num_data_nodes; i++)
496 {
497 AsyncRequest *req = NULL;
498 TsFdwDataNodeState *fdw_data_node = &fmstate->data_nodes[i];
499 int type = response_type(fmstate->att_conv_metadata);
500 req = async_request_send_prepared_stmt_with_params(fdw_data_node->p_stmt, params, type);
501
502 Assert(NULL != req);
503
504 async_request_attach_user_data(req, fdw_data_node);
505 async_request_set_add(reqset, req);
506 }
507
508 while ((rsp = async_request_set_wait_any_result(reqset)))
509 {
510 PGresult *res = async_response_result_get_pg_result(rsp);
511
512 if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
513 remote_result_elog(res, ERROR);
514
515 /*
516 * If we update multiple replica chunks, we should only return the
517 * results from the first one.
518 */
519 if (n_rows == -1)
520 {
521 /* Check number of rows affected, and fetch RETURNING tuple if any */
522 if (fmstate->has_returning)
523 {
524 n_rows = PQntuples(res);
525
526 if (n_rows > 0)
527 store_returning_result(fmstate, slot, res);
528 }
529 else
530 n_rows = atoi(PQcmdTuples(res));
531 }
532
533 /* And clean up */
534 async_response_result_close(rsp);
535 }
536
537 /*
538 * Currently no way to do a deep cleanup of all request in the request
539 * set. The worry here is that since this runs in a per-chunk insert state
540 * memory context, the async API will accumulate a lot of cruft during
541 * inserts
542 */
543 pfree(reqset);
544 stmt_params_reset(params);
545
546 /* Return NULL if nothing was updated on the remote end */
547 return (n_rows > 0) ? slot : NULL;
548 }
549
550 /*
551 * finish_foreign_modify
552 * Release resources for a foreign insert/update/delete operation
553 */
554 void
fdw_finish_foreign_modify(TsFdwModifyState * fmstate)555 fdw_finish_foreign_modify(TsFdwModifyState *fmstate)
556 {
557 int i;
558
559 Assert(fmstate != NULL);
560
561 for (i = 0; i < fmstate->num_data_nodes; i++)
562 {
563 TsFdwDataNodeState *fdw_data_node = &fmstate->data_nodes[i];
564
565 /* If we created a prepared statement, destroy it */
566 if (NULL != fdw_data_node->p_stmt)
567 {
568 prepared_stmt_close(fdw_data_node->p_stmt);
569 fdw_data_node->p_stmt = NULL;
570 }
571
572 fdw_data_node->conn = NULL;
573 }
574
575 stmt_params_free(fmstate->stmt_params);
576 }
577
578 void
fdw_explain_modify(PlanState * ps,ResultRelInfo * rri,List * fdw_private,int subplan_index,ExplainState * es)579 fdw_explain_modify(PlanState *ps, ResultRelInfo *rri, List *fdw_private, int subplan_index,
580 ExplainState *es)
581 {
582 if (es->verbose)
583 {
584 const char *sql = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql));
585
586 ExplainPropertyText("Remote SQL", sql, es);
587 }
588 }
589