/*
 * This file and its contents are licensed under the Timescale License.
 * Please see the included NOTICE for copyright information and
 * LICENSE-TIMESCALE for a copy of the license.
 */
#include <postgres.h>
#include <parser/parsetree.h>
#include <nodes/execnodes.h>
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/plannodes.h>
#include <executor/executor.h>
#include <executor/nodeModifyTable.h>
#include <utils/lsyscache.h>
#include <utils/builtins.h>
#include <utils/hsearch.h>
#include <utils/tuplestore.h>
#include <utils/memutils.h>
#include <funcapi.h>
#include <miscadmin.h>

#include <chunk_data_node.h>
#include <nodes/chunk_dispatch_plan.h>
#include <nodes/chunk_dispatch_state.h>
#include <nodes/chunk_insert_state.h>
#include <hypertable_cache.h>
#include <compat/compat.h>
#include <guc.h>

#include "data_node_dispatch.h"
#include "fdw/scan_exec.h"
#include "fdw/deparse.h"
#include "remote/utils.h"
#include "remote/dist_txn.h"
#include "remote/async.h"
#include "remote/data_format.h"
#include "remote/tuplefactory.h"

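/* Size limit (in kilobytes) for each per-node tuple store, and the number of
 * tuples to batch before flushing to a data node. Both follow existing
 * settings: PostgreSQL's work_mem and the max_insert_batch_size GUC,
 * respectively. */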
#define TUPSTORE_MEMSIZE_KB work_mem
#define TUPSTORE_FLUSH_THRESHOLD ts_guc_max_insert_batch_size

typedef struct DataNodeDispatchPath
{
	CustomPath cpath;
	ModifyTablePath *mtpath;
	Index hypertable_rti; /* range table index of Hypertable */
	int subplan_index;
} DataNodeDispatchPath;

/*
 * DataNodeDispatch dispatches tuples to data nodes using batching. It inserts
 * itself below a ModifyTable node in the plan and subsequent execution tree,
 * like so:
 *
 *          --------------------   Set "direct modify plans" to
 *          | HypertableInsert |   signal ModifyTable to only
 *          --------------------   handle returning projection.
 *                   |
 *            ----------------     resultRelInfo->ri_usesFdwDirectModify
 *            |  ModifyTable |     should be TRUE. Handle returning projection.
 *            ----------------
 *                   ^
 *                   | RETURNING tuple or nothing
 *          --------------------
 *          | DataNodeDispatch |    Batch and send tuples to data nodes.
 *          --------------------
 *                   ^
 *                   | Chunk-routed tuple
 *           -----------------
 *           | ChunkDispatch |     Route tuple to chunk.
 *           -----------------     Set es_result_relation.
 *                   ^
 *                   | tuple
 *             --------------
 *             | ValuesScan |     VALUES ('2019-02-23 13:43', 1, 8.9),
 *             --------------            ('2019-02-23 13:46', 2, 1.5);
 *
 *
 * Data node dispatching uses the state machine outlined below:
 *
 * READ: read tuples from the subnode and save in per-node stores until one
 * of them reaches FLUSH_THRESHOLD and then move to FLUSH. If a NULL-tuple is
 * read before the threshold is reached, move to LAST_FLUSH. In case of
 * replication, tuples are split across a primary and a replica tuple store.
 *
 * FLUSH: flush the tuples for the data nodes that reached
 * FLUSH_THRESHOLD.
 *
 * LAST_FLUSH: flush tuples for all data nodes.
 *
 * RETURNING: if there is a RETURNING clause, return the inserted tuples
 * one-by-one from all flushed nodes. When no more tuples remain, move to
 * READ again, or to DONE if the previous state was LAST_FLUSH. Note that, in
 * case of replication, the tuples are split across a primary and a replica
 * tuple store for each data node; only tuples in the primary tuple store are
 * returned. It is implicitly assumed that the primary tuples are sent on a
 * connection before the replica tuples, and thus the data node will also
 * return the primary tuples first (in order of insertion).
 *
 *   read
 *    ==
 *  thresh     -------------
 *      -----> |   FLUSH   |--->----
 *      |      -------------       |       prev_state
 *      |                          |           ==
 *  --------                 ------------- LAST_FLUSH ----------
 *  | READ | <-------------- | RETURNING | ---------> |  DONE  |
 *  --------                 -------------            ----------
 *      |                          ^
 *      |     --------------       |
 *      ----> | LAST_FLUSH | -------
 * read == 0  --------------
 *
 *
 * Potential optimizations
 * =======================
 *
 * - Tuples from both the primary and the replica tuple store are flushed with
 *   a RETURNING clause when such a clause is available. However, tuples from
 *   the replica store need not be returned, so using a separate prepared
 *   statement without RETURNING for the replica store would save bandwidth.
 *
 * - Better asynchronous behavior. When reading tuples, a flush happens as
 *   soon as a tuple store is filled instead of continuing to fill until more
 *   stores can be flushed. Further, after flushing tuples for a data node, the
 *   code immediately waits for a response instead of doing other work while
 *   waiting.
 *
 * - Currently, there is one "global" state machine for the
 *   DataNodeDispatchState executor node. Turning this into per-node state
 *   machines might make the code more asynchronous and/or amenable to
 *   parallel mode support.
 *
 * - COPY instead of INSERT. When there's no RETURNING clause, it is more
 *   efficient to COPY data rather than using a prepared statement.
 *
 * - Binary instead of text format. Send tuples in binary format instead of
 *   text to save bandwidth and reduce latency.
 */
typedef enum DispatchState
{
	SD_READ,
	SD_FLUSH,
	SD_LAST_FLUSH,
	SD_RETURNING,
	SD_DONE,
} DispatchState;

typedef struct DataNodeDispatchState
{
	CustomScanState cstate;
	DispatchState prevstate; /* Previous state in state machine */
	DispatchState state;	 /* Current state in state machine */
	Relation rel;			 /* The (local) relation we're inserting into */
	bool set_processed;		 /* Indicates whether to set the number of processed tuples */
	DeparsedInsertStmt stmt; /* Partially deparsed insert statement */
	const char *sql_stmt;	 /* Fully deparsed insert statement */
	TupleFactory *tupfactory;
	List *target_attrs;		  /* The attributes to send to remote data nodes */
	List *responses;		  /* List of responses to process in RETURNING state */
	HTAB *nodestates;		  /* Hashtable of per-nodestate (tuple stores) */
	MemoryContext mcxt;		  /* Memory context for per-node state */
	MemoryContext batch_mcxt; /* Memory context for batches of data */
	int64 num_tuples;		  /* Total number of tuples flushed each round */
	int64 next_tuple;		  /* Next tuple to return to the parent node when in
							   * RETURNING state */
	int replication_factor;   /* > 1 if we replicate tuples across data nodes */
	StmtParams *stmt_params;  /* Parameters to send with statement. Format can be binary or text */
	int flush_threshold;	  /* Batch size used for this dispatch state */
	TupleTableSlot *batch_slot; /* Slot used for sending tuples to data
								 * nodes. Note that this needs to be a
								 * MinimalTuple slot, so we cannot use the
								 * standard ScanSlot in the ScanState because
								 * CustomNode sets it up to be a
								 * VirtualTuple. */
} DataNodeDispatchState;

/*
 * Plan metadata list indexes.
 */
enum CustomScanPrivateIndex
{
	CustomScanPrivateSql,
	CustomScanPrivateTargetAttrs,
	CustomScanPrivateDeparsedInsertStmt,
	CustomScanPrivateSetProcessed,
	CustomScanPrivateFlushThreshold
};

#define HAS_RETURNING(sds) ((sds)->stmt.returning != NULL)

/*
 * DataNodeState for each data node.
 *
 * Tuples destined for a data node are batched in a tuple store until dispatched
 * using a "flush". A flush happens using the prepared (insert) statement,
 * which can only be used to send a "full" batch of tuples as the number of
 * rows in the statement is predefined. Thus, a flush only happens when the
 * tuple store reaches the predefined size. Once the last tuple is read from
 * the subnode, a final flush occurs. In that case, the flush is "partial"
 * (fewer than the predefined number of tuples). A partial flush cannot use
 * the prepared statement, since the number of rows does not match, and
 * therefore a one-time statement is created for the last insert.
 *
 * Note that, since we use one DataNodeState per connection, we
 * could technically have multiple DataNodeStates per data node.
 */
typedef struct DataNodeState
{
	TSConnectionId id; /* Must be first */
	TSConnection *conn;
	Tuplestorestate *primary_tupstore; /* Tuples this data node is primary
										* for. These tuples are returned when
										* RETURNING is specified. */
	Tuplestorestate *replica_tupstore; /* Tuples this data node is a replica
										* for. These tuples are NOT returned
										* when RETURNING is specified. */
	PreparedStmt *pstmt;			   /* Prepared statement to use in the FLUSH state */
	int num_tuples_sent;			   /* Number of tuples sent in the FLUSH or LAST_FLUSH states */
	int num_tuples_inserted;		   /* Number of tuples inserted (returned in result)
										* during the FLUSH or LAST_FLUSH states */
	int next_tuple;					   /* The next tuple to return in the RETURNING state */
	TupleTableSlot *slot;
} DataNodeState;

#define NUM_STORED_TUPLES(ss)                                                                      \
	(tuplestore_tuple_count((ss)->primary_tupstore) +                                              \
	 ((ss)->replica_tupstore != NULL ? tuplestore_tuple_count((ss)->replica_tupstore) : 0))

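/*
 * Initialize per-data-node state: allocate the tuple store(s) on the
 * dispatch node's memory context and get a connection from the distributed
 * transaction, with prepared statement support enabled.
 */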
static void
data_node_state_init(DataNodeState *ss, DataNodeDispatchState *sds, TSConnectionId id)
{
	MemoryContext old = MemoryContextSwitchTo(sds->mcxt);

	memset(ss, 0, sizeof(DataNodeState));
	ss->id = id;
	ss->primary_tupstore = tuplestore_begin_heap(false, false, TUPSTORE_MEMSIZE_KB);
	if (sds->replication_factor > 1)
		ss->replica_tupstore = tuplestore_begin_heap(false, false, TUPSTORE_MEMSIZE_KB);
	else
		ss->replica_tupstore = NULL;

	ss->conn = remote_dist_txn_get_connection(id, REMOTE_TXN_USE_PREP_STMT);
	ss->pstmt = NULL;
	ss->next_tuple = 0;
	ss->num_tuples_sent = 0;
	ss->num_tuples_inserted = 0;

	MemoryContextSwitchTo(old);
}

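/*
 * Look up the state for a data node connection in the hash table, creating
 * and initializing it on first use.
 */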
static DataNodeState *
data_node_state_get_or_create(DataNodeDispatchState *sds, TSConnectionId id)
{
	DataNodeState *ss;
	bool found;

	ss = hash_search(sds->nodestates, &id, HASH_ENTER, &found);

	if (!found)
		data_node_state_init(ss, sds, id);

	return ss;
}

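/* Empty the primary tuple store and reset its read position. */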
static void
data_node_state_clear_primary_store(DataNodeState *ss)
{
	tuplestore_clear(ss->primary_tupstore);
	Assert(tuplestore_tuple_count(ss->primary_tupstore) == 0);
	ss->next_tuple = 0;
}

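/* Empty the replica tuple store, if replication is enabled. */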
static void
data_node_state_clear_replica_store(DataNodeState *ss)
{
	if (NULL == ss->replica_tupstore)
		return;

	tuplestore_clear(ss->replica_tupstore);
	Assert(tuplestore_tuple_count(ss->replica_tupstore) == 0);
}

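/* Release per-node resources: the prepared statement and the tuple store(s). */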
static void
data_node_state_close(DataNodeState *ss)
{
	if (NULL != ss->pstmt)
		prepared_stmt_close(ss->pstmt);

	tuplestore_end(ss->primary_tupstore);

	if (NULL != ss->replica_tupstore)
		tuplestore_end(ss->replica_tupstore);
}

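/*
 * Begin the dispatch node: initialize the ChunkDispatch subplan, unpack the
 * plan's private metadata (deparsed SQL, target attributes, flush
 * threshold), and set up the per-data-node state hash table and memory
 * contexts.
 */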
static void
data_node_dispatch_begin(CustomScanState *node, EState *estate, int eflags)
{
	DataNodeDispatchState *sds = (DataNodeDispatchState *) node;
	CustomScan *cscan = castNode(CustomScan, node->ss.ps.plan);
#if PG14_LT
	ResultRelInfo *rri = estate->es_result_relation_info;
#else
	ResultRelInfo *rri = linitial_node(ResultRelInfo, estate->es_opened_result_relations);
#endif
	Relation rel = rri->ri_RelationDesc;
	TupleDesc tupdesc = RelationGetDescr(rel);
	Plan *subplan = linitial(cscan->custom_plans);
	Cache *hcache = ts_hypertable_cache_pin();
	Hypertable *ht = ts_hypertable_cache_get_entry(hcache, rel->rd_id, CACHE_FLAG_NONE);
	PlanState *ps;
	MemoryContext mcxt =
		AllocSetContextCreate(estate->es_query_cxt, "DataNodeState", ALLOCSET_SMALL_SIZES);
	HASHCTL hctl = {
		.keysize = sizeof(TSConnectionId),
		.entrysize = sizeof(DataNodeState),
		.hcxt = mcxt,
	};
	List *available_nodes = ts_hypertable_get_available_data_nodes(ht, true);

	Assert(NULL != ht);
	Assert(hypertable_is_distributed(ht));
	Assert(NIL != available_nodes);

	ps = ExecInitNode(subplan, estate, eflags);

	node->custom_ps = list_make1(ps);
	sds->state = SD_READ;
	sds->rel = rel;
	sds->replication_factor = ht->fd.replication_factor;
	sds->sql_stmt = strVal(list_nth(cscan->custom_private, CustomScanPrivateSql));
	sds->target_attrs = list_nth(cscan->custom_private, CustomScanPrivateTargetAttrs);
	sds->set_processed = intVal(list_nth(cscan->custom_private, CustomScanPrivateSetProcessed));
	sds->flush_threshold = intVal(list_nth(cscan->custom_private, CustomScanPrivateFlushThreshold));

	sds->mcxt = mcxt;
	sds->batch_mcxt = AllocSetContextCreate(mcxt, "DataNodeDispatch batch", ALLOCSET_SMALL_SIZES);
	sds->nodestates = hash_create("DataNodeDispatch tuple stores",
								  list_length(available_nodes),
								  &hctl,
								  HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	deparsed_insert_stmt_from_list(&sds->stmt,
								   list_nth(cscan->custom_private,
											CustomScanPrivateDeparsedInsertStmt));
	/* Setup output functions to generate string values for each target attribute */
	sds->stmt_params = stmt_params_create(sds->target_attrs, false, tupdesc, sds->flush_threshold);

	if (HAS_RETURNING(sds))
		sds->tupfactory = tuplefactory_create_for_rel(rel, sds->stmt.retrieved_attrs);

	/* The tuplestores that hold batches of tuples only allow MinimalTuples so
	 * we need a dedicated slot for getting tuples from the stores since the
	 * CustomScan's ScanTupleSlot is a VirtualTuple. */
	sds->batch_slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple);

	ts_cache_release(hcache);
}

/*
 * Store the result of a RETURNING clause.
 */
static void
store_returning_result(DataNodeDispatchState *sds, int row, TupleTableSlot *slot, PGresult *res)
{
	PG_TRY();
	{
		HeapTuple newtup = tuplefactory_make_tuple(sds->tupfactory, res, row, PQbinaryTuples(res));

		/* We need to force the tuple into the slot since it is not of the
		 * right type (conversion to the right type will happen if
		 * necessary) */
		ExecForceStoreHeapTuple(newtup, slot, true);
	}
	PG_CATCH();
	{
		if (res)
			PQclear(res);
		PG_RE_THROW();
	}
	PG_END_TRY();
}

static const char *state_names[] = {
	[SD_READ] = "READ",
	[SD_FLUSH] = "FLUSH",
	[SD_LAST_FLUSH] = "LAST_FLUSH",
	[SD_RETURNING] = "RETURNING",
	[SD_DONE] = "DONE",
};

/*
 * Move the state machine to a new state.
 */
static void
data_node_dispatch_set_state(DataNodeDispatchState *sds, DispatchState new_state)
{
	Assert(sds->state != new_state);

	elog(DEBUG2,
		 "DataNodeDispatchState: %s -> %s",
		 state_names[sds->state],
		 state_names[new_state]);

#ifdef USE_ASSERT_CHECKING

	switch (new_state)
	{
		case SD_READ:
			Assert(sds->state == SD_RETURNING);
			break;
		case SD_FLUSH:
		case SD_LAST_FLUSH:
			Assert(sds->state == SD_READ);
			break;
		case SD_RETURNING:
			Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);
			break;
		case SD_DONE:
			Assert(sds->state == SD_RETURNING);
	}

#endif

	sds->prevstate = sds->state;
	sds->state = new_state;
}

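/*
 * Prepare the full-batch INSERT statement on a data node connection. The
 * statement is allocated on the per-node memory context so that it survives
 * across batches and can be reused for every full flush.
 */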
static PreparedStmt *
prepare_data_node_insert_stmt(DataNodeDispatchState *sds, TSConnection *conn, int total_params)
{
	AsyncRequest *req;
	PreparedStmt *stmt;
	MemoryContext oldcontext = MemoryContextSwitchTo(sds->mcxt);

	req = async_request_send_prepare(conn, sds->sql_stmt, total_params);
	Assert(req != NULL);
	stmt = async_request_wait_prepared_statement(req);
	MemoryContextSwitchTo(oldcontext);

	return stmt;
}

/*
 * Send a batch of tuples to a data node.
 *
 * All stored tuples for the given data node are sent on the node's
 * connection. If in FLUSH state (i.e., sending a predefined amount of
 * tuples), use a prepared statement, or construct a custom statement if in
 * LAST_FLUSH state.
 *
 * If there's a RETURNING clause, we reset the read pointer for the store,
 * since the original tuples need to be returned along with the
 * node-returned ones. If there is no RETURNING clause, simply clear the
 * store.
 */
static AsyncRequest *
send_batch_to_data_node(DataNodeDispatchState *sds, DataNodeState *ss)
{
	TupleTableSlot *slot = sds->batch_slot;
	AsyncRequest *req;
	const char *sql_stmt;
	int response_type = FORMAT_TEXT;

	Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);
	Assert(NUM_STORED_TUPLES(ss) <= sds->flush_threshold);
	Assert(NUM_STORED_TUPLES(ss) > 0);

	ss->num_tuples_sent = 0;

	while (
		tuplestore_gettupleslot(ss->primary_tupstore, true /* forward */, false /* copy */, slot))
	{
		/* get following parameters from slot */
		stmt_params_convert_values(sds->stmt_params, slot, NULL);
		ss->num_tuples_sent++;
	}

	if (NULL != ss->replica_tupstore)
	{
		while (tuplestore_gettupleslot(ss->replica_tupstore,
									   true /* forward */,
									   false /* copy */,
									   slot))
		{
			/* get following parameters from slot */
			stmt_params_convert_values(sds->stmt_params, slot, NULL);
			ss->num_tuples_sent++;
		}
	}

	Assert(ss->num_tuples_sent == NUM_STORED_TUPLES(ss));
	Assert(ss->num_tuples_sent == stmt_params_converted_tuples(sds->stmt_params));

	if (HAS_RETURNING(sds) && tuplefactory_is_binary(sds->tupfactory))
		response_type = FORMAT_BINARY;
	else if (ts_guc_enable_connection_binary_data)
		response_type = FORMAT_BINARY;

	/* Send tuples */
	switch (sds->state)
	{
		case SD_FLUSH:
			/* Lazy initialize the prepared statement */
			if (NULL == ss->pstmt)
				ss->pstmt =
					prepare_data_node_insert_stmt(sds,
												  ss->conn,
												  stmt_params_total_values(sds->stmt_params));

			Assert(ss->num_tuples_sent == sds->flush_threshold);
			req = async_request_send_prepared_stmt_with_params(ss->pstmt,
															   sds->stmt_params,
															   response_type);
			break;
		case SD_LAST_FLUSH:
			sql_stmt = deparsed_insert_stmt_get_sql(&sds->stmt,
													stmt_params_converted_tuples(sds->stmt_params));
			Assert(sql_stmt != NULL);
			Assert(ss->num_tuples_sent < sds->flush_threshold);
			req =
				async_request_send_with_params(ss->conn, sql_stmt, sds->stmt_params, response_type);
			break;
		default:
			elog(ERROR, "unexpected data node dispatch state %s", state_names[sds->state]);
			Assert(false);
			break;
	}

	Assert(NULL != req);
	async_request_attach_user_data(req, ss);

	sds->num_tuples += tuplestore_tuple_count(ss->primary_tupstore);

	/* If there's a RETURNING clause, we need to also return the inserted
	   tuples in
	   rri->ri_projectReturning->pi_exprContext->ecxt_scantuple */
	if (HAS_RETURNING(sds))
		tuplestore_rescan(ss->primary_tupstore);
	else
		data_node_state_clear_primary_store(ss);

	/* No tuples are returned from the replica store so safe to clear */
	data_node_state_clear_replica_store(ss);

	/* Since we're done with current batch, reset params so they are safe to use in the next
	 * batch/node */
	stmt_params_reset(sds->stmt_params);

	return req;
}

/*
 * Check if we should flush tuples stored for a data node.
 *
 * There are two cases when this happens:
 *
 * 1. State is SD_FLUSH and the flush threshold is reached.
 * 2. State is SD_LAST_FLUSH and there are tuples to send.
 */
static bool
should_flush_data_node(DataNodeDispatchState *sds, DataNodeState *ss)
{
	int64 num_tuples = NUM_STORED_TUPLES(ss);

	Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);

	if (sds->state == SD_FLUSH)
	{
		if (num_tuples >= sds->flush_threshold)
			return true;
		return false;
	}

	return num_tuples > 0;
}

/*
 * Flush the tuples of data nodes that have a full batch.
 */
static AsyncRequestSet *
flush_data_nodes(DataNodeDispatchState *sds)
{
	AsyncRequestSet *reqset = NULL;
	DataNodeState *ss;
	HASH_SEQ_STATUS hseq;

	Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);

	hash_seq_init(&hseq, sds->nodestates);

	for (ss = hash_seq_search(&hseq); ss != NULL; ss = hash_seq_search(&hseq))
	{
		if (should_flush_data_node(sds, ss))
		{
			AsyncRequest *req = send_batch_to_data_node(sds, ss);

			if (NULL != req)
			{
				if (NULL == reqset)
					reqset = async_request_set_create();

				async_request_set_add(reqset, req);
			}
		}
	}

	return reqset;
}

/*
 * Wait for responses from data nodes after INSERT.
 *
 * In case of RETURNING, return a list of responses, otherwise NIL.
 */
static List *
await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
{
	AsyncResponseResult *rsp;
	List *results = NIL;

	sds->next_tuple = 0;

	while ((rsp = async_request_set_wait_any_result(reqset)))
	{
		DataNodeState *ss = async_response_result_get_user_data(rsp);
		PGresult *res = async_response_result_get_pg_result(rsp);
		ExecStatusType status = PQresultStatus(res);
		bool report_error = true;

		switch (status)
		{
			case PGRES_TUPLES_OK:
				if (!HAS_RETURNING(sds))
					break;

				results = lappend(results, rsp);
				ss->num_tuples_inserted = PQntuples(res);
				Assert(sds->stmt.do_nothing || (ss->num_tuples_inserted == ss->num_tuples_sent));
				report_error = false;
				break;
			case PGRES_COMMAND_OK:
				if (HAS_RETURNING(sds))
					break;

				ss->num_tuples_inserted = atoi(PQcmdTuples(res));
				async_response_result_close(rsp);
				Assert(sds->stmt.do_nothing || (ss->num_tuples_inserted == ss->num_tuples_sent));
				report_error = false;
				break;
			default:
				break;
		}

		if (report_error)
			async_response_report_error((AsyncResponse *) rsp, ERROR);

		/* Unless there is an ON CONFLICT clause, the number of tuples
		 * returned should be greater than zero and the same as the number of
		 * tuples sent. */
		Assert(sds->stmt.do_nothing || ss->num_tuples_inserted > 0);
		ss->next_tuple = 0;
	}

	return results;
}

/*
 * Read tuples from the child scan node.
 *
 * Read until there's a NULL tuple or we've filled a data node's batch. Ideally,
 * we'd continue to read more tuples to fill other data nodes' batches, but since
 * the next tuple might be for the same node that has the full batch, we
 * risk overfilling. This could be mitigated by using two tuple stores per
 * data node (current and next batch) and alternating between them. But that also
 * increases memory requirements and complicates the code, so that's left as a
 * future optimization.
 *
 * Return the number of tuples read.
 */
static int64
handle_read(DataNodeDispatchState *sds)
{
	PlanState *substate = linitial(sds->cstate.custom_ps);
	ChunkDispatchState *cds = (ChunkDispatchState *) substate;
	EState *estate = sds->cstate.ss.ps.state;
#if PG14_LT
	ResultRelInfo *rri_saved = estate->es_result_relation_info;
#endif
	int64 num_tuples_read = 0;

	Assert(sds->state == SD_READ);

	/* If we are reading new tuples, we either do it for the first batch or we
	 * finished a previous batch. In either case, reset the batch memory
	 * context so that we release the memory after each batch is finished. */
	MemoryContextReset(sds->batch_mcxt);

	/* Read tuples from the subnode until flush */
	while (sds->state == SD_READ)
	{
		TupleTableSlot *slot = ExecProcNode(substate);

		if (TupIsNull(slot))
			data_node_dispatch_set_state(sds, SD_LAST_FLUSH);
		else
		{
			/* The previous node should have routed the tuple to the right
			 * chunk and set the corresponding result relation. The FdwState
			 * should also point to the chunk's insert state. */
			ResultRelInfo *rri = cds->rri;
			ChunkInsertState *cis = rri->ri_FdwState;
			TriggerDesc *trigdesc = rri->ri_TrigDesc;
			ListCell *lc;
			bool primary_data_node = true;
			MemoryContext oldcontext;
			TupleDesc rri_desc = RelationGetDescr(rri->ri_RelationDesc);

			if (NULL != rri->ri_projectReturning && rri_desc->constr &&
				rri_desc->constr->has_generated_stored)
				ExecComputeStoredGeneratedCompat(rri, estate, slot, CMD_INSERT);

			Assert(NULL != cis);

			oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

			/* While we could potentially support triggers on frontend nodes,
			 * the triggers should also exist on the remote node and would be
			 * executed there. For now, the safest bet is to avoid triggers on
			 * the frontend. */
			if (trigdesc && (trigdesc->trig_insert_after_row || trigdesc->trig_insert_before_row))
				elog(ERROR, "cannot insert into remote chunk with row triggers");

			/* Total count */
			num_tuples_read++;

			foreach (lc, cis->chunk_data_nodes)
			{
				ChunkDataNode *cdn = lfirst(lc);
				TSConnectionId id = remote_connection_id(cdn->foreign_server_oid, cis->user_id);
				DataNodeState *ss = data_node_state_get_or_create(sds, id);

				/* This will store one copy of the tuple per data node, which is
				 * a bit inefficient. Note that we put the tuple in the
				 * primary store for the first data node, but the replica store
				 * for all other data nodes. This is to be able to know which
				 * tuples to return in a RETURNING statement. */
				if (primary_data_node)
					tuplestore_puttupleslot(ss->primary_tupstore, slot);
				else
					tuplestore_puttupleslot(ss->replica_tupstore, slot);

				/* Once one data node has reached the batch size, we stop
				 * reading. */
				if (sds->state != SD_FLUSH && NUM_STORED_TUPLES(ss) >= sds->flush_threshold)
					data_node_dispatch_set_state(sds, SD_FLUSH);

				primary_data_node = false;
			}

			MemoryContextSwitchTo(oldcontext);
		}
	}

#if PG14_LT
	estate->es_result_relation_info = rri_saved;
#endif

	return num_tuples_read;
}

/*
 * Flush all data nodes and move to the RETURNING state.
 *
 * Note that future optimizations could do this more asynchronously by doing
 * other work until responses are available (e.g., one could start to fill the
 * next batch while waiting for a response). However, the async API currently
 * doesn't expose a way to check for a response without blocking and
 * interleaving different tasks would also complicate the state machine.
 */
static void
handle_flush(DataNodeDispatchState *sds)
{
	MemoryContext oldcontext;
	AsyncRequestSet *reqset;

	Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);

	/* Save the requests and responses in the batch memory context since they
	 * need to survive across several iterations of the executor loop when
	 * there is a RETURNING clause. The batch memory context is cleared the
	 * next time we read a batch. */
	oldcontext = MemoryContextSwitchTo(sds->batch_mcxt);
	reqset = flush_data_nodes(sds);

	if (NULL != reqset)
	{
		sds->responses = await_all_responses(sds, reqset);
		pfree(reqset);
	}

	data_node_dispatch_set_state(sds, SD_RETURNING);
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Get a tuple when there's a RETURNING clause.
 */
static TupleTableSlot *
get_returning_tuple(DataNodeDispatchState *sds)
{
	ChunkDispatchState *cds = (ChunkDispatchState *) linitial(sds->cstate.custom_ps);
	ResultRelInfo *rri = cds->rri;
	TupleTableSlot *res_slot = sds->batch_slot;
	TupleTableSlot *slot = sds->cstate.ss.ss_ScanTupleSlot;
	ExprContext *econtext;

	Assert(NULL != rri->ri_projectReturning);

	econtext = rri->ri_projectReturning->pi_exprContext;

	/*
	 * Store a RETURNING tuple.  If HAS_RETURNING() is false, just emit a
	 * dummy tuple.  (HAS_RETURNING() is false when the local query is of the
	 * form "INSERT .. RETURNING 1" for example.)
	 */
	if (!HAS_RETURNING(sds))
	{
		Assert(NIL == sds->responses);
		ExecStoreAllNullTuple(slot);
		res_slot = slot;
	}
	else
	{
		while (NIL != sds->responses)
		{
			AsyncResponseResult *rsp = linitial(sds->responses);
			DataNodeState *ss = async_response_result_get_user_data(rsp);
			PGresult *res = async_response_result_get_pg_result(rsp);
			int64 num_tuples_to_return = tuplestore_tuple_count(ss->primary_tupstore);
			bool last_tuple;
			bool got_tuple = false;

			if (num_tuples_to_return > 0)
			{
				MemoryContext oldcontext;

				last_tuple = (ss->next_tuple + 1) == num_tuples_to_return;

				Assert(ss->next_tuple < ss->num_tuples_inserted);
				Assert(ss->next_tuple < num_tuples_to_return);

				oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_query_memory);
				store_returning_result(sds, ss->next_tuple, slot, res);

				/* Get the next tuple from the store. If it is the last tuple, we
				 * need to make a copy since we will clear the store before
				 * returning. */
				got_tuple = tuplestore_gettupleslot(ss->primary_tupstore,
													true /* forward */,
													last_tuple /* copy */,
													res_slot);

				MemoryContextSwitchTo(oldcontext);
				Assert(got_tuple);
				Assert(!TupIsNull(res_slot));
				ss->next_tuple++;
			}
			else
				last_tuple = true;

			if (last_tuple)
			{
				sds->responses = list_delete_first(sds->responses);
				async_response_result_close(rsp);
				data_node_state_clear_primary_store(ss);
			}

			if (got_tuple)
				break;
		}
	}

	econtext->ecxt_scantuple = slot;

	return res_slot;
}

/*
 * Get the next tuple slot to return when there's a RETURNING
 * clause. Otherwise, return an empty slot.
 */
static TupleTableSlot *
handle_returning(DataNodeDispatchState *sds)
{
	EState *estate = sds->cstate.ss.ps.state;
	ChunkDispatchState *cds = (ChunkDispatchState *) linitial(sds->cstate.custom_ps);
	ResultRelInfo *rri = cds->rri;
	TupleTableSlot *slot = sds->cstate.ss.ss_ScanTupleSlot;
	bool done = false;
	MemoryContext oldcontext;

	Assert(sds->state == SD_RETURNING);
	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

	/* No returning projection, which means we are done */
	if (NULL == rri->ri_projectReturning)
	{
		Assert(!HAS_RETURNING(sds));
		Assert(NIL == sds->responses);
		done = true;

		if (sds->set_processed)
			estate->es_processed += sds->num_tuples;
	}

	/* If we've processed all tuples, then we're also done */
	if (sds->next_tuple >= sds->num_tuples)
		done = true;

	if (done)
	{
		sds->next_tuple = 0;
		sds->num_tuples = 0;

		slot = ExecClearTuple(slot);

		if (sds->prevstate == SD_LAST_FLUSH)
			data_node_dispatch_set_state(sds, SD_DONE);
		else
			data_node_dispatch_set_state(sds, SD_READ);
	}
	else
	{
		slot = get_returning_tuple(sds);
		Assert(!TupIsNull(slot));
		sds->next_tuple++;

		if (sds->set_processed)
			estate->es_processed++;
	}

	MemoryContextSwitchTo(oldcontext);

	return slot;
}

/*
 * Execute the remote INSERT.
 *
 * This is called every time the parent asks for a new tuple. Read the child
 * scan node and buffer until there's a full batch, then flush by sending to
 * data node(s). If there's a returning statement, we return the flushed tuples
 * one-by-one, or continue reading more tuples from the child until there's a
 * NULL tuple.
 */
static TupleTableSlot *
data_node_dispatch_exec(CustomScanState *node)
{
	DataNodeDispatchState *sds = (DataNodeDispatchState *) node;
	TupleTableSlot *slot = NULL;
	bool done = false;

#if PG14_LT
	/* Initially, the result relation should always match the hypertable.  */
	Assert(node->ss.ps.state->es_result_relation_info->ri_RelationDesc->rd_id == sds->rel->rd_id);
#endif

	/* Read tuples and flush until there's either something to return or no
	 * more tuples to read */
	while (!done)
	{
		switch (sds->state)
		{
			case SD_READ:
				handle_read(sds);
				break;
			case SD_FLUSH:
			case SD_LAST_FLUSH:
				handle_flush(sds);
				break;
			case SD_RETURNING:
				slot = handle_returning(sds);
				/* If a tuple was read, return it and wait to get called again */
				done = !TupIsNull(slot);
				break;
			case SD_DONE:
				done = true;
				Assert(TupIsNull(slot));
				break;
		}
	}

#if PG14_LT
	/* Tuple routing in the ChunkDispatchState subnode sets the result
	 * relation to a chunk when routing, but the read handler should have
	 * ensured the result relation is reset. */
	Assert(node->ss.ps.state->es_result_relation_info->ri_RelationDesc->rd_id == sds->rel->rd_id);
	Assert(node->ss.ps.state->es_result_relation_info->ri_usesFdwDirectModify);
#endif

	return slot;
}

static void
data_node_dispatch_rescan(CustomScanState *node)
{
	/* Cannot rescan and start from the beginning since we might already have
	 * sent data to remote nodes */
	elog(ERROR, "cannot restart inserts to remote nodes");
}

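/*
 * End the dispatch node: close all per-node state (prepared statements and
 * tuple stores), drop the batch slot, and shut down the subplan.
 */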
static void
data_node_dispatch_end(CustomScanState *node)
{
	DataNodeDispatchState *sds = (DataNodeDispatchState *) node;
	DataNodeState *ss;
	HASH_SEQ_STATUS hseq;

	hash_seq_init(&hseq, sds->nodestates);

	for (ss = hash_seq_search(&hseq); ss != NULL; ss = hash_seq_search(&hseq))
		data_node_state_close(ss);

	hash_destroy(sds->nodestates);
	ExecDropSingleTupleTableSlot(sds->batch_slot);
	ExecEndNode(linitial(node->custom_ps));
}

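/*
 * Show the insert batch size in EXPLAIN output and, with the VERBOSE option,
 * the deparsed remote SQL statement.
 */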
static void
data_node_dispatch_explain(CustomScanState *node, List *ancestors, ExplainState *es)
{
	DataNodeDispatchState *sds = (DataNodeDispatchState *) node;

	ExplainPropertyInteger("Batch size", NULL, sds->flush_threshold, es);

	/*
	 * Add remote query, when VERBOSE option is specified.
	 */
	if (es->verbose)
	{
		const char *explain_sql =
			deparsed_insert_stmt_get_sql_explain(&sds->stmt, sds->flush_threshold);

		ExplainPropertyText("Remote SQL", explain_sql, es);
	}
}

static CustomExecMethods data_node_dispatch_state_methods = {
	.CustomName = "DataNodeDispatchState",
	.BeginCustomScan = data_node_dispatch_begin,
	.EndCustomScan = data_node_dispatch_end,
	.ExecCustomScan = data_node_dispatch_exec,
	.ReScanCustomScan = data_node_dispatch_rescan,
	.ExplainCustomScan = data_node_dispatch_explain,
};

/* Only allocate the custom scan state. Initialize in the begin handler. */
static Node *
data_node_dispatch_state_create(CustomScan *cscan)
{
	DataNodeDispatchState *sds;

	sds = (DataNodeDispatchState *) newNode(sizeof(DataNodeDispatchState), T_CustomScanState);
	sds->cstate.methods = &data_node_dispatch_state_methods;

	return (Node *) sds;
}

static CustomScanMethods data_node_dispatch_plan_methods = {
	.CustomName = "DataNodeDispatch",
	.CreateCustomScanState = data_node_dispatch_state_create,
};

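/*
 * Collect the attribute numbers to include in the remote INSERT, skipping
 * dropped and generated columns since they cannot be inserted explicitly.
 */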
static List *
get_insert_attrs(Relation rel)
{
	TupleDesc tupdesc = RelationGetDescr(rel);
	List *attrs = NIL;
	int i;

	for (i = 0; i < tupdesc->natts; i++)
	{
		Form_pg_attribute attr = TupleDescAttr(tupdesc, i);

		if (attr->attisdropped || attr->attgenerated != '\0')
			continue;

		attrs = lappend_int(attrs, AttrOffsetGetAttrNumber(i));
	}

	return attrs;
}

/*
 * Plan a remote INSERT on a hypertable.
 *
 * Create the metadata needed for a remote INSERT. This mostly involves
 * deparsing the INSERT statement.
 *
 * Return the metadata as a list of Nodes that can be saved in a prepared
 * statement.
 */
static List *
plan_remote_insert(PlannerInfo *root, DataNodeDispatchPath *sdpath)
{
	ModifyTablePath *mtpath = sdpath->mtpath;
	OnConflictAction onconflict =
		mtpath->onconflict == NULL ? ONCONFLICT_NONE : mtpath->onconflict->action;
	List *returning_lists = sdpath->mtpath->returningLists;
	RangeTblEntry *rte = planner_rt_fetch(sdpath->hypertable_rti, root);
	Relation rel;
	DeparsedInsertStmt stmt;
	const char *sql;
	List *target_attrs = NIL;
	List *returning_list = NIL;
	bool do_nothing = false;
	int flush_threshold;

	/*
	 * Core code already has some lock on each rel being planned, so we can
	 * use NoLock here.
	 */
	rel = table_open(rte->relid, NoLock);

	/*
	 * Extract the relevant RETURNING list if any.
	 */
	if (NIL != returning_lists)
		returning_list = (List *) list_nth(returning_lists, sdpath->subplan_index);

	/*
	 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
	 * should have already been rejected in the optimizer, as presently there
	 * is no way to recognize an arbiter index on a foreign table.  Only DO
	 * NOTHING is supported without an inference specification.
	 */
	if (onconflict == ONCONFLICT_NOTHING)
		do_nothing = true;
	else if (onconflict != ONCONFLICT_NONE)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("ON CONFLICT DO UPDATE not supported"
						" on distributed hypertables")));

	/*
	 * Construct the SQL command string matching the fixed batch size. We also
	 * save the partially deparsed SQL command so that we can easily create
	 * one with fewer value parameters later for flushing a partially filled
	 * batch.
	 */
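	/*
	 * For illustration (not taken verbatim from the deparser): with target
	 * columns ("time", device, temp) and a flush threshold of 2, the deparsed
	 * full-batch statement looks roughly like
	 *
	 *   INSERT INTO <chunk> ("time", device, temp)
	 *   VALUES ($1, $2, $3), ($4, $5, $6)
	 *
	 * while a partial flush of a single row would instead use a one-time
	 * statement with a single VALUES row.
	 */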
	target_attrs = get_insert_attrs(rel);

	deparse_insert_stmt(&stmt,
						rte,
						sdpath->hypertable_rti,
						rel,
						target_attrs,
						do_nothing,
						returning_list);

	/* Set suitable flush threshold value that takes into account the max number
	 * of prepared statement arguments */
	flush_threshold =
		stmt_params_validate_num_tuples(list_length(target_attrs), TUPSTORE_FLUSH_THRESHOLD);

	sql = deparsed_insert_stmt_get_sql(&stmt, flush_threshold);

	table_close(rel, NoLock);

	return list_make5(makeString((char *) sql),
					  target_attrs,
					  deparsed_insert_stmt_to_list(&stmt),
					  makeInteger(sdpath->mtpath->canSetTag),
					  makeInteger(flush_threshold));
}

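/*
 * Create the CustomScan plan node from the path, attaching the deparsed
 * INSERT metadata as the plan's private data and the subplan's target list
 * as the custom scan target list.
 */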
static Plan *
data_node_dispatch_plan_create(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path,
							   List *tlist, List *clauses, List *custom_plans)
{
	DataNodeDispatchPath *sdpath = (DataNodeDispatchPath *) best_path;
	CustomScan *cscan = makeNode(CustomScan);
	Plan *subplan;

	Assert(list_length(custom_plans) == 1);

	subplan = linitial(custom_plans);
	cscan->methods = &data_node_dispatch_plan_methods;
	cscan->custom_plans = custom_plans;
	cscan->scan.scanrelid = 0;
	cscan->scan.plan.targetlist = tlist;
	cscan->custom_scan_tlist = subplan->targetlist;
	cscan->custom_private = plan_remote_insert(root, sdpath);

	return &cscan->scan.plan;
}

static CustomPathMethods data_node_dispatch_path_methods = {
	.CustomName = "DataNodeDispatchPath",
	.PlanCustomPath = data_node_dispatch_plan_create,
};

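/*
 * Create the DataNodeDispatch path, wrapping a ChunkDispatch subpath that
 * routes tuples to chunks before they are batched and sent to data nodes.
 */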
Path *
data_node_dispatch_path_create(PlannerInfo *root, ModifyTablePath *mtpath, Index hypertable_rti,
							   int subplan_index)
{
	DataNodeDispatchPath *sdpath = palloc0(sizeof(DataNodeDispatchPath));
	Path *subpath = ts_chunk_dispatch_path_create(root, mtpath, hypertable_rti, subplan_index);

	/* Copy costs, etc. from the subpath */
	memcpy(&sdpath->cpath.path, subpath, sizeof(Path));

	sdpath->cpath.path.type = T_CustomPath;
	sdpath->cpath.path.pathtype = T_CustomScan;
	sdpath->cpath.custom_paths = list_make1(subpath);
	sdpath->cpath.methods = &data_node_dispatch_path_methods;
	sdpath->mtpath = mtpath;
	sdpath->hypertable_rti = hypertable_rti;
	sdpath->subplan_index = subplan_index;

	return &sdpath->cpath.path;
}