/*-------------------------------------------------------------------------
 *
 * distributed_intermediate_results.c
 *   Functions for reading and writing distributed intermediate results.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "distributed/pg_version_constants.h"

#include <sys/stat.h>
#include <unistd.h>

#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "port.h"

#include "access/htup_details.h"
#include "access/tupdesc.h"
#include "catalog/pg_type.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/transaction_management.h"
#include "distributed/tuple_destination.h"
#include "distributed/tuplestore.h"
#include "distributed/worker_protocol.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"


/*
 * PartitioningTupleDest is the internal representation of a TupleDestination
 * which consumes the results of queries constructed in WrapTasksForPartitioning.
 */
typedef struct PartitioningTupleDest
{
	TupleDestination pub;

	CitusTableCacheEntry *targetRelation;

	/* MemoryContext in which we add new fragments */
	MemoryContext fragmentContext;

	/* list of DistributedResultFragment pointers */
	List *fragmentList;

	/* what the tuples look like */
	TupleDesc tupleDesc;
} PartitioningTupleDest;


/*
 * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
 * It is a separate struct so that it can be used as a key in a hash table.
 */
typedef struct NodePair
{
	uint32 sourceNodeId;
	uint32 targetNodeId;
} NodePair;


/*
 * NodeToNodeFragmentsTransfer contains all fragments that need to be fetched
 * from the source node to the destination node in the NodePair.
 */
typedef struct NodeToNodeFragmentsTransfer
{
	NodePair nodes;
	List *fragmentList;
} NodeToNodeFragmentsTransfer;


/* forward declarations of local functions */
static List * WrapTasksForPartitioning(const char *resultIdPrefix,
									   List *selectTaskList,
									   int partitionColumnIndex,
									   CitusTableCacheEntry *targetRelation,
									   bool binaryFormat);
static List * ExecutePartitionTaskList(List *partitionTaskList,
									   CitusTableCacheEntry *targetRelation);
static PartitioningTupleDest * CreatePartitioningTupleDest(
	CitusTableCacheEntry *targetRelation);
static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
										  int placementIndex, int queryNumber,
										  HeapTuple heapTuple, uint64 tupleLibpqSize);
static TupleDesc PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int
														queryNumber);
static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int
										 datumCount, Oid typeId);
static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
								   Oid intervalTypeOutFunc, ArrayType **minValueArray,
								   ArrayType **maxValueArray);
static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId);
static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple heapTuple,
																	TupleDesc tupleDesc,
																	CitusTableCacheEntry *
																	targetRelation,
																	uint32 sourceNodeId);
static void ExecuteSelectTasksIntoTupleDest(List *taskList,
											TupleDestination *tupleDestination,
											bool errorOnAnyFailure);
static List ** ColocateFragmentsWithRelation(List *fragmentList,
											 CitusTableCacheEntry *targetRelation);
static List * ColocationTransfers(List *fragmentList,
								  CitusTableCacheEntry *targetRelation);
static List * FragmentTransferTaskList(List *fragmentListTransfers);
static char * QueryStringForFragmentsTransfer(
	NodeToNodeFragmentsTransfer *fragmentsTransfer);
static void ExecuteFetchTaskList(List *fetchTaskList);


/*
 * RedistributeTaskListResults partitions the results of the given task list
 * using the shard ranges and partition method of the given targetRelation,
 * and then colocates the result files with the shards.
 *
 * If a shard has a replication factor > 1, the corresponding result files are
 * copied to all nodes containing that shard.
 *
 * returnValue[shardIndex] is a list of cstrings, each of which is a resultId
 * corresponding to targetRelation->sortedShardIntervalArray[shardIndex].
 *
 * partitionColumnIndex determines the column in the selectTaskList to use for
 * partitioning.
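 *
 * As an illustrative sketch (shard ids and prefix are hypothetical): for a
 * target relation with 4 shards and resultIdPrefix "repartitioned_results_5",
 * returnValue[2] could be the list
 *   { "repartitioned_results_5_from_102008_to_2",
 *     "repartitioned_results_5_from_102011_to_2" },
 * i.e. the ids of all result fragments colocated with shard index 2.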
 */
List **
RedistributeTaskListResults(const char *resultIdPrefix, List *selectTaskList,
							int partitionColumnIndex,
							CitusTableCacheEntry *targetRelation,
							bool binaryFormat)
{
	/*
	 * Make sure that this transaction has a distributed transaction ID.
	 *
	 * Intermediate results will be stored in a directory that is derived
	 * from the distributed transaction ID.
	 */
	UseCoordinatedTransaction();

	List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList,
												  partitionColumnIndex,
												  targetRelation, binaryFormat);
	return ColocateFragmentsWithRelation(fragmentList, targetRelation);
}


/*
 * PartitionTasklistResults executes the given task list, and partitions the
 * results of each task based on targetRelation's distribution method and
 * intervals. Each result partition is stored on the node where the task was
 * executed, and is named as
 * $resultIdPrefix_from_$sourceShardId_to_$targetShardIndex.
 *
 * The result is a list of DistributedResultFragment, each of which represents
 * a partition of results. Empty results are omitted. Therefore, if we have N
 * tasks and the target relation has M shards, we will have
 * NxM-(number of empty results) fragments.
 *
 * partitionColumnIndex determines the column in the selectTaskList to use for
 * partitioning.
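 *
 * For example, if the task list has 2 tasks and the target relation has
 * 4 shards, there are at most 2x4 = 8 fragments; if 3 of the partitions
 * turn out empty, 5 fragments are returned.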
 */
List *
PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
						 int partitionColumnIndex,
						 CitusTableCacheEntry *targetRelation,
						 bool binaryFormat)
{
	if (!IsCitusTableTypeCacheEntry(targetRelation, HASH_DISTRIBUTED) &&
		!IsCitusTableTypeCacheEntry(targetRelation, RANGE_DISTRIBUTED))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("repartitioning results of a tasklist is only supported "
							   "when target relation is hash or range partitioned.")));
	}

	/*
	 * Make sure that this transaction has a distributed transaction ID.
	 *
	 * Intermediate results will be stored in a directory that is derived
	 * from the distributed transaction ID.
	 */
	UseCoordinatedTransaction();

	selectTaskList = WrapTasksForPartitioning(resultIdPrefix, selectTaskList,
											  partitionColumnIndex, targetRelation,
											  binaryFormat);
	return ExecutePartitionTaskList(selectTaskList, targetRelation);
}


/*
 * WrapTasksForPartitioning wraps the query of each task in a call to
 * worker_partition_query_result(). The target list of the wrapped query
 * should match the tuple descriptor in ExecutePartitionTaskList().
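 *
 * An illustrative example of a wrapped query, in rough shape only (shard id,
 * prefix and the inner query are hypothetical; the target here is a 2-shard
 * hash-distributed table):
 *
 * SELECT partition_index
 *      , 'prefix_from_102008_to' || '_' || partition_index::text
 *      , rows_written
 * FROM worker_partition_query_result(
 *        'prefix_from_102008_to', 'SELECT a, b FROM source_102008', 0,
 *        'hash', '{-2147483648,0}'::text[], '{-1,2147483647}'::text[], false)
 * WHERE rows_written > 0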
 */
static List *
WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
						 int partitionColumnIndex,
						 CitusTableCacheEntry *targetRelation,
						 bool binaryFormat)
{
	List *wrappedTaskList = NIL;
	ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray;
	int shardCount = targetRelation->shardIntervalArrayLength;

	ArrayType *minValueArray = NULL;
	ArrayType *maxValueArray = NULL;
	Var *partitionColumn = targetRelation->partitionColumn;
	Oid intervalTypeId = InvalidOid;
	int32 intervalTypeMod = 0;
	Oid intervalTypeOutFunc = InvalidOid;
	bool intervalTypeVarlena = false;

	GetIntervalTypeInfo(targetRelation->partitionMethod, partitionColumn,
						&intervalTypeId, &intervalTypeMod);
	getTypeOutputInfo(intervalTypeId, &intervalTypeOutFunc, &intervalTypeVarlena);

	ShardMinMaxValueArrays(shardIntervalArray, shardCount, intervalTypeOutFunc,
						   &minValueArray, &maxValueArray);
	StringInfo minValuesString = ArrayObjectToString(minValueArray, TEXTOID,
													 intervalTypeMod);
	StringInfo maxValuesString = ArrayObjectToString(maxValueArray, TEXTOID,
													 intervalTypeMod);

	Task *selectTask = NULL;
	foreach_ptr(selectTask, selectTaskList)
	{
		char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
		char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
									  "hash" : "range";
		const char *binaryFormatString = binaryFormat ? "true" : "false";

		Task *wrappedSelectTask = copyObject(selectTask);

		StringInfo wrappedQuery = makeStringInfo();
		appendStringInfo(wrappedQuery,
						 "SELECT partition_index"
						 ", %s || '_' || partition_index::text "
						 ", rows_written "
						 "FROM worker_partition_query_result"
						 "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
						 quote_literal_cstr(taskPrefix),
						 quote_literal_cstr(taskPrefix),
						 quote_literal_cstr(TaskQueryString(selectTask)),
						 partitionColumnIndex,
						 quote_literal_cstr(partitionMethodString),
						 minValuesString->data, maxValuesString->data,
						 binaryFormatString);

		SetTaskQueryString(wrappedSelectTask, wrappedQuery->data);
		wrappedTaskList = lappend(wrappedTaskList, wrappedSelectTask);
	}

	return wrappedTaskList;
}


/*
 * CreatePartitioningTupleDest creates a TupleDestination which consumes results of
 * tasks constructed in WrapTasksForPartitioning.
 */
static PartitioningTupleDest *
CreatePartitioningTupleDest(CitusTableCacheEntry *targetRelation)
{
	int resultColumnCount = 3;

	TupleDesc tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount);

	TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "partition_index",
					   INT4OID, -1, 0);
	TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "result_id",
					   TEXTOID, -1, 0);
	TupleDescInitEntry(tupleDescriptor, (AttrNumber) 3, "rows_written",
					   INT8OID, -1, 0);


	PartitioningTupleDest *tupleDest = palloc0(sizeof(PartitioningTupleDest));
	tupleDest->targetRelation = targetRelation;
	tupleDest->tupleDesc = tupleDescriptor;
	tupleDest->fragmentContext = CurrentMemoryContext;
	tupleDest->pub.putTuple = PartitioningTupleDestPutTuple;
	tupleDest->pub.tupleDescForQuery =
		PartitioningTupleDestTupleDescForQuery;

	return tupleDest;
}


/*
 * PartitioningTupleDestPutTuple implements TupleDestination->putTuple for
 * PartitioningTupleDest.
 */
static void
PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
							  int placementIndex, int queryNumber,
							  HeapTuple heapTuple, uint64 tupleLibpqSize)
{
	PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self;

	ShardPlacement *placement = list_nth(task->taskPlacementList, placementIndex);

	/*
	 * We may be deep inside a nested execution, make sure we can use the
	 * fragment list at the top.
	 */
	MemoryContext oldContext = MemoryContextSwitchTo(tupleDest->fragmentContext);

	DistributedResultFragment *fragment =
		TupleToDistributedResultFragment(heapTuple, tupleDest->tupleDesc,
										 tupleDest->targetRelation,
										 placement->nodeId);

	tupleDest->fragmentList = lappend(tupleDest->fragmentList, fragment);

	MemoryContextSwitchTo(oldContext);
}


/*
 * PartitioningTupleDestTupleDescForQuery implements TupleDestination->tupleDescForQuery
 * for PartitioningTupleDest.
 */
static TupleDesc
PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber)
{
	Assert(queryNumber == 0);

	PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self;

	return tupleDest->tupleDesc;
}


/*
 * SourceShardPrefix returns the result id prefix for partitions which have
 * the given anchor shard id.
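 *
 * For example, SourceShardPrefix("repartitioned_results_5", 102008) returns
 * "repartitioned_results_5_from_102008_to".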
 */
static char *
SourceShardPrefix(const char *resultPrefix, uint64 shardId)
{
	StringInfo taskPrefix = makeStringInfo();

	appendStringInfo(taskPrefix, "%s_from_" UINT64_FORMAT "_to", resultPrefix, shardId);

	return taskPrefix->data;
}


/*
 * ShardMinMaxValueArrays returns the min and max values of the given shard
 * intervals. The returned arrays are text arrays.
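 *
 * For example, for a hash-distributed relation with 2 shards (assuming the
 * default even split of the hash token space), minValueArray becomes
 * {"-2147483648","0"} and maxValueArray becomes {"-1","2147483647"}. A shard
 * interval without a min (max) value yields a NULL array element instead.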
 */
static void
ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
					   Oid intervalTypeOutFunc, ArrayType **minValueArray,
					   ArrayType **maxValueArray)
{
	Datum *minValues = palloc0(shardCount * sizeof(Datum));
	bool *minValueNulls = palloc0(shardCount * sizeof(bool));
	Datum *maxValues = palloc0(shardCount * sizeof(Datum));
	bool *maxValueNulls = palloc0(shardCount * sizeof(bool));
	for (int shardIndex = 0; shardIndex < shardCount; shardIndex++)
	{
		minValueNulls[shardIndex] = !shardIntervalArray[shardIndex]->minValueExists;
		maxValueNulls[shardIndex] = !shardIntervalArray[shardIndex]->maxValueExists;

		if (!minValueNulls[shardIndex])
		{
			Datum minValue = shardIntervalArray[shardIndex]->minValue;
			char *minValueStr = DatumGetCString(OidFunctionCall1(intervalTypeOutFunc,
																 minValue));
			minValues[shardIndex] = CStringGetTextDatum(minValueStr);
		}

		if (!maxValueNulls[shardIndex])
		{
			Datum maxValue = shardIntervalArray[shardIndex]->maxValue;
			char *maxValueStr = DatumGetCString(OidFunctionCall1(intervalTypeOutFunc,
																 maxValue));
			maxValues[shardIndex] = CStringGetTextDatum(maxValueStr);
		}
	}

	*minValueArray = CreateArrayFromDatums(minValues, minValueNulls, shardCount, TEXTOID);
	*maxValueArray = CreateArrayFromDatums(maxValues, maxValueNulls, shardCount, TEXTOID);
}


/*
 * CreateArrayFromDatums creates an array consisting of given values and nulls.
 */
static ArrayType *
CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid typeId)
{
	bool typeByValue = false;
	char typeAlignment = 0;
	int16 typeLength = 0;
	int dimensions[1] = { datumCount };
	int lowerbounds[1] = { 1 };

	get_typlenbyvalalign(typeId, &typeLength, &typeByValue, &typeAlignment);

	ArrayType *datumArrayObject = construct_md_array(datumArray, nullsArray, 1,
													 dimensions,
													 lowerbounds, typeId, typeLength,
													 typeByValue, typeAlignment);

	return datumArrayObject;
}


/*
 * ExecutePartitionTaskList executes the queries formed in
 * WrapTasksForPartitioning(), and returns their results as a list of
 * DistributedResultFragment.
 */
static List *
ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation)
{
	PartitioningTupleDest *tupleDest = CreatePartitioningTupleDest(targetRelation);

	bool errorOnAnyFailure = false;
	ExecuteSelectTasksIntoTupleDest(taskList, (TupleDestination *) tupleDest,
									errorOnAnyFailure);

	return tupleDest->fragmentList;
}


/*
 * TupleToDistributedResultFragment converts a tuple returned by the query in
 * WrapTasksForPartitioning() to a DistributedResultFragment.
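 *
 * For example, a tuple (2, "prefix_from_102008_to_2", 9) maps to a fragment
 * with targetShardIndex 2, resultId "prefix_from_102008_to_2" and rowCount 9;
 * targetShardId is then looked up from the target relation's shard interval
 * at index 2.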
 */
static DistributedResultFragment *
TupleToDistributedResultFragment(HeapTuple tuple,
								 TupleDesc tupleDesc,
								 CitusTableCacheEntry *targetRelation,
								 uint32 sourceNodeId)
{
	bool isNull = false;
	uint32 targetShardIndex = DatumGetUInt32(heap_getattr(tuple, 1, tupleDesc, &isNull));
	text *resultId = DatumGetTextP(heap_getattr(tuple, 2, tupleDesc, &isNull));
	int64 rowCount = DatumGetInt64(heap_getattr(tuple, 3, tupleDesc, &isNull));

	Assert(targetShardIndex < targetRelation->shardIntervalArrayLength);
	ShardInterval *shardInterval =
		targetRelation->sortedShardIntervalArray[targetShardIndex];

	DistributedResultFragment *distributedResultFragment =
		palloc0(sizeof(DistributedResultFragment));

	distributedResultFragment->nodeId = sourceNodeId;
	distributedResultFragment->targetShardIndex = targetShardIndex;
	distributedResultFragment->targetShardId = shardInterval->shardId;
	distributedResultFragment->resultId = text_to_cstring(resultId);
	distributedResultFragment->rowCount = rowCount;

	return distributedResultFragment;
}


/*
 * ExecuteSelectTasksIntoTupleDest executes the given tasks and forwards their
 * results to the given tuple destination.
 */
static void
ExecuteSelectTasksIntoTupleDest(List *taskList, TupleDestination *tupleDestination,
								bool errorOnAnyFailure)
{
	bool expectResults = true;
	int targetPoolSize = MaxAdaptiveExecutorPoolSize;
	TransactionProperties xactProperties = {
		.errorOnAnyFailure = errorOnAnyFailure,
		.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
		.requires2PC = false
	};

	bool localExecutionSupported = true;
	ExecutionParams *executionParams = CreateBasicExecutionParams(
		ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported
		);
	executionParams->tupleDestination = tupleDestination;
	executionParams->xactProperties = xactProperties;
	executionParams->expectResults = expectResults;

	ExecuteTaskListExtended(executionParams);
}


/*
 * ColocateFragmentsWithRelation moves the fragments in the cluster so they are
 * colocated with the shards of the target relation. These transfers are done
 * by calls to fetch_intermediate_results() between nodes.
 *
 * returnValue[shardIndex] is the list of result Ids that are colocated with
 * targetRelation->sortedShardIntervalArray[shardIndex] after the fetch tasks
 * are done.
 */
static List **
ColocateFragmentsWithRelation(List *fragmentList, CitusTableCacheEntry *targetRelation)
{
	List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation);
	List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers);

	ExecuteFetchTaskList(fragmentTransferTaskList);

	int shardCount = targetRelation->shardIntervalArrayLength;
	List **shardResultIdList = palloc0(shardCount * sizeof(List *));

	DistributedResultFragment *sourceFragment = NULL;
	foreach_ptr(sourceFragment, fragmentList)
	{
		int shardIndex = sourceFragment->targetShardIndex;

		Assert(shardIndex < shardCount);
		shardResultIdList[shardIndex] = lappend(shardResultIdList[shardIndex],
												sourceFragment->resultId);
	}

	return shardResultIdList;
}


/*
 * ColocationTransfers returns a list of transfers to colocate the given
 * fragments with the shards of the target relation. These transfers also take
 * replicated target relations into account. Transfers whose source and target
 * node are the same are pruned away.
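 *
 * For example, a fragment on node 1 whose target shard has placements on
 * nodes 1 and 2 contributes only to the (1 -> 2) transfer; the (1 -> 1)
 * transfer is skipped because the fragment already resides on node 1.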
 */
static List *
ColocationTransfers(List *fragmentList, CitusTableCacheEntry *targetRelation)
{
	HASHCTL transferHashInfo;
	MemSet(&transferHashInfo, 0, sizeof(HASHCTL));
	transferHashInfo.keysize = sizeof(NodePair);
	transferHashInfo.entrysize = sizeof(NodeToNodeFragmentsTransfer);
	transferHashInfo.hcxt = CurrentMemoryContext;
	HTAB *transferHash = hash_create("Fragment Transfer Hash", 32, &transferHashInfo,
									 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	DistributedResultFragment *fragment = NULL;
	foreach_ptr(fragment, fragmentList)
	{
		List *placementList = ActiveShardPlacementList(fragment->targetShardId);
		ShardPlacement *placement = NULL;
		foreach_ptr(placement, placementList)
		{
			NodePair transferKey = {
				.sourceNodeId = fragment->nodeId,
				.targetNodeId = placement->nodeId
			};

			if (transferKey.sourceNodeId == transferKey.targetNodeId)
			{
				continue;
			}

			bool foundInCache = false;
			NodeToNodeFragmentsTransfer *fragmentListTransfer =
				hash_search(transferHash, &transferKey, HASH_ENTER, &foundInCache);
			if (!foundInCache)
			{
				fragmentListTransfer->nodes = transferKey;
				fragmentListTransfer->fragmentList = NIL;
			}

			fragmentListTransfer->fragmentList =
				lappend(fragmentListTransfer->fragmentList, fragment);
		}
	}

	List *fragmentListTransfers = NIL;
	NodeToNodeFragmentsTransfer *transfer = NULL;
	HASH_SEQ_STATUS hashSeqStatus;

	hash_seq_init(&hashSeqStatus, transferHash);

	while ((transfer = hash_seq_search(&hashSeqStatus)) != NULL)
	{
		fragmentListTransfers = lappend(fragmentListTransfers, transfer);
	}

	return fragmentListTransfers;
}


/*
 * FragmentTransferTaskList returns a list of tasks which perform the given
 * list of transfers. Each transfer is done by a SQL call to
 * fetch_intermediate_results. See QueryStringForFragmentsTransfer for how
 * the query is constructed.
 */
static List *
FragmentTransferTaskList(List *fragmentListTransfers)
{
	List *fetchTaskList = NIL;

	NodeToNodeFragmentsTransfer *fragmentsTransfer = NULL;
	foreach_ptr(fragmentsTransfer, fragmentListTransfers)
	{
		uint32 targetNodeId = fragmentsTransfer->nodes.targetNodeId;

		/* these should have already been pruned away in ColocationTransfers */
		Assert(targetNodeId != fragmentsTransfer->nodes.sourceNodeId);

		WorkerNode *workerNode = LookupNodeByNodeIdOrError(targetNodeId);

		ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
		SetPlacementNodeMetadata(targetPlacement, workerNode);

		Task *task = CitusMakeNode(Task);
		task->taskType = READ_TASK;
		SetTaskQueryString(task, QueryStringForFragmentsTransfer(fragmentsTransfer));
		task->taskPlacementList = list_make1(targetPlacement);

		fetchTaskList = lappend(fetchTaskList, task);
	}

	return fetchTaskList;
}


/*
 * QueryStringForFragmentsTransfer returns a query which fetches distributed
 * result fragments from the source node to the target node. See the structure
 * of NodeToNodeFragmentsTransfer for details of how these are decided.
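 *
 * The generated query is of this shape (node name, port and result ids are
 * hypothetical):
 *
 * SELECT bytes FROM fetch_intermediate_results(
 *   ARRAY['prefix_from_102008_to_2','prefix_from_102011_to_2']::text[],
 *   'worker-1', 5432) bytes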
 */
static char *
QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer)
{
	StringInfo queryString = makeStringInfo();
	StringInfo fragmentNamesArrayString = makeStringInfo();
	int fragmentCount = 0;
	NodePair *nodePair = &fragmentsTransfer->nodes;
	WorkerNode *sourceNode = LookupNodeByNodeIdOrError(nodePair->sourceNodeId);

	appendStringInfoString(fragmentNamesArrayString, "ARRAY[");

	DistributedResultFragment *fragment = NULL;
	foreach_ptr(fragment, fragmentsTransfer->fragmentList)
	{
		const char *fragmentName = fragment->resultId;

		if (fragmentCount > 0)
		{
			appendStringInfoString(fragmentNamesArrayString, ",");
		}

		appendStringInfoString(fragmentNamesArrayString,
							   quote_literal_cstr(fragmentName));

		fragmentCount++;
	}

	appendStringInfoString(fragmentNamesArrayString, "]::text[]");

	appendStringInfo(queryString,
					 "SELECT bytes FROM fetch_intermediate_results(%s,%s,%d) bytes",
					 fragmentNamesArrayString->data,
					 quote_literal_cstr(sourceNode->workerName),
					 sourceNode->workerPort);

	ereport(DEBUG3, (errmsg("fetch task on %s:%d: %s", sourceNode->workerName,
							sourceNode->workerPort, queryString->data)));

	return queryString->data;
}


/*
 * ExecuteFetchTaskList executes a list of fetch_intermediate_results() tasks.
 * It ignores the byte_count result of the fetch_intermediate_results() calls.
 */
static void
ExecuteFetchTaskList(List *taskList)
{
	int resultColumnCount = 1;

	TupleDesc resultDescriptor = CreateTemplateTupleDesc(resultColumnCount);

	TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0);

	TupleDestination *tupleDestination = CreateTupleDestNone();

	bool errorOnAnyFailure = true;
	ExecuteSelectTasksIntoTupleDest(taskList, tupleDestination, errorOnAnyFailure);
}