1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <parser/parsetree.h>
8 #include <nodes/execnodes.h>
9 #include <nodes/extensible.h>
10 #include <nodes/makefuncs.h>
11 #include <nodes/nodeFuncs.h>
12 #include <nodes/plannodes.h>
13 #include <executor/executor.h>
14 #include <executor/nodeModifyTable.h>
15 #include <utils/lsyscache.h>
16 #include <utils/builtins.h>
17 #include <utils/hsearch.h>
18 #include <utils/tuplestore.h>
19 #include <utils/memutils.h>
20 #include <funcapi.h>
21 #include <miscadmin.h>
22
23 #include <chunk_data_node.h>
24 #include <nodes/chunk_dispatch_plan.h>
25 #include <nodes/chunk_dispatch_state.h>
26 #include <nodes/chunk_insert_state.h>
27 #include <hypertable_cache.h>
28 #include <compat/compat.h>
29 #include <guc.h>
30
31 #include "data_node_dispatch.h"
32 #include "fdw/scan_exec.h"
33 #include "fdw/deparse.h"
34 #include "remote/utils.h"
35 #include "remote/dist_txn.h"
36 #include "remote/async.h"
37 #include "remote/data_format.h"
38 #include "remote/tuplefactory.h"
39
40 #define TUPSTORE_MEMSIZE_KB work_mem
41 #define TUPSTORE_FLUSH_THRESHOLD ts_guc_max_insert_batch_size
42
43 typedef struct DataNodeDispatchPath
44 {
45 CustomPath cpath;
46 ModifyTablePath *mtpath;
47 Index hypertable_rti; /* range table index of Hypertable */
48 int subplan_index;
49 } DataNodeDispatchPath;
50
51 /*
52 * DataNodeDispatch dispatches tuples to data nodes using batching. It inserts
53 * itself below a ModifyTable node in the plan tree (and the corresponding
54 * execution tree), like so:
55 *
56 * -------------------- Set "direct modify plans" to
57 * | HypertableInsert | signal ModifyTable to only
58 * -------------------- handle returning projection.
59 * |
60 * ---------------- resultRelInfo->ri_usesFdwDirectModify
61 * | ModifyTable | should be TRUE. Handle returning projection.
62 * ----------------
63 * ^
64 * | RETURNING tuple or nothing
65 * --------------------
66 * | DataNodeDispatch | Batch and send tuples to data nodes.
67 * --------------------
68 * ^
69 * | Chunk-routed tuple
70 * -----------------
71 * | ChunkDispatch | Route tuple to chunk.
72 * ----------------- Set es_result_relation.
73 * ^
74 * | tuple
75 * --------------
76 * | ValuesScan | VALUES ('2019-02-23 13:43', 1, 8.9),
77 * -------------- ('2019-02-23 13:46', 2, 1.5);
78 *
79 *
80 * Data node dispatching uses the state machine outlined below:
81 *
82 * READ: read tuples from the subnode and save in per-node stores until one
83 * of them reaches FLUSH_THRESHOLD and then move to FLUSH. If a NULL-tuple is
84 * read before the threshold is reached, move to LAST_FLUSH. In case of
85 * replication, tuples are split across a primary and a replica tuple store.
86 *
87 * FLUSH: flush the tuples for the data nodes that reached
88 * FLUSH_THRESHOLD.
89 *
90 * LAST_FLUSH: flush tuples for all data nodes.
91 *
92 * RETURNING: if there is a RETURNING clause, return the inserted tuples
93 * one-by-one from all flushed nodes. When no more tuples remain, move to
94 * READ again or DONE if the previous state was LAST_FLUSH. Note that, in case
95 * of replication, the tuples are split across a primary and a replica tuple
96 * store for each data node. Only tuples in the primary tuple store are
97 * returned. It is implicitly assumed that the primary tuples are sent on a
98 * connection before the replica tuples and thus the data node will also return
99 * the primary tuples first (in order of insertion).
100 *
101 * read
102 * ==
103 * thresh -------------
104 * -----> | FLUSH |--->----
105 * | ------------- | prev_state
106 * | | ==
107 * -------- ------------- LAST_FLUSH ----------
108 * | READ | <-------------- | RETURNING | ---------> | DONE |
109 * -------- ------------- ----------
110 * | ^
111 * | -------------- |
112 * ----> | LAST_FLUSH | -------
113 * read == 0 --------------
114 *
115 *
116 * Potential optimizations
117 * =======================
118 *
119 * - Tuples from both the primary and the replica tuple store are flushed with
120 * a RETURNING clause when such a clause is available. However, tuples from
121 * the replica store need not be returned, so using a separate prepared
122 * statement without RETURNING for the replica store would save bandwidth.
123 *
124 * - Better asynchronous behavior. When reading tuples, a flush happens as
125 * soon as a tuple store is filled instead of continuing to fill until more
126 * stores can be flushed. Further, after flushing tuples for a data node, the
127 * code immediately waits for a response instead of doing other work while
128 * waiting.
129 *
130 * - Currently, there is one "global" state machine for the
131 * DataNodeDispatchState executor node. Turning this into per-node state
132 * machines might make the code more asynchronous and/or amenable to
133 * parallel mode support.
134 *
135 * - COPY instead of INSERT. When there's no RETURNING clause, it is more
136 * efficient to COPY data rather than using a prepared statement.
137 *
138 * - Binary instead of text format. Send tuples in binary format instead of
139 * text to save bandwidth and reduce latency.
140 */
141 typedef enum DispatchState
142 {
143 SD_READ,
144 SD_FLUSH,
145 SD_LAST_FLUSH,
146 SD_RETURNING,
147 SD_DONE,
148 } DispatchState;
149
150 typedef struct DataNodeDispatchState
151 {
152 CustomScanState cstate;
153 DispatchState prevstate; /* Previous state in state machine */
154 DispatchState state; /* Current state in state machine */
155 Relation rel; /* The (local) relation we're inserting into */
156 bool set_processed; /* Indicates whether to set the number of processed tuples */
157 DeparsedInsertStmt stmt; /* Partially deparsed insert statement */
158 const char *sql_stmt; /* Fully deparsed insert statement */
159 TupleFactory *tupfactory;
160 List *target_attrs; /* The attributes to send to remote data nodes */
161 List *responses; /* List of responses to process in RETURNING state */
162 HTAB *nodestates; /* Hash table of per-data-node state (tuple stores) */
163 MemoryContext mcxt; /* Memory context for per-node state */
164 MemoryContext batch_mcxt; /* Memory context for batches of data */
165 int64 num_tuples; /* Total number of tuples flushed each round */
166 int64 next_tuple; /* Next tuple to return to the parent node when in
167 * RETURNING state */
168 int replication_factor; /* > 1 if we replicate tuples across data nodes */
169 StmtParams *stmt_params; /* Parameters to send with statement. Format can be binary or text */
170 int flush_threshold; /* Batch size used for this dispatch state */
171 TupleTableSlot *batch_slot; /* Slot used for sending tuples to data
172 * nodes. Note that this needs to be a
173 * MinimalTuple slot, so we cannot use the
174 * standard ScanSlot in the ScanState because
175 * CustomNode sets it up to be a
176 * VirtualTuple. */
177 } DataNodeDispatchState;
178
179 /*
180 * Plan metadata list indexes.
181 */
182 enum CustomScanPrivateIndex
183 {
184 CustomScanPrivateSql,
185 CustomScanPrivateTargetAttrs,
186 CustomScanPrivateDeparsedInsertStmt,
187 CustomScanPrivateSetProcessed,
188 CustomScanPrivateFlushThreshold
189 };
190
191 #define HAS_RETURNING(sds) ((sds)->stmt.returning != NULL)
192
193 /*
194 * DataNodeState for each data node.
195 *
196 * Tuples destined for a data node are batched in a tuple store until dispatched
197 * using a "flush". A flush happens using the prepared (insert) statement,
198 * which can only be used to send a "full" batch of tuples as the number of
199 * rows in the statement is predefined. Thus, a flush only happens when the
200 * tuple store reaches the predefined size. Once the last tuple is read from
201 * the subnode, a final flush occurs. In that case, a flush is "partial" (less
202 * than the predefined amount). A partial flush cannot use the prepared
203 * statement, since the number of rows does not match, and therefore a one-time
204 * statement is created for the last insert.
205 *
206 * Note that, since we use one DataNodeState per connection, we
207 * could technically have multiple DataNodeStates per data node.
208 */
209 typedef struct DataNodeState
210 {
211 TSConnectionId id; /* Must be first */
212 TSConnection *conn;
213 Tuplestorestate *primary_tupstore; /* Tuples this data node is primary
214 * for. These tuples are returned when
215 * RETURNING is specified. */
216 Tuplestorestate *replica_tupstore; /* Tuples this data node is a replica
217 * for. These tuples are NOT returned
218 * when RETURNING is specified. */
219 PreparedStmt *pstmt; /* Prepared statement to use in the FLUSH state */
220 int num_tuples_sent; /* Number of tuples sent in the FLUSH or LAST_FLUSH states */
221 int num_tuples_inserted; /* Number of tuples inserted (returned in result)
222 * during the FLUSH or LAST_FLUSH states */
223 int next_tuple; /* The next tuple to return in the RETURNING state */
224 TupleTableSlot *slot;
225 } DataNodeState;
226
227 #define NUM_STORED_TUPLES(ss) \
228 (tuplestore_tuple_count((ss)->primary_tupstore) + \
229 ((ss)->replica_tupstore != NULL ? tuplestore_tuple_count((ss)->replica_tupstore) : 0))
230
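/*
 * Initialize the state for a single data node connection.
 *
 * Creates the primary tuple store (and a replica store when the hypertable
 * has a replication factor > 1) and acquires a connection in the distributed
 * transaction that allows prepared statements.
 */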
231 static void
232 data_node_state_init(DataNodeState *ss, DataNodeDispatchState *sds, TSConnectionId id)
233 {
234 MemoryContext old = MemoryContextSwitchTo(sds->mcxt);
235
236 memset(ss, 0, sizeof(DataNodeState));
237 ss->id = id;
238 ss->primary_tupstore = tuplestore_begin_heap(false, false, TUPSTORE_MEMSIZE_KB);
239 if (sds->replication_factor > 1)
240 ss->replica_tupstore = tuplestore_begin_heap(false, false, TUPSTORE_MEMSIZE_KB);
241 else
242 ss->replica_tupstore = NULL;
243
244 ss->conn = remote_dist_txn_get_connection(id, REMOTE_TXN_USE_PREP_STMT);
245 ss->pstmt = NULL;
246 ss->next_tuple = 0;
247 ss->num_tuples_sent = 0;
248 ss->num_tuples_inserted = 0;
249
250 MemoryContextSwitchTo(old);
251 }
252
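/*
 * Look up the state for a data node connection in the hash table, creating
 * and initializing it on first use.
 */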
253 static DataNodeState *
254 data_node_state_get_or_create(DataNodeDispatchState *sds, TSConnectionId id)
255 {
256 DataNodeState *ss;
257 bool found;
258
259 ss = hash_search(sds->nodestates, &id, HASH_ENTER, &found);
260
261 if (!found)
262 data_node_state_init(ss, sds, id);
263
264 return ss;
265 }
266
267 static void
268 data_node_state_clear_primary_store(DataNodeState *ss)
269 {
270 tuplestore_clear(ss->primary_tupstore);
271 Assert(tuplestore_tuple_count(ss->primary_tupstore) == 0);
272 ss->next_tuple = 0;
273 }
274
275 static void
276 data_node_state_clear_replica_store(DataNodeState *ss)
277 {
278 if (NULL == ss->replica_tupstore)
279 return;
280
281 tuplestore_clear(ss->replica_tupstore);
282 Assert(tuplestore_tuple_count(ss->replica_tupstore) == 0);
283 }
284
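/*
 * Release the resources held by a data node's state: the prepared statement
 * (if any) and the tuple stores.
 */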
285 static void
286 data_node_state_close(DataNodeState *ss)
287 {
288 if (NULL != ss->pstmt)
289 prepared_stmt_close(ss->pstmt);
290
291 tuplestore_end(ss->primary_tupstore);
292
293 if (NULL != ss->replica_tupstore)
294 tuplestore_end(ss->replica_tupstore);
295 }
296
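/*
 * Begin execution of the DataNodeDispatch node.
 *
 * Initializes the ChunkDispatch subplan, reads the plan-private metadata
 * (deparsed INSERT, target attributes, flush threshold), and sets up memory
 * contexts, the per-data-node state hash table, statement parameters, and
 * the dedicated MinimalTuple batch slot.
 */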
297 static void
298 data_node_dispatch_begin(CustomScanState *node, EState *estate, int eflags)
299 {
300 DataNodeDispatchState *sds = (DataNodeDispatchState *) node;
301 CustomScan *cscan = castNode(CustomScan, node->ss.ps.plan);
302 #if PG14_LT
303 ResultRelInfo *rri = estate->es_result_relation_info;
304 #else
305 ResultRelInfo *rri = linitial_node(ResultRelInfo, estate->es_opened_result_relations);
306 #endif
307 Relation rel = rri->ri_RelationDesc;
308 TupleDesc tupdesc = RelationGetDescr(rel);
309 Plan *subplan = linitial(cscan->custom_plans);
310 Cache *hcache = ts_hypertable_cache_pin();
311 Hypertable *ht = ts_hypertable_cache_get_entry(hcache, rel->rd_id, CACHE_FLAG_NONE);
312 PlanState *ps;
313 MemoryContext mcxt =
314 AllocSetContextCreate(estate->es_query_cxt, "DataNodeState", ALLOCSET_SMALL_SIZES);
315 HASHCTL hctl = {
316 .keysize = sizeof(TSConnectionId),
317 .entrysize = sizeof(DataNodeState),
318 .hcxt = mcxt,
319 };
320 List *available_nodes = ts_hypertable_get_available_data_nodes(ht, true);
321
322 Assert(NULL != ht);
323 Assert(hypertable_is_distributed(ht));
324 Assert(NIL != available_nodes);
325
326 ps = ExecInitNode(subplan, estate, eflags);
327
328 node->custom_ps = list_make1(ps);
329 sds->state = SD_READ;
330 sds->rel = rel;
331 sds->replication_factor = ht->fd.replication_factor;
332 sds->sql_stmt = strVal(list_nth(cscan->custom_private, CustomScanPrivateSql));
333 sds->target_attrs = list_nth(cscan->custom_private, CustomScanPrivateTargetAttrs);
334 sds->set_processed = intVal(list_nth(cscan->custom_private, CustomScanPrivateSetProcessed));
335 sds->flush_threshold = intVal(list_nth(cscan->custom_private, CustomScanPrivateFlushThreshold));
336
337 sds->mcxt = mcxt;
338 sds->batch_mcxt = AllocSetContextCreate(mcxt, "DataNodeDispatch batch", ALLOCSET_SMALL_SIZES);
339 sds->nodestates = hash_create("DataNodeDispatch tuple stores",
340 list_length(available_nodes),
341 &hctl,
342 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
343
344 deparsed_insert_stmt_from_list(&sds->stmt,
345 list_nth(cscan->custom_private,
346 CustomScanPrivateDeparsedInsertStmt));
347 /* Set up output functions to generate string values for each target attribute */
348 sds->stmt_params = stmt_params_create(sds->target_attrs, false, tupdesc, sds->flush_threshold);
349
350 if (HAS_RETURNING(sds))
351 sds->tupfactory = tuplefactory_create_for_rel(rel, sds->stmt.retrieved_attrs);
352
353 /* The tuplestores that hold batches of tuples only allow MinimalTuples so
354 * we need a dedicated slot for getting tuples from the stores since the
355 * CustomScan's ScanTupleSlot is a VirtualTuple. */
356 sds->batch_slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple);
357
358 ts_cache_release(hcache);
359 }
360
361 /*
362 * Store the result of a RETURNING clause.
363 */
364 static void
365 store_returning_result(DataNodeDispatchState *sds, int row, TupleTableSlot *slot, PGresult *res)
366 {
367 PG_TRY();
368 {
369 HeapTuple newtup = tuplefactory_make_tuple(sds->tupfactory, res, row, PQbinaryTuples(res));
370
371 /* We need to force the tuple into the slot since it is not of the
372 * right type (conversion to the right type will happen if
373 * necessary) */
374 ExecForceStoreHeapTuple(newtup, slot, true);
375 }
376 PG_CATCH();
377 {
378 if (res)
379 PQclear(res);
380 PG_RE_THROW();
381 }
382 PG_END_TRY();
383 }
384
385 static const char *state_names[] = {
386 [SD_READ] = "READ", [SD_FLUSH] = "FLUSH", [SD_LAST_FLUSH] = "LAST_FLUSH",
387 [SD_RETURNING] = "RETURNING", [SD_DONE] = "DONE",
388 };
389
390 /*
391 * Move the state machine to a new state.
392 */
393 static void
394 data_node_dispatch_set_state(DataNodeDispatchState *sds, DispatchState new_state)
395 {
396 Assert(sds->state != new_state);
397
398 elog(DEBUG2,
399 "DataNodeDispatchState: %s -> %s",
400 state_names[sds->state],
401 state_names[new_state]);
402
403 #ifdef USE_ASSERT_CHECKING
404
405 switch (new_state)
406 {
407 case SD_READ:
408 Assert(sds->state == SD_RETURNING);
409 break;
410 case SD_FLUSH:
411 case SD_LAST_FLUSH:
412 Assert(sds->state == SD_READ);
413 break;
414 case SD_RETURNING:
415 Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);
416 break;
417 case SD_DONE:
418 Assert(sds->state == SD_RETURNING);
419 }
420
421 #endif
422
423 sds->prevstate = sds->state;
424 sds->state = new_state;
425 }
426
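/*
 * Prepare the INSERT statement for a full batch on the given data node
 * connection. The statement is allocated in the per-node memory context so
 * that it can be reused across batches.
 */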
427 static PreparedStmt *
428 prepare_data_node_insert_stmt(DataNodeDispatchState *sds, TSConnection *conn, int total_params)
429 {
430 AsyncRequest *req;
431 PreparedStmt *stmt;
432 MemoryContext oldcontext = MemoryContextSwitchTo(sds->mcxt);
433
434 req = async_request_send_prepare(conn, sds->sql_stmt, total_params);
435 Assert(req != NULL);
436 stmt = async_request_wait_prepared_statement(req);
437 MemoryContextSwitchTo(oldcontext);
438
439 return stmt;
440 }
441
442 /*
443 * Send a batch of tuples to a data node.
444 *
445 * All stored tuples for the given data node are sent on the node's
446 * connection. If in FLUSH state (i.e., sending a predefined amount of
447 * tuples), use a prepared statement, or construct a custom statement if in
448 * LAST_FLUSH state.
449 *
450 * If there's a RETURNING clause, we reset the read pointer for the store,
451 * since the original tuples need to be returned along with the
452 * node-returned ones. If there is no RETURNING clause, simply clear the
453 * store.
454 */
455 static AsyncRequest *
456 send_batch_to_data_node(DataNodeDispatchState *sds, DataNodeState *ss)
457 {
458 TupleTableSlot *slot = sds->batch_slot;
459 AsyncRequest *req;
460 const char *sql_stmt;
461 int response_type = FORMAT_TEXT;
462
463 Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);
464 Assert(NUM_STORED_TUPLES(ss) <= sds->flush_threshold);
465 Assert(NUM_STORED_TUPLES(ss) > 0);
466
467 ss->num_tuples_sent = 0;
468
469 while (
470 tuplestore_gettupleslot(ss->primary_tupstore, true /* forward */, false /* copy */, slot))
471 {
472 /* Convert the slot's attribute values into statement parameters */
473 stmt_params_convert_values(sds->stmt_params, slot, NULL);
474 ss->num_tuples_sent++;
475 }
476
477 if (NULL != ss->replica_tupstore)
478 {
479 while (tuplestore_gettupleslot(ss->replica_tupstore,
480 true /* forward */,
481 false /* copy */,
482 slot))
483 {
484 /* Convert the slot's attribute values into statement parameters */
485 stmt_params_convert_values(sds->stmt_params, slot, NULL);
486 ss->num_tuples_sent++;
487 }
488 }
489
490 Assert(ss->num_tuples_sent == NUM_STORED_TUPLES(ss));
491 Assert(ss->num_tuples_sent == stmt_params_converted_tuples(sds->stmt_params));
492
493 if (HAS_RETURNING(sds) && tuplefactory_is_binary(sds->tupfactory))
494 response_type = FORMAT_BINARY;
495 else if (ts_guc_enable_connection_binary_data)
496 response_type = FORMAT_BINARY;
497
498 /* Send tuples */
499 switch (sds->state)
500 {
501 case SD_FLUSH:
502 /* Lazily initialize the prepared statement */
503 if (NULL == ss->pstmt)
504 ss->pstmt =
505 prepare_data_node_insert_stmt(sds,
506 ss->conn,
507 stmt_params_total_values(sds->stmt_params));
508
509 Assert(ss->num_tuples_sent == sds->flush_threshold);
510 req = async_request_send_prepared_stmt_with_params(ss->pstmt,
511 sds->stmt_params,
512 response_type);
513 break;
514 case SD_LAST_FLUSH:
515 sql_stmt = deparsed_insert_stmt_get_sql(&sds->stmt,
516 stmt_params_converted_tuples(sds->stmt_params));
517 Assert(sql_stmt != NULL);
518 Assert(ss->num_tuples_sent < sds->flush_threshold);
519 req =
520 async_request_send_with_params(ss->conn, sql_stmt, sds->stmt_params, response_type);
521 break;
522 default:
523 elog(ERROR, "unexpected data node dispatch state %s", state_names[sds->state]);
524 Assert(false);
525 break;
526 }
527
528 Assert(NULL != req);
529 async_request_attach_user_data(req, ss);
530
531 sds->num_tuples += tuplestore_tuple_count(ss->primary_tupstore);
532
533 /* If there's a RETURNING clause, we need to also return the inserted
534 * tuples in
535 * rri->ri_projectReturning->pi_exprContext->ecxt_scantuple */
536 if (HAS_RETURNING(sds))
537 tuplestore_rescan(ss->primary_tupstore);
538 else
539 data_node_state_clear_primary_store(ss);
540
541 /* No tuples are returned from the replica store so safe to clear */
542 data_node_state_clear_replica_store(ss);
543
544 /* Since we're done with the current batch, reset params so they are safe to use in the next
545 * batch/node */
546 stmt_params_reset(sds->stmt_params);
547
548 return req;
549 }
550
551 /*
552 * Check if we should flush tuples stored for a data node.
553 *
554 * There are two cases when this happens:
555 *
556 * 1. State is SD_FLUSH and the flush threshold is reached.
557 * 2. State is SD_LAST_FLUSH and there are tuples to send.
558 */
559 static bool
560 should_flush_data_node(DataNodeDispatchState *sds, DataNodeState *ss)
561 {
562 int64 num_tuples = NUM_STORED_TUPLES(ss);
563
564 Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);
565
566 if (sds->state == SD_FLUSH)
567 {
568 if (num_tuples >= sds->flush_threshold)
569 return true;
570 return false;
571 }
572
573 return num_tuples > 0;
574 }
575
576 /*
577 * Flush the tuples of data nodes that have a full batch.
578 */
579 static AsyncRequestSet *
580 flush_data_nodes(DataNodeDispatchState *sds)
581 {
582 AsyncRequestSet *reqset = NULL;
583 DataNodeState *ss;
584 HASH_SEQ_STATUS hseq;
585
586 Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);
587
588 hash_seq_init(&hseq, sds->nodestates);
589
590 for (ss = hash_seq_search(&hseq); ss != NULL; ss = hash_seq_search(&hseq))
591 {
592 if (should_flush_data_node(sds, ss))
593 {
594 AsyncRequest *req = send_batch_to_data_node(sds, ss);
595
596 if (NULL != req)
597 {
598 if (NULL == reqset)
599 reqset = async_request_set_create();
600
601 async_request_set_add(reqset, req);
602 }
603 }
604 }
605
606 return reqset;
607 }
608
609 /*
610 * Wait for responses from data nodes after INSERT.
611 *
612 * In case of RETURNING, return a list of responses, otherwise NIL.
613 */
614 static List *
615 await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
616 {
617 AsyncResponseResult *rsp;
618 List *results = NIL;
619
620 sds->next_tuple = 0;
621
622 while ((rsp = async_request_set_wait_any_result(reqset)))
623 {
624 DataNodeState *ss = async_response_result_get_user_data(rsp);
625 PGresult *res = async_response_result_get_pg_result(rsp);
626 ExecStatusType status = PQresultStatus(res);
627 bool report_error = true;
628
629 switch (status)
630 {
631 case PGRES_TUPLES_OK:
632 if (!HAS_RETURNING(sds))
633 break;
634
635 results = lappend(results, rsp);
636 ss->num_tuples_inserted = PQntuples(res);
637 Assert(sds->stmt.do_nothing || (ss->num_tuples_inserted == ss->num_tuples_sent));
638 report_error = false;
639 break;
640 case PGRES_COMMAND_OK:
641 if (HAS_RETURNING(sds))
642 break;
643
644 ss->num_tuples_inserted = atoi(PQcmdTuples(res));
645 async_response_result_close(rsp);
646 Assert(sds->stmt.do_nothing || (ss->num_tuples_inserted == ss->num_tuples_sent));
647 report_error = false;
648 break;
649 default:
650 break;
651 }
652
653 if (report_error)
654 async_response_report_error((AsyncResponse *) rsp, ERROR);
655
656 /* Unless there is an ON CONFLICT clause, the number of tuples
657 * returned should be greater than zero and be the same as the number of
658 * tuples sent. */
659 Assert(sds->stmt.do_nothing || ss->num_tuples_inserted > 0);
660 ss->next_tuple = 0;
661 }
662
663 return results;
664 }
665
666 /*
667 * Read tuples from the child scan node.
668 *
669 * Read until there's a NULL tuple or we've filled a data node's batch. Ideally,
670 * we'd continue to read more tuples to fill other data nodes' batches, but since
671 * the next tuple might be for the same node that has the full batch, we
672 * risk overfilling. This could be mitigated by using two tuple stores per
673 * data node (current and next batch) and alternate between them. But that also
674 * increases memory requirements and complicates the code, so that's left as a
675 * future optimization.
676 *
677 * Return the number of tuples read.
678 */
679 static int64
680 handle_read(DataNodeDispatchState *sds)
681 {
682 PlanState *substate = linitial(sds->cstate.custom_ps);
683 ChunkDispatchState *cds = (ChunkDispatchState *) substate;
684 EState *estate = sds->cstate.ss.ps.state;
685 #if PG14_LT
686 ResultRelInfo *rri_saved = estate->es_result_relation_info;
687 #endif
688 int64 num_tuples_read = 0;
689
690 Assert(sds->state == SD_READ);
691
692 /* If we are reading new tuples, we either do it for the first batch or we
693 * finished a previous batch. In either case, reset the batch memory
694 * context so that we release the memory after each batch is finished. */
695 MemoryContextReset(sds->batch_mcxt);
696
697 /* Read tuples from the subnode until flush */
698 while (sds->state == SD_READ)
699 {
700 TupleTableSlot *slot = ExecProcNode(substate);
701
702 if (TupIsNull(slot))
703 data_node_dispatch_set_state(sds, SD_LAST_FLUSH);
704 else
705 {
706 /* The previous node should have routed the tuple to the right
707 * chunk and set the corresponding result relation. The FdwState
708 * should also point to the chunk's insert state. */
709 ResultRelInfo *rri = cds->rri;
710 ChunkInsertState *cis = rri->ri_FdwState;
711 TriggerDesc *trigdesc = rri->ri_TrigDesc;
712 ListCell *lc;
713 bool primary_data_node = true;
714 MemoryContext oldcontext;
715 TupleDesc rri_desc = RelationGetDescr(rri->ri_RelationDesc);
716
717 if (NULL != rri->ri_projectReturning && rri_desc->constr &&
718 rri_desc->constr->has_generated_stored)
719 ExecComputeStoredGeneratedCompat(rri, estate, slot, CMD_INSERT);
720
721 Assert(NULL != cis);
722
723 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
724
725 /* While we could potentially support triggers on frontend nodes,
726 * the triggers would have to exist also on the remote node and be
727 * executed there. For now, the safest bet is to avoid triggers on
728 * the frontend. */
729 if (trigdesc && (trigdesc->trig_insert_after_row || trigdesc->trig_insert_before_row))
730 elog(ERROR, "cannot insert into remote chunk with row triggers");
731
732 /* Total count */
733 num_tuples_read++;
734
735 foreach (lc, cis->chunk_data_nodes)
736 {
737 ChunkDataNode *cdn = lfirst(lc);
738 TSConnectionId id = remote_connection_id(cdn->foreign_server_oid, cis->user_id);
739 DataNodeState *ss = data_node_state_get_or_create(sds, id);
740
741 /* This will store one copy of the tuple per data node, which is
742 * a bit inefficient. Note that we put the tuple in the
743 * primary store for the first data node, but the replica store
744 * for all other data nodes. This is to be able to know which
745 * tuples to return in a RETURNING statement. */
746 if (primary_data_node)
747 tuplestore_puttupleslot(ss->primary_tupstore, slot);
748 else
749 tuplestore_puttupleslot(ss->replica_tupstore, slot);
750
751 /* Once one data node has reached the batch size, we stop
752 * reading. */
753 if (sds->state != SD_FLUSH && NUM_STORED_TUPLES(ss) >= sds->flush_threshold)
754 data_node_dispatch_set_state(sds, SD_FLUSH);
755
756 primary_data_node = false;
757 }
758
759 MemoryContextSwitchTo(oldcontext);
760 }
761 }
762
763 #if PG14_LT
764 estate->es_result_relation_info = rri_saved;
765 #endif
766
767 return num_tuples_read;
768 }
769
770 /*
771 * Flush all data nodes and move to the RETURNING state.
772 *
773 * Note that future optimizations could do this more asynchronously by doing
774 * other work until responses are available (e.g., one could start to fill the
775 * next batch while waiting for a response). However, the async API currently
776 * doesn't expose a way to check for a response without blocking and
777 * interleaving different tasks would also complicate the state machine.
778 */
779 static void
780 handle_flush(DataNodeDispatchState *sds)
781 {
782 MemoryContext oldcontext;
783 AsyncRequestSet *reqset;
784
785 Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);
786
787 /* Save the requests and responses in the batch memory context since they
788 * need to survive across several iterations of the executor loop when
789 * there is a RETURNING clause. The batch memory context is cleared the
790 * next time we read a batch. */
791 oldcontext = MemoryContextSwitchTo(sds->batch_mcxt);
792 reqset = flush_data_nodes(sds);
793
794 if (NULL != reqset)
795 {
796 sds->responses = await_all_responses(sds, reqset);
797 pfree(reqset);
798 }
799
800 data_node_dispatch_set_state(sds, SD_RETURNING);
801 MemoryContextSwitchTo(oldcontext);
802 }
803
804 /*
805 * Get a tuple when there's a RETURNING clause.
806 */
807 static TupleTableSlot *
808 get_returning_tuple(DataNodeDispatchState *sds)
809 {
810 ChunkDispatchState *cds = (ChunkDispatchState *) linitial(sds->cstate.custom_ps);
811 ResultRelInfo *rri = cds->rri;
812 TupleTableSlot *res_slot = sds->batch_slot;
813 TupleTableSlot *slot = sds->cstate.ss.ss_ScanTupleSlot;
814 ExprContext *econtext;
815
816 Assert(NULL != rri->ri_projectReturning);
817
818 econtext = rri->ri_projectReturning->pi_exprContext;
819
820 /*
821 * Store a RETURNING tuple. If HAS_RETURNING() is false, just emit a
822 * dummy tuple. (HAS_RETURNING() is false when the local query is of the
823 * form "INSERT .. RETURNING 1" for example.)
824 */
825 if (!HAS_RETURNING(sds))
826 {
827 Assert(NIL == sds->responses);
828 ExecStoreAllNullTuple(slot);
829 res_slot = slot;
830 }
831 else
832 {
833 while (NIL != sds->responses)
834 {
835 AsyncResponseResult *rsp = linitial(sds->responses);
836 DataNodeState *ss = async_response_result_get_user_data(rsp);
837 PGresult *res = async_response_result_get_pg_result(rsp);
838 int64 num_tuples_to_return = tuplestore_tuple_count(ss->primary_tupstore);
839 bool last_tuple;
840 bool got_tuple = false;
841
842 if (num_tuples_to_return > 0)
843 {
844 MemoryContext oldcontext;
845
846 last_tuple = (ss->next_tuple + 1) == num_tuples_to_return;
847
848 Assert(ss->next_tuple < ss->num_tuples_inserted);
849 Assert(ss->next_tuple < num_tuples_to_return);
850
851 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_query_memory);
852 store_returning_result(sds, ss->next_tuple, slot, res);
853
854 /* Get the next tuple from the store. If it is the last tuple, we
855 * need to make a copy since we will clear the store before
856 * returning. */
857 got_tuple = tuplestore_gettupleslot(ss->primary_tupstore,
858 true /* forward */,
859 last_tuple /* copy */,
860 res_slot);
861
862 MemoryContextSwitchTo(oldcontext);
863 Assert(got_tuple);
864 Assert(!TupIsNull(res_slot));
865 ss->next_tuple++;
866 }
867 else
868 last_tuple = true;
869
870 if (last_tuple)
871 {
872 sds->responses = list_delete_first(sds->responses);
873 async_response_result_close(rsp);
874 data_node_state_clear_primary_store(ss);
875 }
876
877 if (got_tuple)
878 break;
879 }
880 }
881
882 econtext->ecxt_scantuple = slot;
883
884 return res_slot;
885 }
886
887 /*
888 * Get the next tuple slot to return when there's a RETURNING
889 * clause. Otherwise, return an empty slot.
890 */
891 static TupleTableSlot *
892 handle_returning(DataNodeDispatchState *sds)
893 {
894 EState *estate = sds->cstate.ss.ps.state;
895 ChunkDispatchState *cds = (ChunkDispatchState *) linitial(sds->cstate.custom_ps);
896 ResultRelInfo *rri = cds->rri;
897 TupleTableSlot *slot = sds->cstate.ss.ss_ScanTupleSlot;
898 bool done = false;
899 MemoryContext oldcontext;
900
901 Assert(sds->state == SD_RETURNING);
902 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
903
904 /* No returning projection, which means we are done */
905 if (NULL == rri->ri_projectReturning)
906 {
907 Assert(!HAS_RETURNING(sds));
908 Assert(NIL == sds->responses);
909 done = true;
910
911 if (sds->set_processed)
912 estate->es_processed += sds->num_tuples;
913 }
914
915 /* If we've processed all tuples, then we're also done */
916 if (sds->next_tuple >= sds->num_tuples)
917 done = true;
918
919 if (done)
920 {
921 sds->next_tuple = 0;
922 sds->num_tuples = 0;
923
924 slot = ExecClearTuple(slot);
925
926 if (sds->prevstate == SD_LAST_FLUSH)
927 data_node_dispatch_set_state(sds, SD_DONE);
928 else
929 data_node_dispatch_set_state(sds, SD_READ);
930 }
931 else
932 {
933 slot = get_returning_tuple(sds);
934 Assert(!TupIsNull(slot));
935 sds->next_tuple++;
936
937 if (sds->set_processed)
938 estate->es_processed++;
939 }
940
941 MemoryContextSwitchTo(oldcontext);
942
943 return slot;
944 }
945
946 /*
947 * Execute the remote INSERT.
948 *
949 * This is called every time the parent asks for a new tuple. Read the child
950 * scan node and buffer until there's a full batch, then flush by sending to
951 * data node(s). If there's a RETURNING clause, we return the flushed tuples
952 * one-by-one, or continue reading more tuples from the child until there's a
953 * NULL tuple.
954 */
955 static TupleTableSlot *
956 data_node_dispatch_exec(CustomScanState *node)
957 {
958 DataNodeDispatchState *sds = (DataNodeDispatchState *) node;
959 TupleTableSlot *slot = NULL;
960 bool done = false;
961
962 #if PG14_LT
963 /* Initially, the result relation should always match the hypertable. */
964 Assert(node->ss.ps.state->es_result_relation_info->ri_RelationDesc->rd_id == sds->rel->rd_id);
965 #endif
966
967 /* Read tuples and flush until there's either something to return or no
968 * more tuples to read */
969 while (!done)
970 {
971 switch (sds->state)
972 {
973 case SD_READ:
974 handle_read(sds);
975 break;
976 case SD_FLUSH:
977 case SD_LAST_FLUSH:
978 handle_flush(sds);
979 break;
980 case SD_RETURNING:
981 slot = handle_returning(sds);
982 /* If a tuple was read, return it and wait to get called again */
983 done = !TupIsNull(slot);
984 break;
985 case SD_DONE:
986 done = true;
987 Assert(TupIsNull(slot));
988 break;
989 }
990 }
991
992 #if PG14_LT
993 /* Tuple routing in the ChunkDispatchState subnode sets the result
994 * relation to a chunk when routing, but the read handler should have
995 * ensured the result relation is reset. */
996 Assert(node->ss.ps.state->es_result_relation_info->ri_RelationDesc->rd_id == sds->rel->rd_id);
997 Assert(node->ss.ps.state->es_result_relation_info->ri_usesFdwDirectModify);
998 #endif
999
1000 return slot;
1001 }
1002
1003 static void
1004 data_node_dispatch_rescan(CustomScanState *node)
1005 {
1006 /* Cannot rescan and start from the beginning since we might already have
1007 * sent data to remote nodes */
1008 elog(ERROR, "cannot restart inserts to remote nodes");
1009 }
1010
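/*
 * End execution: close the state for each data node, destroy the hash table,
 * drop the batch slot, and end the subplan.
 */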
1011 static void
1012 data_node_dispatch_end(CustomScanState *node)
1013 {
1014 DataNodeDispatchState *sds = (DataNodeDispatchState *) node;
1015 DataNodeState *ss;
1016 HASH_SEQ_STATUS hseq;
1017
1018 hash_seq_init(&hseq, sds->nodestates);
1019
1020 for (ss = hash_seq_search(&hseq); ss != NULL; ss = hash_seq_search(&hseq))
1021 data_node_state_close(ss);
1022
1023 hash_destroy(sds->nodestates);
1024 ExecDropSingleTupleTableSlot(sds->batch_slot);
1025 ExecEndNode(linitial(node->custom_ps));
1026 }
1027
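/*
 * Show the batch size in EXPLAIN output and, with VERBOSE, the deparsed
 * remote SQL statement.
 */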
1028 static void
1029 data_node_dispatch_explain(CustomScanState *node, List *ancestors, ExplainState *es)
1030 {
1031 DataNodeDispatchState *sds = (DataNodeDispatchState *) node;
1032
1033 ExplainPropertyInteger("Batch size", NULL, sds->flush_threshold, es);
1034
1035 /*
1036 * Add the remote query when the VERBOSE option is specified.
1037 */
1038 if (es->verbose)
1039 {
1040 const char *explain_sql =
1041 deparsed_insert_stmt_get_sql_explain(&sds->stmt, sds->flush_threshold);
1042
1043 ExplainPropertyText("Remote SQL", explain_sql, es);
1044 }
1045 }
1046
1047 static CustomExecMethods data_node_dispatch_state_methods = {
1048 .CustomName = "DataNodeDispatchState",
1049 .BeginCustomScan = data_node_dispatch_begin,
1050 .EndCustomScan = data_node_dispatch_end,
1051 .ExecCustomScan = data_node_dispatch_exec,
1052 .ReScanCustomScan = data_node_dispatch_rescan,
1053 .ExplainCustomScan = data_node_dispatch_explain,
1054 };
1055
1056 /* Only allocate the custom scan state. Initialize in the begin handler. */
1057 static Node *
1058 data_node_dispatch_state_create(CustomScan *cscan)
1059 {
1060 DataNodeDispatchState *sds;
1061
1062 sds = (DataNodeDispatchState *) newNode(sizeof(DataNodeDispatchState), T_CustomScanState);
1063 sds->cstate.methods = &data_node_dispatch_state_methods;
1064
1065 return (Node *) sds;
1066 }
1067
1068 static CustomScanMethods data_node_dispatch_plan_methods = {
1069 .CustomName = "DataNodeDispatch",
1070 .CreateCustomScanState = data_node_dispatch_state_create,
1071 };
1072
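/*
 * Collect the attribute numbers to include in the remote INSERT, skipping
 * dropped and generated columns.
 */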
1073 static List *
1074 get_insert_attrs(Relation rel)
1075 {
1076 TupleDesc tupdesc = RelationGetDescr(rel);
1077 List *attrs = NIL;
1078 int i;
1079
1080 for (i = 0; i < tupdesc->natts; i++)
1081 {
1082 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
1083
1084 if (attr->attisdropped || attr->attgenerated != '\0')
1085 continue;
1086
1087 attrs = lappend_int(attrs, AttrOffsetGetAttrNumber(i));
1088 }
1089
1090 return attrs;
1091 }
1092
1093 /*
1094 * Plan a remote INSERT on a hypertable.
1095 *
1096 * Create the metadata needed for a remote INSERT. This mostly involves
1097 * deparsing the INSERT statement.
1098 *
1099 * Return the metadata as a list of Nodes that can be saved in a prepared
1100 * statement.
1101 */
1102 static List *
1103 plan_remote_insert(PlannerInfo *root, DataNodeDispatchPath *sdpath)
1104 {
1105 ModifyTablePath *mtpath = sdpath->mtpath;
1106 OnConflictAction onconflict =
1107 mtpath->onconflict == NULL ? ONCONFLICT_NONE : mtpath->onconflict->action;
1108 List *returning_lists = sdpath->mtpath->returningLists;
1109 RangeTblEntry *rte = planner_rt_fetch(sdpath->hypertable_rti, root);
1110 Relation rel;
1111 DeparsedInsertStmt stmt;
1112 const char *sql;
1113 List *target_attrs = NIL;
1114 List *returning_list = NIL;
1115 bool do_nothing = false;
1116 int flush_threshold;
1117
1118 /*
1119 * Core code already has some lock on each rel being planned, so we can
1120 * use NoLock here.
1121 */
1122 rel = table_open(rte->relid, NoLock);
1123
1124 /*
1125 * Extract the relevant RETURNING list if any.
1126 */
1127 if (NIL != returning_lists)
1128 returning_list = (List *) list_nth(returning_lists, sdpath->subplan_index);
1129
1130 /*
1131 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1132 * should have already been rejected in the optimizer, as presently there
1133 * is no way to recognize an arbiter index on a foreign table. Only DO
1134 * NOTHING is supported without an inference specification.
1135 */
1136 if (onconflict == ONCONFLICT_NOTHING)
1137 do_nothing = true;
1138 else if (onconflict != ONCONFLICT_NONE)
1139 ereport(ERROR,
1140 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1141 errmsg("ON CONFLICT DO UPDATE not supported"
1142 " on distributed hypertables")));
1143
1144 /*
1145 * Construct the SQL command string matching the fixed batch size. We also
1146 * save the partially deparsed SQL command so that we can easily create
1147 * one with fewer value parameters later for flushing a partially filled
1148 * batch.
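 *
 * For example, with three target columns and a flush threshold of two rows,
 * the deparsed statement looks roughly like
 * "INSERT INTO <hypertable> (c1, c2, c3) VALUES ($1, $2, $3), ($4, $5, $6)",
 * i.e., one parameter per column and row.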
1149 */
1150 target_attrs = get_insert_attrs(rel);
1151
1152 deparse_insert_stmt(&stmt,
1153 rte,
1154 sdpath->hypertable_rti,
1155 rel,
1156 target_attrs,
1157 do_nothing,
1158 returning_list);
1159
1160 /* Set a suitable flush threshold value that takes into account the max number
1161 * of prepared statement arguments */
1162 flush_threshold =
1163 stmt_params_validate_num_tuples(list_length(target_attrs), TUPSTORE_FLUSH_THRESHOLD);
1164
1165 sql = deparsed_insert_stmt_get_sql(&stmt, flush_threshold);
1166
1167 table_close(rel, NoLock);
1168
1169 return list_make5(makeString((char *) sql),
1170 target_attrs,
1171 deparsed_insert_stmt_to_list(&stmt),
1172 makeInteger(sdpath->mtpath->canSetTag),
1173 makeInteger(flush_threshold));
1174 }
1175
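/*
 * Create the DataNodeDispatch plan node from the corresponding path.
 *
 * The ChunkDispatch subplan becomes the only child and the deparsed INSERT
 * metadata is attached as the plan's private data.
 */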
1176 static Plan *
1177 data_node_dispatch_plan_create(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path,
1178 List *tlist, List *clauses, List *custom_plans)
1179 {
1180 DataNodeDispatchPath *sdpath = (DataNodeDispatchPath *) best_path;
1181 CustomScan *cscan = makeNode(CustomScan);
1182 Plan *subplan;
1183
1184 Assert(list_length(custom_plans) == 1);
1185
1186 subplan = linitial(custom_plans);
1187 cscan->methods = &data_node_dispatch_plan_methods;
1188 cscan->custom_plans = custom_plans;
1189 cscan->scan.scanrelid = 0;
1190 cscan->scan.plan.targetlist = tlist;
1191 cscan->custom_scan_tlist = subplan->targetlist;
1192 cscan->custom_private = plan_remote_insert(root, sdpath);
1193
1194 return &cscan->scan.plan;
1195 }
1196
1197 static CustomPathMethods data_node_dispatch_path_methods = {
1198 .CustomName = "DataNodeDispatchPath",
1199 .PlanCustomPath = data_node_dispatch_plan_create,
1200 };
1201
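/*
 * Create the DataNodeDispatch path.
 *
 * The path wraps a ChunkDispatch path (which in turn wraps the ModifyTable's
 * subpath for the hypertable) and copies its cost estimates.
 */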
1202 Path *
1203 data_node_dispatch_path_create(PlannerInfo *root, ModifyTablePath *mtpath, Index hypertable_rti,
1204 int subplan_index)
1205 {
1206 DataNodeDispatchPath *sdpath = palloc0(sizeof(DataNodeDispatchPath));
1207 Path *subpath = ts_chunk_dispatch_path_create(root, mtpath, hypertable_rti, subplan_index);
1208
1209 /* Copy costs, etc. from the subpath */
1210 memcpy(&sdpath->cpath.path, subpath, sizeof(Path));
1211
1212 sdpath->cpath.path.type = T_CustomPath;
1213 sdpath->cpath.path.pathtype = T_CustomScan;
1214 sdpath->cpath.custom_paths = list_make1(subpath);
1215 sdpath->cpath.methods = &data_node_dispatch_path_methods;
1216 sdpath->mtpath = mtpath;
1217 sdpath->hypertable_rti = hypertable_rti;
1218 sdpath->subplan_index = subplan_index;
1219
1220 return &sdpath->cpath.path;
1221 }
1222