/*-------------------------------------------------------------------------
 *
 * distributed_intermediate_results.c
 *   Functions for reading and writing distributed intermediate results.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "distributed/pg_version_constants.h"

#include <sys/stat.h>
#include <unistd.h>

#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "port.h"

#include "access/htup_details.h"
#include "access/tupdesc.h"
#include "catalog/pg_type.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/transaction_management.h"
#include "distributed/tuple_destination.h"
#include "distributed/tuplestore.h"
#include "distributed/worker_protocol.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"


/*
 * PartitioningTupleDest is the internal representation of a TupleDestination
 * that consumes the results of the queries constructed in WrapTasksForPartitioning.
 */
typedef struct PartitioningTupleDest
{
    TupleDestination pub;

    CitusTableCacheEntry *targetRelation;

    /* MemoryContext in which we add new fragments */
    MemoryContext fragmentContext;

    /* list of DistributedResultFragment pointers */
    List *fragmentList;

    /* what do tuples look like */
    TupleDesc tupleDesc;
} PartitioningTupleDest;


/*
 * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
 * It is a separate struct so that it can be used as the key of a hash table.
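 *
 * Note that the hash table hashes this struct bytewise (HASH_BLOBS, see
 * ColocationTransfers), which is safe here because two uint32 fields leave
 * no compiler padding in the struct.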
 */
typedef struct NodePair
{
    uint32 sourceNodeId;
    uint32 targetNodeId;
} NodePair;


/*
 * NodeToNodeFragmentsTransfer contains all fragments that need to be fetched from
 * the source node to the destination node in the NodePair.
 */
typedef struct NodeToNodeFragmentsTransfer
{
    NodePair nodes;
    List *fragmentList;
} NodeToNodeFragmentsTransfer;


/* forward declarations of local functions */
static List * WrapTasksForPartitioning(const char *resultIdPrefix,
                                       List *selectTaskList,
                                       int partitionColumnIndex,
                                       CitusTableCacheEntry *targetRelation,
                                       bool binaryFormat);
static List * ExecutePartitionTaskList(List *partitionTaskList,
                                       CitusTableCacheEntry *targetRelation);
static PartitioningTupleDest * CreatePartitioningTupleDest(
    CitusTableCacheEntry *targetRelation);
static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
                                          int placementIndex, int queryNumber,
                                          HeapTuple heapTuple, uint64 tupleLibpqSize);
static TupleDesc PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int
                                                        queryNumber);
static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int
                                         datumCount, Oid typeId);
static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
                                   Oid intervalTypeOutFunc, ArrayType **minValueArray,
                                   ArrayType **maxValueArray);
static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId);
static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple heapTuple,
                                                                    TupleDesc tupleDesc,
                                                                    CitusTableCacheEntry *
                                                                    targetRelation,
                                                                    uint32 sourceNodeId);
static void ExecuteSelectTasksIntoTupleDest(List *taskList,
                                            TupleDestination *tupleDestination,
                                            bool errorOnAnyFailure);
static List ** ColocateFragmentsWithRelation(List *fragmentList,
                                             CitusTableCacheEntry *targetRelation);
static List * ColocationTransfers(List *fragmentList,
                                  CitusTableCacheEntry *targetRelation);
static List * FragmentTransferTaskList(List *fragmentListTransfers);
static char * QueryStringForFragmentsTransfer(
    NodeToNodeFragmentsTransfer *fragmentsTransfer);
static void ExecuteFetchTaskList(List *fetchTaskList);


/*
 * RedistributeTaskListResults partitions the results of the given task list
 * using the shard ranges and partition method of the given targetRelation,
 * and then colocates the result files with the shards.
 *
 * If a shard has a replication factor > 1, the corresponding result files are
 * copied to all nodes containing that shard.
 *
 * returnValue[shardIndex] is a list of cstrings, each of which is a resultId
 * corresponding to targetRelation->sortedShardIntervalArray[shardIndex].
 *
 * partitionColumnIndex determines the column in the selectTaskList to use for
 * partitioning.
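 *
 * For example (names and ids are illustrative), with resultIdPrefix
 * "repartitioned_results_42" and two source shards producing non-empty
 * partitions for target shard index 3, returnValue[3] would contain
 * { "repartitioned_results_42_from_102008_to_3",
 *   "repartitioned_results_42_from_102009_to_3" }.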
 */
List **
RedistributeTaskListResults(const char *resultIdPrefix, List *selectTaskList,
                            int partitionColumnIndex,
                            CitusTableCacheEntry *targetRelation,
                            bool binaryFormat)
{
    /*
     * Make sure that this transaction has a distributed transaction ID.
     *
     * Intermediate results will be stored in a directory that is derived
     * from the distributed transaction ID.
     */
    UseCoordinatedTransaction();

    List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList,
                                                  partitionColumnIndex,
                                                  targetRelation, binaryFormat);
    return ColocateFragmentsWithRelation(fragmentList, targetRelation);
}


/*
 * PartitionTasklistResults executes the given task list and partitions the
 * results of each task based on targetRelation's distribution method and
 * intervals. Each result partition is stored on the node where the task was
 * executed, and is named $resultIdPrefix_from_$sourceShardId_to_$targetShardIndex.
 *
 * The result is a list of DistributedResultFragment, each of which represents a
 * partition of results. Empty results are omitted. Therefore, if we have N tasks
 * and the target relation has M shards, we will have N*M fragments minus the
 * number of empty results.
 *
 * partitionColumnIndex determines the column in the selectTaskList to use for
 * partitioning.
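 *
 * For example, 4 tasks and a target relation with 8 shards yield at most
 * 4 * 8 = 32 fragments; any (task, shard) pair whose partition is empty is
 * dropped from the list.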
 */
List *
PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
                         int partitionColumnIndex,
                         CitusTableCacheEntry *targetRelation,
                         bool binaryFormat)
{
    if (!IsCitusTableTypeCacheEntry(targetRelation, HASH_DISTRIBUTED) &&
        !IsCitusTableTypeCacheEntry(targetRelation, RANGE_DISTRIBUTED))
    {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("repartitioning results of a tasklist is only supported "
                               "when target relation is hash or range partitioned.")));
    }

    /*
     * Make sure that this transaction has a distributed transaction ID.
     *
     * Intermediate results will be stored in a directory that is derived
     * from the distributed transaction ID.
     */
    UseCoordinatedTransaction();

    selectTaskList = WrapTasksForPartitioning(resultIdPrefix, selectTaskList,
                                              partitionColumnIndex, targetRelation,
                                              binaryFormat);
    return ExecutePartitionTaskList(selectTaskList, targetRelation);
}


/*
 * WrapTasksForPartitioning wraps the query of each task in a call to
 * worker_partition_query_result(). The target list of the wrapped query must
 * match the tuple descriptor in ExecutePartitionTaskList().
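 *
 * A wrapped query looks roughly like the following (the prefix, the inner
 * query, and the min/max value arrays are illustrative):
 *
 *   SELECT partition_index,
 *          'prefix_from_102008_to' || '_' || partition_index::text,
 *          rows_written
 *   FROM worker_partition_query_result('prefix_from_102008_to',
 *                                      'SELECT ... FROM ...', 0, 'hash',
 *                                      '{-2147483648,0}'::text[],
 *                                      '{-1,2147483647}'::text[], false)
 *   WHERE rows_written > 0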
 */
static List *
WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
                         int partitionColumnIndex,
                         CitusTableCacheEntry *targetRelation,
                         bool binaryFormat)
{
    List *wrappedTaskList = NIL;
    ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray;
    int shardCount = targetRelation->shardIntervalArrayLength;

    ArrayType *minValueArray = NULL;
    ArrayType *maxValueArray = NULL;
    Var *partitionColumn = targetRelation->partitionColumn;
    Oid intervalTypeId = InvalidOid;
    int32 intervalTypeMod = 0;
    Oid intervalTypeOutFunc = InvalidOid;
    bool intervalTypeVarlena = false;

    GetIntervalTypeInfo(targetRelation->partitionMethod, partitionColumn,
                        &intervalTypeId, &intervalTypeMod);
    getTypeOutputInfo(intervalTypeId, &intervalTypeOutFunc, &intervalTypeVarlena);

    ShardMinMaxValueArrays(shardIntervalArray, shardCount, intervalTypeOutFunc,
                           &minValueArray, &maxValueArray);
    StringInfo minValuesString = ArrayObjectToString(minValueArray, TEXTOID,
                                                     intervalTypeMod);
    StringInfo maxValuesString = ArrayObjectToString(maxValueArray, TEXTOID,
                                                     intervalTypeMod);

    Task *selectTask = NULL;
    foreach_ptr(selectTask, selectTaskList)
    {
        char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
        char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
                                      "hash" : "range";
        const char *binaryFormatString = binaryFormat ? "true" : "false";

        Task *wrappedSelectTask = copyObject(selectTask);

        StringInfo wrappedQuery = makeStringInfo();
        appendStringInfo(wrappedQuery,
                         "SELECT partition_index"
                         ", %s || '_' || partition_index::text "
                         ", rows_written "
                         "FROM worker_partition_query_result"
                         "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
                         quote_literal_cstr(taskPrefix),
                         quote_literal_cstr(taskPrefix),
                         quote_literal_cstr(TaskQueryString(selectTask)),
                         partitionColumnIndex,
                         quote_literal_cstr(partitionMethodString),
                         minValuesString->data, maxValuesString->data,
                         binaryFormatString);

        SetTaskQueryString(wrappedSelectTask, wrappedQuery->data);
        wrappedTaskList = lappend(wrappedTaskList, wrappedSelectTask);
    }

    return wrappedTaskList;
}


/*
 * CreatePartitioningTupleDest creates a TupleDestination which consumes results of
 * tasks constructed in WrapTasksForPartitioning.
 */
static PartitioningTupleDest *
CreatePartitioningTupleDest(CitusTableCacheEntry *targetRelation)
{
    int resultColumnCount = 3;

    TupleDesc tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount);

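    /* these columns must match the target list built in WrapTasksForPartitioning() */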
    TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "partition_index",
                       INT4OID, -1, 0);
    TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "result_id",
                       TEXTOID, -1, 0);
    TupleDescInitEntry(tupleDescriptor, (AttrNumber) 3, "rows_written",
                       INT8OID, -1, 0);

    PartitioningTupleDest *tupleDest = palloc0(sizeof(PartitioningTupleDest));
    tupleDest->targetRelation = targetRelation;
    tupleDest->tupleDesc = tupleDescriptor;
    tupleDest->fragmentContext = CurrentMemoryContext;
    tupleDest->pub.putTuple = PartitioningTupleDestPutTuple;
    tupleDest->pub.tupleDescForQuery =
        PartitioningTupleDestTupleDescForQuery;

    return tupleDest;
}


/*
 * PartitioningTupleDestPutTuple implements TupleDestination->putTuple for
 * PartitioningTupleDest.
 */
static void
PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
                              int placementIndex, int queryNumber,
                              HeapTuple heapTuple, uint64 tupleLibpqSize)
{
    PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self;

    ShardPlacement *placement = list_nth(task->taskPlacementList, placementIndex);

    /*
     * We may be deep inside a nested execution, make sure we can use the
     * fragment list at the top.
     */
    MemoryContext oldContext = MemoryContextSwitchTo(tupleDest->fragmentContext);

    DistributedResultFragment *fragment =
        TupleToDistributedResultFragment(heapTuple, tupleDest->tupleDesc,
                                         tupleDest->targetRelation,
                                         placement->nodeId);

    tupleDest->fragmentList = lappend(tupleDest->fragmentList, fragment);

    MemoryContextSwitchTo(oldContext);
}


/*
 * PartitioningTupleDestTupleDescForQuery implements TupleDestination->TupleDescForQuery
 * for PartitioningTupleDest.
 */
static TupleDesc
PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber)
{
    Assert(queryNumber == 0);

    PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self;

    return tupleDest->tupleDesc;
}


/*
 * SourceShardPrefix returns the result id prefix for partitions that have the
 * given anchor shard id.
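 *
 * For example (illustrative values), SourceShardPrefix("repartitioned_results_42",
 * 102008) returns "repartitioned_results_42_from_102008_to"; the wrapped query
 * later appends the target partition index to this prefix.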
 */
static char *
SourceShardPrefix(const char *resultPrefix, uint64 shardId)
{
    StringInfo taskPrefix = makeStringInfo();

    appendStringInfo(taskPrefix, "%s_from_" UINT64_FORMAT "_to", resultPrefix, shardId);

    return taskPrefix->data;
}


/*
 * ShardMinMaxValueArrays returns the min and max values of the given shard
 * intervals, converted to text arrays using the given type output function.
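 *
 * For example, for a hash-distributed relation with two shards the returned
 * arrays would be {"-2147483648","0"} (min values) and {"-1","2147483647"}
 * (max values), since hash partitioning splits the int32 hash range.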
 */
static void
ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
                       Oid intervalTypeOutFunc, ArrayType **minValueArray,
                       ArrayType **maxValueArray)
{
    Datum *minValues = palloc0(shardCount * sizeof(Datum));
    bool *minValueNulls = palloc0(shardCount * sizeof(bool));
    Datum *maxValues = palloc0(shardCount * sizeof(Datum));
    bool *maxValueNulls = palloc0(shardCount * sizeof(bool));
    for (int shardIndex = 0; shardIndex < shardCount; shardIndex++)
    {
        minValueNulls[shardIndex] = !shardIntervalArray[shardIndex]->minValueExists;
        maxValueNulls[shardIndex] = !shardIntervalArray[shardIndex]->maxValueExists;

        if (!minValueNulls[shardIndex])
        {
            Datum minValue = shardIntervalArray[shardIndex]->minValue;
            char *minValueStr = DatumGetCString(OidFunctionCall1(intervalTypeOutFunc,
                                                                 minValue));
            minValues[shardIndex] = CStringGetTextDatum(minValueStr);
        }

        if (!maxValueNulls[shardIndex])
        {
            Datum maxValue = shardIntervalArray[shardIndex]->maxValue;
            char *maxValueStr = DatumGetCString(OidFunctionCall1(intervalTypeOutFunc,
                                                                 maxValue));
            maxValues[shardIndex] = CStringGetTextDatum(maxValueStr);
        }
    }

    *minValueArray = CreateArrayFromDatums(minValues, minValueNulls, shardCount, TEXTOID);
    *maxValueArray = CreateArrayFromDatums(maxValues, maxValueNulls, shardCount, TEXTOID);
}


/*
 * CreateArrayFromDatums creates an array consisting of given values and nulls.
 */
static ArrayType *
CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid typeId)
{
    bool typeByValue = false;
    char typeAlignment = 0;
    int16 typeLength = 0;
    int dimensions[1] = { datumCount };
    int lowerbounds[1] = { 1 };

    get_typlenbyvalalign(typeId, &typeLength, &typeByValue, &typeAlignment);

    ArrayType *datumArrayObject = construct_md_array(datumArray, nullsArray, 1,
                                                     dimensions,
                                                     lowerbounds, typeId, typeLength,
                                                     typeByValue, typeAlignment);

    return datumArrayObject;
}


/*
 * ExecutePartitionTaskList executes the queries formed in WrapTasksForPartitioning(),
 * and returns their results as a list of DistributedResultFragment.
 */
static List *
ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation)
{
    PartitioningTupleDest *tupleDest = CreatePartitioningTupleDest(targetRelation);

    bool errorOnAnyFailure = false;
    ExecuteSelectTasksIntoTupleDest(taskList, (TupleDestination *) tupleDest,
                                    errorOnAnyFailure);

    return tupleDest->fragmentList;
}


/*
 * TupleToDistributedResultFragment converts a tuple returned by the query in
 * WrapTasksForPartitioning() to a DistributedResultFragment.
 */
static DistributedResultFragment *
TupleToDistributedResultFragment(HeapTuple tuple,
                                 TupleDesc tupleDesc,
                                 CitusTableCacheEntry *targetRelation,
                                 uint32 sourceNodeId)
{
    bool isNull = false;
    uint32 targetShardIndex = DatumGetUInt32(heap_getattr(tuple, 1, tupleDesc, &isNull));
    text *resultId = DatumGetTextP(heap_getattr(tuple, 2, tupleDesc, &isNull));
    int64 rowCount = DatumGetInt64(heap_getattr(tuple, 3, tupleDesc, &isNull));

    Assert(targetShardIndex < targetRelation->shardIntervalArrayLength);
    ShardInterval *shardInterval =
        targetRelation->sortedShardIntervalArray[targetShardIndex];

    DistributedResultFragment *distributedResultFragment =
        palloc0(sizeof(DistributedResultFragment));

    distributedResultFragment->nodeId = sourceNodeId;
    distributedResultFragment->targetShardIndex = targetShardIndex;
    distributedResultFragment->targetShardId = shardInterval->shardId;
    distributedResultFragment->resultId = text_to_cstring(resultId);
    distributedResultFragment->rowCount = rowCount;

    return distributedResultFragment;
}


/*
 * ExecuteSelectTasksIntoTupleDest executes the given tasks and forwards their
 * results to the given tuple destination.
 */
static void
ExecuteSelectTasksIntoTupleDest(List *taskList, TupleDestination *tupleDestination,
                                bool errorOnAnyFailure)
{
    bool expectResults = true;
    int targetPoolSize = MaxAdaptiveExecutorPoolSize;
    TransactionProperties xactProperties = {
        .errorOnAnyFailure = errorOnAnyFailure,
        .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
        .requires2PC = false
    };

    bool localExecutionSupported = true;
    ExecutionParams *executionParams = CreateBasicExecutionParams(
        ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported
        );
    executionParams->tupleDestination = tupleDestination;
    executionParams->xactProperties = xactProperties;
    executionParams->expectResults = expectResults;

    ExecuteTaskListExtended(executionParams);
}


/*
 * ColocateFragmentsWithRelation moves the fragments in the cluster so that they
 * are colocated with the shards of the target relation. The transfers are done
 * by calls to fetch_intermediate_results() between nodes.
 *
 * returnValue[shardIndex] is the list of result ids that are colocated with
 * targetRelation->sortedShardIntervalArray[shardIndex] after the fetch tasks
 * are done.
 */
static List **
ColocateFragmentsWithRelation(List *fragmentList, CitusTableCacheEntry *targetRelation)
{
    List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation);
    List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers);

    ExecuteFetchTaskList(fragmentTransferTaskList);

    int shardCount = targetRelation->shardIntervalArrayLength;
    List **shardResultIdList = palloc0(shardCount * sizeof(List *));

    DistributedResultFragment *sourceFragment = NULL;
    foreach_ptr(sourceFragment, fragmentList)
    {
        int shardIndex = sourceFragment->targetShardIndex;

        Assert(shardIndex < shardCount);
        shardResultIdList[shardIndex] = lappend(shardResultIdList[shardIndex],
                                                sourceFragment->resultId);
    }

    return shardResultIdList;
}


/*
 * ColocationTransfers returns a list of transfers needed to colocate the given
 * fragments with the shards of the target relation. The transfers take
 * replicated target relations into account, and transfers whose source and
 * target node are the same are pruned away.
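 *
 * For example, a fragment produced on node 1 whose target shard has placements
 * on nodes 1 and 3 contributes only to the {1, 3} transfer; the local {1, 1}
 * pair is skipped because the fragment is already on that node.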
 */
static List *
ColocationTransfers(List *fragmentList, CitusTableCacheEntry *targetRelation)
{
    HASHCTL transferHashInfo;
    MemSet(&transferHashInfo, 0, sizeof(HASHCTL));
    transferHashInfo.keysize = sizeof(NodePair);
    transferHashInfo.entrysize = sizeof(NodeToNodeFragmentsTransfer);
    transferHashInfo.hcxt = CurrentMemoryContext;
    HTAB *transferHash = hash_create("Fragment Transfer Hash", 32, &transferHashInfo,
                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

    DistributedResultFragment *fragment = NULL;
    foreach_ptr(fragment, fragmentList)
    {
        List *placementList = ActiveShardPlacementList(fragment->targetShardId);
        ShardPlacement *placement = NULL;
        foreach_ptr(placement, placementList)
        {
            NodePair transferKey = {
                .sourceNodeId = fragment->nodeId,
                .targetNodeId = placement->nodeId
            };

            if (transferKey.sourceNodeId == transferKey.targetNodeId)
            {
                continue;
            }

            bool foundInCache = false;
            NodeToNodeFragmentsTransfer *fragmentListTransfer =
                hash_search(transferHash, &transferKey, HASH_ENTER, &foundInCache);
            if (!foundInCache)
            {
                fragmentListTransfer->nodes = transferKey;
                fragmentListTransfer->fragmentList = NIL;
            }

            fragmentListTransfer->fragmentList =
                lappend(fragmentListTransfer->fragmentList, fragment);
        }
    }

    List *fragmentListTransfers = NIL;
    NodeToNodeFragmentsTransfer *transfer = NULL;
    HASH_SEQ_STATUS hashSeqStatus;

    hash_seq_init(&hashSeqStatus, transferHash);

    while ((transfer = hash_seq_search(&hashSeqStatus)) != NULL)
    {
        fragmentListTransfers = lappend(fragmentListTransfers, transfer);
    }

    return fragmentListTransfers;
}


/*
 * FragmentTransferTaskList returns a list of tasks that perform the given list of
 * transfers. Each transfer is done by a SQL call to fetch_intermediate_results.
 * See QueryStringForFragmentsTransfer for how the query is constructed.
 */
static List *
FragmentTransferTaskList(List *fragmentListTransfers)
{
    List *fetchTaskList = NIL;

    NodeToNodeFragmentsTransfer *fragmentsTransfer = NULL;
    foreach_ptr(fragmentsTransfer, fragmentListTransfers)
    {
        uint32 targetNodeId = fragmentsTransfer->nodes.targetNodeId;

        /* these should have already been pruned away in ColocationTransfers */
        Assert(targetNodeId != fragmentsTransfer->nodes.sourceNodeId);

        WorkerNode *workerNode = LookupNodeByNodeIdOrError(targetNodeId);

        ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
        SetPlacementNodeMetadata(targetPlacement, workerNode);

        Task *task = CitusMakeNode(Task);
        task->taskType = READ_TASK;
        SetTaskQueryString(task, QueryStringForFragmentsTransfer(fragmentsTransfer));
        task->taskPlacementList = list_make1(targetPlacement);

        fetchTaskList = lappend(fetchTaskList, task);
    }

    return fetchTaskList;
}


/*
 * QueryStringForFragmentsTransfer returns a query that fetches distributed
 * result fragments from the source node to the target node. See the structure of
 * NodeToNodeFragmentsTransfer for details of how these are decided.
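 *
 * The generated query looks roughly like the following (fragment names, host,
 * and port are illustrative); it runs on the target node and pulls the named
 * fragments from the source node:
 *
 *   SELECT bytes FROM fetch_intermediate_results(
 *       ARRAY['repartitioned_results_42_from_102008_to_3']::text[],
 *       'worker-1', 5432) bytes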
 */
static char *
QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer)
{
    StringInfo queryString = makeStringInfo();
    StringInfo fragmentNamesArrayString = makeStringInfo();
    int fragmentCount = 0;
    NodePair *nodePair = &fragmentsTransfer->nodes;
    WorkerNode *sourceNode = LookupNodeByNodeIdOrError(nodePair->sourceNodeId);

    appendStringInfoString(fragmentNamesArrayString, "ARRAY[");

    DistributedResultFragment *fragment = NULL;
    foreach_ptr(fragment, fragmentsTransfer->fragmentList)
    {
        const char *fragmentName = fragment->resultId;

        if (fragmentCount > 0)
        {
            appendStringInfoString(fragmentNamesArrayString, ",");
        }

        appendStringInfoString(fragmentNamesArrayString,
                               quote_literal_cstr(fragmentName));

        fragmentCount++;
    }

    appendStringInfoString(fragmentNamesArrayString, "]::text[]");

    appendStringInfo(queryString,
                     "SELECT bytes FROM fetch_intermediate_results(%s,%s,%d) bytes",
                     fragmentNamesArrayString->data,
                     quote_literal_cstr(sourceNode->workerName),
                     sourceNode->workerPort);

    ereport(DEBUG3, (errmsg("fetch task on %s:%d: %s", sourceNode->workerName,
                            sourceNode->workerPort, queryString->data)));

    return queryString->data;
}


/*
 * ExecuteFetchTaskList executes a list of fetch_intermediate_results() tasks.
 * It ignores the byte_count result of the fetch_intermediate_results() calls.
 */
static void
ExecuteFetchTaskList(List *taskList)
{
    /*
     * fetch_intermediate_results() returns a single byte_count column, but we
     * ignore it, so a tuple destination that discards all tuples suffices.
     */
    TupleDestination *tupleDestination = CreateTupleDestNone();

    bool errorOnAnyFailure = true;
    ExecuteSelectTasksIntoTupleDest(taskList, tupleDestination, errorOnAnyFailure);
}