1 /*-------------------------------------------------------------------------
2  *
3  * nodeIncrementalSort.c
4  *	  Routines to handle incremental sorting of relations.
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *	  src/backend/executor/nodeIncrementalSort.c
11  *
12  * DESCRIPTION
13  *
14  *	Incremental sort is an optimized variant of multikey sort for cases
15  *	when the input is already sorted by a prefix of the sort keys.  For
16  *	example when a sort by (key1, key2 ... keyN) is requested, and the
17  *	input is already sorted by (key1, key2 ... keyM), M < N, we can
18  *	divide the input into groups where keys (key1, ... keyM) are equal,
19  *	and only sort on the remaining columns.
20  *
21  *	Consider the following example.  We have input tuples consisting of
22  *	two integers (X, Y) already presorted by X, while it's required to
 *	sort them by both X and Y.  Let the input tuples be the following:
24  *
25  *	(1, 5)
26  *	(1, 2)
27  *	(2, 9)
28  *	(2, 1)
29  *	(2, 5)
30  *	(3, 3)
31  *	(3, 7)
32  *
33  *	An incremental sort algorithm would split the input into the following
34  *	groups, which have equal X, and then sort them by Y individually:
35  *
36  *		(1, 5) (1, 2)
37  *		(2, 9) (2, 1) (2, 5)
38  *		(3, 3) (3, 7)
39  *
 *	After sorting these groups and putting them back together, we would get
 *	the following result, sorted by X and Y as requested:
42  *
43  *	(1, 2)
44  *	(1, 5)
45  *	(2, 1)
46  *	(2, 5)
47  *	(2, 9)
48  *	(3, 3)
49  *	(3, 7)
50  *
51  *	Incremental sort may be more efficient than plain sort, particularly
52  *	on large datasets, as it reduces the amount of data to sort at once,
53  *	making it more likely it fits into work_mem (eliminating the need to
54  *	spill to disk).  But the main advantage of incremental sort is that
55  *	it can start producing rows early, before sorting the whole dataset,
56  *	which is a significant benefit especially for queries with LIMIT.
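 *
 *	For example, if the input already arrives ordered by a column x (say,
 *	from an index scan), a query requesting ORDER BY x, y LIMIT 10 only
 *	needs to sort each group of equal x values on y, and can stop as soon
 *	as ten rows have been produced.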
57  *
58  *	The algorithm we've implemented here is modified from the theoretical
59  *	base described above by operating in two different modes:
60  *	  - Fetching a minimum number of tuples without checking prefix key
61  *	    group membership and sorting on all columns when safe.
62  *	  - Fetching all tuples for a single prefix key group and sorting on
63  *	    solely the unsorted columns.
64  *	We always begin in the first mode, and employ a heuristic to switch
65  *	into the second mode if we believe it's beneficial.
66  *
67  *	Sorting incrementally can potentially use less memory, avoid fetching
68  *	and sorting all tuples in the dataset, and begin returning tuples before
69  *	the entire result set is available.
70  *
71  *	The hybrid mode approach allows us to optimize for both very small
 *	groups (where the overhead of a new tuplesort is high) and very large
73  *	groups (where we can lower cost by not having to sort on already sorted
74  *	columns), albeit at some extra cost while switching between modes.
75  *
76  *-------------------------------------------------------------------------
77  */
78 
79 #include "postgres.h"
80 
81 #include "access/htup_details.h"
82 #include "executor/execdebug.h"
83 #include "executor/nodeIncrementalSort.h"
84 #include "miscadmin.h"
85 #include "utils/lsyscache.h"
86 #include "utils/tuplesort.h"
87 
88 /*
 * We need to store the instrumentation information either in the local node's
 * sort info or, for a parallel worker process, in the shared info (this avoids
91  * having to additionally memcpy the info from local memory to shared memory
92  * at each instrumentation call). This macro expands to choose the proper sort
93  * state and group info.
94  *
95  * Arguments:
96  * - node: type IncrementalSortState *
97  * - groupName: the token fullsort or prefixsort
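 *
 * Example usage (as in the code below): INSTRUMENT_SORT_GROUP(node, fullsort);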
98  */
99 #define INSTRUMENT_SORT_GROUP(node, groupName) \
100 	do { \
101 		if ((node)->ss.ps.instrument != NULL) \
102 		{ \
103 			if ((node)->shared_info && (node)->am_worker) \
104 			{ \
105 				Assert(IsParallelWorker()); \
106 				Assert(ParallelWorkerNumber <= (node)->shared_info->num_workers); \
107 				instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
108 									  (node)->groupName##_state); \
109 			} \
110 			else \
111 			{ \
112 				instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
113 									  (node)->groupName##_state); \
114 			} \
115 		} \
116 	} while (0)
117 
118 
119 /* ----------------------------------------------------------------
120  * instrumentSortedGroup
121  *
122  * Because incremental sort processes (potentially many) sort batches, we need
123  * to capture tuplesort stats each time we finalize a sort state. This summary
124  * data is later used for EXPLAIN ANALYZE output.
125  * ----------------------------------------------------------------
126  */
127 static void
128 instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo,
129 					  Tuplesortstate *sortState)
130 {
131 	TuplesortInstrumentation sort_instr;
132 
133 	groupInfo->groupCount++;
134 
135 	tuplesort_get_stats(sortState, &sort_instr);
136 
137 	/* Calculate total and maximum memory and disk space used. */
138 	switch (sort_instr.spaceType)
139 	{
140 		case SORT_SPACE_TYPE_DISK:
141 			groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
142 			if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
143 				groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;
144 
145 			break;
146 		case SORT_SPACE_TYPE_MEMORY:
147 			groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
148 			if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
149 				groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;
150 
151 			break;
152 	}
153 
154 	/* Track each sort method we've used. */
155 	groupInfo->sortMethods |= sort_instr.sortMethod;
156 }
157 
158 /* ----------------------------------------------------------------
159  * preparePresortedCols
160  *
161  * Prepare information for presorted_keys comparisons.
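 *
 * For each presorted key we cache the equality function derived from the
 * key's ordering operator, plus a reusable FunctionCallInfo, so that
 * isCurrentGroup() can compare tuples against the group pivot without
 * repeated lookups.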
162  * ----------------------------------------------------------------
163  */
164 static void
165 preparePresortedCols(IncrementalSortState *node)
166 {
167 	IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
168 
169 	node->presorted_keys =
170 		(PresortedKeyData *) palloc(plannode->nPresortedCols *
171 									sizeof(PresortedKeyData));
172 
173 	/* Pre-cache comparison functions for each pre-sorted key. */
174 	for (int i = 0; i < plannode->nPresortedCols; i++)
175 	{
176 		Oid			equalityOp,
177 					equalityFunc;
178 		PresortedKeyData *key;
179 
180 		key = &node->presorted_keys[i];
181 		key->attno = plannode->sort.sortColIdx[i];
182 
183 		equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
184 													 NULL);
185 		if (!OidIsValid(equalityOp))
186 			elog(ERROR, "missing equality operator for ordering operator %u",
187 				 plannode->sort.sortOperators[i]);
188 
189 		equalityFunc = get_opcode(equalityOp);
190 		if (!OidIsValid(equalityFunc))
191 			elog(ERROR, "missing function for operator %u", equalityOp);
192 
193 		/* Lookup the comparison function */
194 		fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
195 
196 		/* We can initialize the callinfo just once and re-use it */
197 		key->fcinfo = palloc0(SizeForFunctionCallInfo(2));
198 		InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
199 								 plannode->sort.collations[i], NULL, NULL);
200 		key->fcinfo->args[0].isnull = false;
201 		key->fcinfo->args[1].isnull = false;
202 	}
203 }
204 
205 /* ----------------------------------------------------------------
206  * isCurrentGroup
207  *
208  * Check whether a given tuple belongs to the current sort group by comparing
209  * the presorted column values to the pivot tuple of the current group.
210  * ----------------------------------------------------------------
211  */
212 static bool
213 isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
214 {
215 	int			nPresortedCols;
216 
217 	nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;
218 
219 	/*
	 * The fact that the input is sorted by keys (0, ... n) implies that the
	 * tail keys are more likely to change.  Therefore we do our comparison
	 * starting from the last pre-sorted column to optimize for early
	 * detection of inequality and to minimize the number of function calls.
224 	 */
225 	for (int i = nPresortedCols - 1; i >= 0; i--)
226 	{
227 		Datum		datumA,
228 					datumB,
229 					result;
230 		bool		isnullA,
231 					isnullB;
232 		AttrNumber	attno = node->presorted_keys[i].attno;
233 		PresortedKeyData *key;
234 
235 		datumA = slot_getattr(pivot, attno, &isnullA);
236 		datumB = slot_getattr(tuple, attno, &isnullB);
237 
238 		/* Special case for NULL-vs-NULL, else use standard comparison */
239 		if (isnullA || isnullB)
240 		{
241 			if (isnullA == isnullB)
242 				continue;
243 			else
244 				return false;
245 		}
246 
247 		key = &node->presorted_keys[i];
248 
249 		key->fcinfo->args[0].value = datumA;
250 		key->fcinfo->args[1].value = datumB;
251 
252 		/* just for paranoia's sake, we reset isnull each time */
253 		key->fcinfo->isnull = false;
254 
255 		result = FunctionCallInvoke(key->fcinfo);
256 
257 		/* Check for null result, since caller is clearly not expecting one */
258 		if (key->fcinfo->isnull)
259 			elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
260 
261 		if (!DatumGetBool(result))
262 			return false;
263 	}
264 	return true;
265 }
266 
267 /* ----------------------------------------------------------------
268  * switchToPresortedPrefixMode
269  *
270  * When we determine that we've likely encountered a large batch of tuples all
271  * having the same presorted prefix values, we want to optimize tuplesort by
272  * only sorting on unsorted suffix keys.
273  *
274  * The problem is that we've already accumulated several tuples in another
275  * tuplesort configured to sort by all columns (assuming that there may be
276  * more than one prefix key group). So to switch to presorted prefix mode we
277  * have to go back and look at all the tuples we've already accumulated to
278  * verify they're all part of the same prefix key group before sorting them
279  * solely by unsorted suffix keys.
280  *
 * While it's likely that all of the tuples already fetched are part of a
 * single prefix group, we also have to handle the possibility that there is
 * at least one different prefix key group before the large prefix key group.
284  * ----------------------------------------------------------------
285  */
286 static void
287 switchToPresortedPrefixMode(PlanState *pstate)
288 {
289 	IncrementalSortState *node = castNode(IncrementalSortState, pstate);
290 	ScanDirection dir;
291 	int64		nTuples;
292 	TupleDesc	tupDesc;
293 	PlanState  *outerNode;
294 	IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
295 
296 	dir = node->ss.ps.state->es_direction;
297 	outerNode = outerPlanState(node);
298 	tupDesc = ExecGetResultType(outerNode);
299 
300 	/* Configure the prefix sort state the first time around. */
301 	if (node->prefixsort_state == NULL)
302 	{
303 		Tuplesortstate *prefixsort_state;
304 		int			nPresortedCols = plannode->nPresortedCols;
305 
306 		/*
307 		 * Optimize the sort by assuming the prefix columns are all equal and
308 		 * thus we only need to sort by any remaining columns.
309 		 */
310 		prefixsort_state = tuplesort_begin_heap(tupDesc,
311 												plannode->sort.numCols - nPresortedCols,
312 												&(plannode->sort.sortColIdx[nPresortedCols]),
313 												&(plannode->sort.sortOperators[nPresortedCols]),
314 												&(plannode->sort.collations[nPresortedCols]),
315 												&(plannode->sort.nullsFirst[nPresortedCols]),
316 												work_mem,
317 												NULL,
318 												false);
319 		node->prefixsort_state = prefixsort_state;
320 	}
321 	else
322 	{
323 		/* Next group of presorted data */
324 		tuplesort_reset(node->prefixsort_state);
325 	}
326 
327 	/*
328 	 * If the current node has a bound, then it's reasonably likely that a
329 	 * large prefix key group will benefit from bounded sort, so configure the
330 	 * tuplesort to allow for that optimization.
331 	 */
332 	if (node->bounded)
333 	{
334 		SO1_printf("Setting bound on presorted prefix tuplesort to: " INT64_FORMAT "\n",
335 				   node->bound - node->bound_Done);
336 		tuplesort_set_bound(node->prefixsort_state,
337 							node->bound - node->bound_Done);
338 	}
339 
340 	/*
341 	 * Copy as many tuples as we can (i.e., in the same prefix key group) from
342 	 * the full sort state to the prefix sort state.
343 	 */
344 	for (nTuples = 0; nTuples < node->n_fullsort_remaining; nTuples++)
345 	{
346 		/*
347 		 * When we encounter multiple prefix key groups inside the full sort
348 		 * tuplesort we have to carry over the last read tuple into the next
349 		 * batch.
350 		 */
351 		if (nTuples == 0 && !TupIsNull(node->transfer_tuple))
352 		{
353 			tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
354 			/* The carried over tuple is our new group pivot tuple. */
355 			ExecCopySlot(node->group_pivot, node->transfer_tuple);
356 		}
357 		else
358 		{
359 			tuplesort_gettupleslot(node->fullsort_state,
360 								   ScanDirectionIsForward(dir),
361 								   false, node->transfer_tuple, NULL);
362 
363 			/*
364 			 * If this is our first time through the loop, then we need to
365 			 * save the first tuple we get as our new group pivot.
366 			 */
367 			if (TupIsNull(node->group_pivot))
368 				ExecCopySlot(node->group_pivot, node->transfer_tuple);
369 
370 			if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
371 			{
372 				tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
373 			}
374 			else
375 			{
376 				/*
377 				 * The tuple isn't part of the current batch so we need to
378 				 * carry it over into the next batch of tuples we transfer out
379 				 * of the full sort tuplesort into the presorted prefix
380 				 * tuplesort. We don't actually have to do anything special to
381 				 * save the tuple since we've already loaded it into the
382 				 * node->transfer_tuple slot, and, even though that slot
383 				 * points to memory inside the full sort tuplesort, we can't
384 				 * reset that tuplesort anyway until we've fully transferred
385 				 * out its tuples, so this reference is safe. We do need to
386 				 * reset the group pivot tuple though since we've finished the
387 				 * current prefix key group.
388 				 */
389 				ExecClearTuple(node->group_pivot);
390 
391 				/* Break out of for-loop early */
392 				break;
393 			}
394 		}
395 	}
396 
397 	/*
398 	 * Track how many tuples remain in the full sort batch so that we know if
399 	 * we need to sort multiple prefix key groups before processing tuples
400 	 * remaining in the large single prefix key group we think we've
401 	 * encountered.
402 	 */
403 	SO1_printf("Moving " INT64_FORMAT " tuples to presorted prefix tuplesort\n", nTuples);
404 	node->n_fullsort_remaining -= nTuples;
405 	SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT "\n", node->n_fullsort_remaining);
406 
407 	if (node->n_fullsort_remaining == 0)
408 	{
409 		/*
410 		 * We've found that all tuples remaining in the full sort batch are in
411 		 * the same prefix key group and moved all of those tuples into the
412 		 * presorted prefix tuplesort.  We don't know that we've yet found the
413 		 * last tuple in the current prefix key group, so save our pivot
414 		 * comparison tuple and continue fetching tuples from the outer
415 		 * execution node to load into the presorted prefix tuplesort.
416 		 */
417 		ExecCopySlot(node->group_pivot, node->transfer_tuple);
418 		SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
419 		node->execution_status = INCSORT_LOADPREFIXSORT;
420 
421 		/*
422 		 * Make sure we clear the transfer tuple slot so that next time we
423 		 * encounter a large prefix key group we don't incorrectly assume we
424 		 * have a tuple carried over from the previous group.
425 		 */
426 		ExecClearTuple(node->transfer_tuple);
427 	}
428 	else
429 	{
430 		/*
431 		 * We finished a group but didn't consume all of the tuples from the
432 		 * full sort state, so we'll sort this batch, let the outer node read
433 		 * out all of those tuples, and then come back around to find another
434 		 * batch.
435 		 */
436 		SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
437 		tuplesort_performsort(node->prefixsort_state);
438 
439 		INSTRUMENT_SORT_GROUP(node, prefixsort);
440 
441 		if (node->bounded)
442 		{
443 			/*
444 			 * If the current node has a bound and we've already sorted n
445 			 * tuples, then the functional bound remaining is (original bound
446 			 * - n), so store the current number of processed tuples for use
447 			 * in configuring sorting bound.
448 			 */
			SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
					   node->bound_Done,
					   Min(node->bound, node->bound_Done + nTuples));
451 			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
452 		}
453 
454 		SO_printf("Setting execution_status to INCSORT_READPREFIXSORT  (switchToPresortedPrefixMode)\n");
455 		node->execution_status = INCSORT_READPREFIXSORT;
456 	}
457 }
458 
459 /*
460  * Sorting many small groups with tuplesort is inefficient. In order to
461  * cope with this problem we don't start a new group until the current one
 * contains at least DEFAULT_MIN_GROUP_SIZE tuples (unfortunately, this also
 * means we can't assume small groups of tuples all have the same prefix keys).
 * When we have a bound that's less than DEFAULT_MIN_GROUP_SIZE, we start
 * looking for a new group as soon as we've met our bound, to avoid fetching
 * more tuples than we absolutely have to.
467  */
468 #define DEFAULT_MIN_GROUP_SIZE 32
469 
470 /*
471  * While we've optimized for small prefix key groups by not starting our prefix
472  * key comparisons until we've reached a minimum number of tuples, we don't want
473  * that optimization to cause us to lose out on the benefits of being able to
474  * assume a large group of tuples is fully presorted by its prefix keys.
475  * Therefore we use the DEFAULT_MAX_FULL_SORT_GROUP_SIZE cutoff as a heuristic
476  * for determining when we believe we've encountered a large group, and, if we
477  * get to that point without finding a new prefix key group we transition to
478  * presorted prefix key mode.
479  */
480 #define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)
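
/*
 * As a concrete illustration of the two constants above: with no bound in
 * play, each batch first accumulates DEFAULT_MIN_GROUP_SIZE (32) tuples
 * without any prefix key checks; if the batch then grows beyond
 * DEFAULT_MAX_FULL_SORT_GROUP_SIZE (64) tuples without a new prefix key group
 * appearing, we assume we've hit a large group and switch to presorted prefix
 * mode.
 */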
481 
482 /* ----------------------------------------------------------------
483  *		ExecIncrementalSort
484  *
 *		Assuming that the outer subtree returns tuples presorted by some
 *		prefix of the target sort columns, perform an incremental sort.
487  *
488  *		Conditions:
489  *		  -- none.
490  *
491  *		Initial States:
492  *		  -- the outer child is prepared to return the first tuple.
493  * ----------------------------------------------------------------
494  */
495 static TupleTableSlot *
496 ExecIncrementalSort(PlanState *pstate)
497 {
498 	IncrementalSortState *node = castNode(IncrementalSortState, pstate);
499 	EState	   *estate;
500 	ScanDirection dir;
501 	Tuplesortstate *read_sortstate;
502 	Tuplesortstate *fullsort_state;
503 	TupleTableSlot *slot;
504 	IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
505 	PlanState  *outerNode;
506 	TupleDesc	tupDesc;
507 	int64		nTuples = 0;
508 	int64		minGroupSize;
509 
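	/*
	 * Processing below is driven by node->execution_status:
	 *	 INCSORT_LOADFULLSORT	- load tuples into the full sort state
	 *	 INCSORT_READFULLSORT	- return sorted tuples from the full sort state
	 *	 INCSORT_LOADPREFIXSORT - load tuples into the presorted prefix state
	 *	 INCSORT_READPREFIXSORT - return sorted tuples from the prefix state
	 */
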
510 	CHECK_FOR_INTERRUPTS();
511 
512 	estate = node->ss.ps.state;
513 	dir = estate->es_direction;
514 	fullsort_state = node->fullsort_state;
515 
516 	/*
517 	 * If a previous iteration has sorted a batch, then we need to check to
518 	 * see if there are any remaining tuples in that batch that we can return
519 	 * before moving on to other execution states.
520 	 */
521 	if (node->execution_status == INCSORT_READFULLSORT
522 		|| node->execution_status == INCSORT_READPREFIXSORT)
523 	{
524 		/*
525 		 * Return next tuple from the current sorted group set if available.
526 		 */
527 		read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
528 			fullsort_state : node->prefixsort_state;
529 		slot = node->ss.ps.ps_ResultTupleSlot;
530 
531 		/*
532 		 * We have to populate the slot from the tuplesort before checking
533 		 * outerNodeDone because it will set the slot to NULL if no more
534 		 * tuples remain. If the tuplesort is empty, but we don't have any
535 		 * more tuples available for sort from the outer node, then
536 		 * outerNodeDone will have been set so we'll return that now-empty
537 		 * slot to the caller.
538 		 */
539 		if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
540 								   false, slot, NULL) || node->outerNodeDone)
541 
542 			/*
543 			 * Note: there isn't a good test case for the node->outerNodeDone
544 			 * check directly, but we need it for any plan where the outer
545 			 * node will fail when trying to fetch too many tuples.
546 			 */
547 			return slot;
548 		else if (node->n_fullsort_remaining > 0)
549 		{
550 			/*
551 			 * When we transition to presorted prefix mode, we might have
552 			 * accumulated at least one additional prefix key group in the
553 			 * full sort tuplesort. The first call to
554 			 * switchToPresortedPrefixMode() will have pulled the first one of
555 			 * those groups out, and we've returned those tuples to the parent
556 			 * node, but if at this point we still have tuples remaining in
557 			 * the full sort state (i.e., n_fullsort_remaining > 0), then we
558 			 * need to re-execute the prefix mode transition function to pull
559 			 * out the next prefix key group.
560 			 */
561 			SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
562 					   node->n_fullsort_remaining);
563 			switchToPresortedPrefixMode(pstate);
564 		}
565 		else
566 		{
567 			/*
568 			 * If we don't have any sorted tuples to read and we're not
569 			 * currently transitioning into presorted prefix sort mode, then
570 			 * it's time to start the process all over again by building a new
571 			 * group in the full sort state.
572 			 */
			SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining is 0)\n");
574 			node->execution_status = INCSORT_LOADFULLSORT;
575 		}
576 	}
577 
578 	/*
579 	 * Scan the subplan in the forward direction while creating the sorted
580 	 * data.
581 	 */
582 	estate->es_direction = ForwardScanDirection;
583 
584 	outerNode = outerPlanState(node);
585 	tupDesc = ExecGetResultType(outerNode);
586 
587 	/* Load tuples into the full sort state. */
588 	if (node->execution_status == INCSORT_LOADFULLSORT)
589 	{
590 		/*
591 		 * Initialize sorting structures.
592 		 */
593 		if (fullsort_state == NULL)
594 		{
595 			/*
596 			 * Initialize presorted column support structures for
597 			 * isCurrentGroup(). It's correct to do this along with the
598 			 * initial initialization for the full sort state (and not for the
599 			 * prefix sort state) since we always load the full sort state
600 			 * first.
601 			 */
602 			preparePresortedCols(node);
603 
604 			/*
605 			 * Since we optimize small prefix key groups by accumulating a
606 			 * minimum number of tuples before sorting, we can't assume that a
607 			 * group of tuples all have the same prefix key values. Hence we
608 			 * setup the full sort tuplesort to sort by all requested sort
609 			 * keys.
610 			 */
611 			fullsort_state = tuplesort_begin_heap(tupDesc,
612 												  plannode->sort.numCols,
613 												  plannode->sort.sortColIdx,
614 												  plannode->sort.sortOperators,
615 												  plannode->sort.collations,
616 												  plannode->sort.nullsFirst,
617 												  work_mem,
618 												  NULL,
619 												  false);
620 			node->fullsort_state = fullsort_state;
621 		}
622 		else
623 		{
624 			/* Reset sort for the next batch. */
625 			tuplesort_reset(fullsort_state);
626 		}
627 
628 		/*
629 		 * Calculate the remaining tuples left if bounded and configure both
630 		 * bounded sort and the minimum group size accordingly.
631 		 */
632 		if (node->bounded)
633 		{
634 			int64		currentBound = node->bound - node->bound_Done;
635 
636 			/*
637 			 * Bounded sort isn't likely to be a useful optimization for full
638 			 * sort mode since we limit full sort mode to a relatively small
639 			 * number of tuples and tuplesort doesn't switch over to top-n
640 			 * heap sort anyway unless it hits (2 * bound) tuples.
641 			 */
642 			if (currentBound < DEFAULT_MIN_GROUP_SIZE)
643 				tuplesort_set_bound(fullsort_state, currentBound);
644 
645 			minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
646 		}
647 		else
648 			minGroupSize = DEFAULT_MIN_GROUP_SIZE;
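
		/*
		 * For example (hypothetical numbers): with a bound of 100 of which
		 * bound_Done is already 90, currentBound is 10, so the full sort is
		 * bounded to 10 tuples and minGroupSize drops to 10 as well.
		 */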
649 
650 		/*
651 		 * Because we have to read the next tuple to find out that we've
652 		 * encountered a new prefix key group, on subsequent groups we have to
653 		 * carry over that extra tuple and add it to the new group's sort here
654 		 * before we read any new tuples from the outer node.
655 		 */
656 		if (!TupIsNull(node->group_pivot))
657 		{
658 			tuplesort_puttupleslot(fullsort_state, node->group_pivot);
659 			nTuples++;
660 
661 			/*
662 			 * We're in full sort mode accumulating a minimum number of tuples
663 			 * and not checking for prefix key equality yet, so we can't
664 			 * assume the group pivot tuple will remain the same -- unless
665 			 * we're using a minimum group size of 1, in which case the pivot
666 			 * is obviously still the pivot.
667 			 */
668 			if (nTuples != minGroupSize)
669 				ExecClearTuple(node->group_pivot);
670 		}
671 
672 
673 		/*
674 		 * Pull as many tuples from the outer node as possible given our
675 		 * current operating mode.
676 		 */
677 		for (;;)
678 		{
679 			slot = ExecProcNode(outerNode);
680 
681 			/*
682 			 * If the outer node can't provide us any more tuples, then we can
683 			 * sort the current group and return those tuples.
684 			 */
685 			if (TupIsNull(slot))
686 			{
687 				/*
688 				 * We need to know later if the outer node has completed to be
689 				 * able to distinguish between being done with a batch and
690 				 * being done with the whole node.
691 				 */
692 				node->outerNodeDone = true;
693 
694 				SO1_printf("Sorting fullsort with " INT64_FORMAT " tuples\n", nTuples);
695 				tuplesort_performsort(fullsort_state);
696 
697 				INSTRUMENT_SORT_GROUP(node, fullsort);
698 
699 				SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
700 				node->execution_status = INCSORT_READFULLSORT;
701 				break;
702 			}
703 
704 			/* Accumulate the next group of presorted tuples. */
705 			if (nTuples < minGroupSize)
706 			{
707 				/*
708 				 * If we haven't yet hit our target minimum group size, then
709 				 * we don't need to bother checking for inclusion in the
710 				 * current prefix group since at this point we'll assume that
711 				 * we'll full sort this batch to avoid a large number of very
712 				 * tiny (and thus inefficient) sorts.
713 				 */
714 				tuplesort_puttupleslot(fullsort_state, slot);
715 				nTuples++;
716 
717 				/*
718 				 * If we've reached our minimum group size, then we need to
719 				 * store the most recent tuple as a pivot.
720 				 */
721 				if (nTuples == minGroupSize)
722 					ExecCopySlot(node->group_pivot, slot);
723 			}
724 			else
725 			{
726 				/*
727 				 * If we've already accumulated enough tuples to reach our
728 				 * minimum group size, then we need to compare any additional
729 				 * tuples to our pivot tuple to see if we reach the end of
730 				 * that prefix key group. Only after we find changed prefix
731 				 * keys can we guarantee sort stability of the tuples we've
732 				 * already accumulated.
733 				 */
734 				if (isCurrentGroup(node, node->group_pivot, slot))
735 				{
736 					/*
737 					 * As long as the prefix keys match the pivot tuple then
738 					 * load the tuple into the tuplesort.
739 					 */
740 					tuplesort_puttupleslot(fullsort_state, slot);
741 					nTuples++;
742 				}
743 				else
744 				{
745 					/*
746 					 * Since the tuple we fetched isn't part of the current
747 					 * prefix key group we don't want to sort it as part of
748 					 * the current batch. Instead we use the group_pivot slot
749 					 * to carry it over to the next batch (even though we
750 					 * won't actually treat it as a group pivot).
751 					 */
752 					ExecCopySlot(node->group_pivot, slot);
753 
754 					if (node->bounded)
755 					{
756 						/*
757 						 * If the current node has a bound, and we've already
758 						 * sorted n tuples, then the functional bound
759 						 * remaining is (original bound - n), so store the
760 						 * current number of processed tuples for later use
761 						 * configuring the sort state's bound.
762 						 */
763 						SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
764 								   node->bound_Done,
765 								   Min(node->bound, node->bound_Done + nTuples));
766 						node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
767 					}
768 
769 					/*
770 					 * Once we find changed prefix keys we can complete the
771 					 * sort and transition modes to reading out the sorted
772 					 * tuples.
773 					 */
774 					SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n",
775 							   nTuples);
776 					tuplesort_performsort(fullsort_state);
777 
778 					INSTRUMENT_SORT_GROUP(node, fullsort);
779 
780 					SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
781 					node->execution_status = INCSORT_READFULLSORT;
782 					break;
783 				}
784 			}
785 
786 			/*
			 * If we haven't already transitioned modes to reading from the
			 * full sort state, then we assume that having read at least
789 			 * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely we're
790 			 * processing a large group of tuples all having equal prefix keys
791 			 * (but haven't yet found the final tuple in that prefix key
792 			 * group), so we need to transition into presorted prefix mode.
793 			 */
794 			if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
795 				node->execution_status != INCSORT_READFULLSORT)
796 			{
797 				/*
798 				 * The group pivot we have stored has already been put into
799 				 * the tuplesort; we don't want to carry it over. Since we
800 				 * haven't yet found the end of the prefix key group, it might
801 				 * seem like we should keep this, but we don't actually know
802 				 * how many prefix key groups might be represented in the full
803 				 * sort state, so we'll let the mode transition function
804 				 * manage this state for us.
805 				 */
806 				ExecClearTuple(node->group_pivot);
807 
808 				/*
809 				 * Unfortunately the tuplesort API doesn't include a way to
810 				 * retrieve tuples unless a sort has been performed, so we
811 				 * perform the sort even though we could just as easily rely
812 				 * on FIFO retrieval semantics when transferring them to the
813 				 * presorted prefix tuplesort.
814 				 */
815 				SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n", nTuples);
816 				tuplesort_performsort(fullsort_state);
817 
818 				INSTRUMENT_SORT_GROUP(node, fullsort);
819 
820 				/*
821 				 * If the full sort tuplesort happened to switch into top-n
822 				 * heapsort mode then we will only be able to retrieve
823 				 * currentBound tuples (since the tuplesort will have only
824 				 * retained the top-n tuples). This is safe even though we
825 				 * haven't yet completed fetching the current prefix key group
826 				 * because the tuples we've "lost" already sorted "below" the
827 				 * retained ones, and we're already contractually guaranteed
828 				 * to not need any more than the currentBound tuples.
829 				 */
830 				if (tuplesort_used_bound(node->fullsort_state))
831 				{
832 					int64		currentBound = node->bound - node->bound_Done;
833 
834 					SO2_printf("Read " INT64_FORMAT " tuples, but setting to " INT64_FORMAT " because we used bounded sort\n",
835 							   nTuples, Min(currentBound, nTuples));
836 					nTuples = Min(currentBound, nTuples);
837 				}
838 
839 				SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
840 						   nTuples);
841 
842 				/*
843 				 * We might have multiple prefix key groups in the full sort
844 				 * state, so the mode transition function needs to know that
845 				 * it needs to move from the fullsort to presorted prefix
846 				 * sort.
847 				 */
848 				node->n_fullsort_remaining = nTuples;
849 
850 				/* Transition the tuples to the presorted prefix tuplesort. */
851 				switchToPresortedPrefixMode(pstate);
852 
853 				/*
854 				 * Since we know we had tuples to move to the presorted prefix
855 				 * tuplesort, we know that unless that transition has verified
856 				 * that all tuples belonged to the same prefix key group (in
857 				 * which case we can go straight to continuing to load tuples
858 				 * into that tuplesort), we should have a tuple to return
859 				 * here.
860 				 *
861 				 * Either way, the appropriate execution status should have
862 				 * been set by switchToPresortedPrefixMode(), so we can drop
863 				 * out of the loop here and let the appropriate path kick in.
864 				 */
865 				break;
866 			}
867 		}
868 	}
869 
870 	if (node->execution_status == INCSORT_LOADPREFIXSORT)
871 	{
872 		/*
873 		 * We only enter this state after the mode transition function has
874 		 * confirmed all remaining tuples from the full sort state have the
875 		 * same prefix and moved those tuples to the prefix sort state. That
876 		 * function has also set a group pivot tuple (which doesn't need to be
877 		 * carried over; it's already been put into the prefix sort state).
878 		 */
879 		Assert(!TupIsNull(node->group_pivot));
880 
881 		/*
882 		 * Read tuples from the outer node and load them into the prefix sort
883 		 * state until we encounter a tuple whose prefix keys don't match the
884 		 * current group_pivot tuple, since we can't guarantee sort stability
885 		 * until we have all tuples matching those prefix keys.
886 		 */
887 		for (;;)
888 		{
889 			slot = ExecProcNode(outerNode);
890 
891 			/*
892 			 * If we've exhausted tuples from the outer node we're done
893 			 * loading the prefix sort state.
894 			 */
895 			if (TupIsNull(slot))
896 			{
897 				/*
898 				 * We need to know later if the outer node has completed to be
899 				 * able to distinguish between being done with a batch and
900 				 * being done with the whole node.
901 				 */
902 				node->outerNodeDone = true;
903 				break;
904 			}
905 
906 			/*
907 			 * If the tuple's prefix keys match our pivot tuple, we're not
908 			 * done yet and can load it into the prefix sort state. If not, we
909 			 * don't want to sort it as part of the current batch. Instead we
910 			 * use the group_pivot slot to carry it over to the next batch
911 			 * (even though we won't actually treat it as a group pivot).
912 			 */
913 			if (isCurrentGroup(node, node->group_pivot, slot))
914 			{
915 				tuplesort_puttupleslot(node->prefixsort_state, slot);
916 				nTuples++;
917 			}
918 			else
919 			{
920 				ExecCopySlot(node->group_pivot, slot);
921 				break;
922 			}
923 		}
924 
925 		/*
926 		 * Perform the sort and begin returning the tuples to the parent plan
927 		 * node.
928 		 */
929 		SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
930 		tuplesort_performsort(node->prefixsort_state);
931 
932 		INSTRUMENT_SORT_GROUP(node, prefixsort);
933 
934 		SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
935 		node->execution_status = INCSORT_READPREFIXSORT;
936 
937 		if (node->bounded)
938 		{
939 			/*
940 			 * If the current node has a bound, and we've already sorted n
941 			 * tuples, then the functional bound remaining is (original bound
942 			 * - n), so store the current number of processed tuples for use
943 			 * in configuring sorting bound.
944 			 */
945 			SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
946 					   node->bound_Done,
947 					   Min(node->bound, node->bound_Done + nTuples));
948 			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
949 		}
950 	}
951 
952 	/* Restore to user specified direction. */
953 	estate->es_direction = dir;
954 
955 	/*
956 	 * Get the first or next tuple from tuplesort. Returns NULL if no more
957 	 * tuples.
958 	 */
959 	read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
960 		fullsort_state : node->prefixsort_state;
961 	slot = node->ss.ps.ps_ResultTupleSlot;
962 	(void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
963 								  false, slot, NULL);
964 	return slot;
965 }
966 
967 /* ----------------------------------------------------------------
968  *		ExecInitIncrementalSort
969  *
970  *		Creates the run-time state information for the sort node
971  *		produced by the planner and initializes its outer subtree.
972  * ----------------------------------------------------------------
973  */
974 IncrementalSortState *
975 ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
976 {
977 	IncrementalSortState *incrsortstate;
978 
979 	SO_printf("ExecInitIncrementalSort: initializing sort node\n");
980 
981 	/*
982 	 * Incremental sort can't be used with EXEC_FLAG_BACKWARD or
983 	 * EXEC_FLAG_MARK, because the current sort state contains only one sort
984 	 * batch rather than the full result set.
985 	 */
986 	Assert((eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);
987 
988 	/* Initialize state structure. */
989 	incrsortstate = makeNode(IncrementalSortState);
990 	incrsortstate->ss.ps.plan = (Plan *) node;
991 	incrsortstate->ss.ps.state = estate;
992 	incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
993 
994 	incrsortstate->execution_status = INCSORT_LOADFULLSORT;
995 	incrsortstate->bounded = false;
996 	incrsortstate->outerNodeDone = false;
997 	incrsortstate->bound_Done = 0;
998 	incrsortstate->fullsort_state = NULL;
999 	incrsortstate->prefixsort_state = NULL;
1000 	incrsortstate->group_pivot = NULL;
1001 	incrsortstate->transfer_tuple = NULL;
1002 	incrsortstate->n_fullsort_remaining = 0;
1003 	incrsortstate->presorted_keys = NULL;
1004 
1005 	if (incrsortstate->ss.ps.instrument != NULL)
1006 	{
1007 		IncrementalSortGroupInfo *fullsortGroupInfo =
1008 		&incrsortstate->incsort_info.fullsortGroupInfo;
1009 		IncrementalSortGroupInfo *prefixsortGroupInfo =
1010 		&incrsortstate->incsort_info.prefixsortGroupInfo;
1011 
1012 		fullsortGroupInfo->groupCount = 0;
1013 		fullsortGroupInfo->maxDiskSpaceUsed = 0;
1014 		fullsortGroupInfo->totalDiskSpaceUsed = 0;
1015 		fullsortGroupInfo->maxMemorySpaceUsed = 0;
1016 		fullsortGroupInfo->totalMemorySpaceUsed = 0;
1017 		fullsortGroupInfo->sortMethods = 0;
1018 		prefixsortGroupInfo->groupCount = 0;
1019 		prefixsortGroupInfo->maxDiskSpaceUsed = 0;
1020 		prefixsortGroupInfo->totalDiskSpaceUsed = 0;
1021 		prefixsortGroupInfo->maxMemorySpaceUsed = 0;
1022 		prefixsortGroupInfo->totalMemorySpaceUsed = 0;
1023 		prefixsortGroupInfo->sortMethods = 0;
1024 	}
1025 
1026 	/*
1027 	 * Miscellaneous initialization
1028 	 *
1029 	 * Sort nodes don't initialize their ExprContexts because they never call
1030 	 * ExecQual or ExecProject.
1031 	 */
1032 
1033 	/*
1034 	 * Initialize child nodes.
1035 	 *
1036 	 * Incremental sort does not support backwards scans and mark/restore, so
1037 	 * we don't bother removing the flags from eflags here. We allow passing a
1038 	 * REWIND flag, because although incremental sort can't use it, the child
1039 	 * nodes may be able to do something more useful.
1040 	 */
1041 	outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
1042 
1043 	/*
1044 	 * Initialize scan slot and type.
1045 	 */
1046 	ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple);
1047 
1048 	/*
1049 	 * Initialize return slot and type. No need to initialize projection info
1050 	 * because we don't do any projections.
1051 	 */
1052 	ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple);
1053 	incrsortstate->ss.ps.ps_ProjInfo = NULL;
1054 
1055 	/*
1056 	 * Initialize standalone slots to store a tuple for pivot prefix keys and
1057 	 * for carrying over a tuple from one batch to the next.
1058 	 */
1059 	incrsortstate->group_pivot =
1060 		MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1061 								 &TTSOpsMinimalTuple);
1062 	incrsortstate->transfer_tuple =
1063 		MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1064 								 &TTSOpsMinimalTuple);
1065 
1066 	SO_printf("ExecInitIncrementalSort: sort node initialized\n");
1067 
1068 	return incrsortstate;
1069 }
1070 
1071 /* ----------------------------------------------------------------
1072  *		ExecEndIncrementalSort(node)
1073  * ----------------------------------------------------------------
1074  */
1075 void
1076 ExecEndIncrementalSort(IncrementalSortState *node)
1077 {
1078 	SO_printf("ExecEndIncrementalSort: shutting down sort node\n");
1079 
1080 	/* clean out the scan tuple */
1081 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
1082 	/* must drop pointer to sort result tuple */
1083 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
1084 	/* must drop standalone tuple slots from outer node */
1085 	ExecDropSingleTupleTableSlot(node->group_pivot);
1086 	ExecDropSingleTupleTableSlot(node->transfer_tuple);
1087 
1088 	/*
1089 	 * Release tuplesort resources.
1090 	 */
1091 	if (node->fullsort_state != NULL)
1092 	{
1093 		tuplesort_end(node->fullsort_state);
1094 		node->fullsort_state = NULL;
1095 	}
1096 	if (node->prefixsort_state != NULL)
1097 	{
1098 		tuplesort_end(node->prefixsort_state);
1099 		node->prefixsort_state = NULL;
1100 	}
1101 
1102 	/*
1103 	 * Shut down the subplan.
1104 	 */
1105 	ExecEndNode(outerPlanState(node));
1106 
1107 	SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
1108 }
1109 
1110 void
1111 ExecReScanIncrementalSort(IncrementalSortState *node)
1112 {
1113 	PlanState  *outerPlan = outerPlanState(node);
1114 
1115 	/*
1116 	 * Incremental sort doesn't support efficient rescan even when parameters
1117 	 * haven't changed (e.g., rewind) because unlike regular sort we don't
1118 	 * store all tuples at once for the full sort.
1119 	 *
1120 	 * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
1121 	 * re-execute the sort along with the child node. Incremental sort itself
1122 	 * can't do anything smarter, but maybe the child nodes can.
1123 	 *
1124 	 * In theory if we've only filled the full sort with one batch (and
1125 	 * haven't reset it for a new batch yet) then we could efficiently rewind,
1126 	 * but that seems a narrow enough case that it's not worth handling
1127 	 * specially at this time.
1128 	 */
1129 
1130 	/* must drop pointer to sort result tuple */
1131 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
1132 
1133 	if (node->group_pivot != NULL)
1134 		ExecClearTuple(node->group_pivot);
1135 	if (node->transfer_tuple != NULL)
1136 		ExecClearTuple(node->transfer_tuple);
1137 
1138 	node->outerNodeDone = false;
1139 	node->n_fullsort_remaining = 0;
1140 	node->bound_Done = 0;
1141 	node->presorted_keys = NULL;
1142 
1143 	node->execution_status = INCSORT_LOADFULLSORT;
1144 
1145 	/*
	 * If we've already set up either of the sort states, we need to reset them.
1147 	 * We could end them and null out the pointers, but there's no reason to
1148 	 * repay the setup cost, and because ExecIncrementalSort guards presorted
1149 	 * column functions by checking to see if the full sort state has been
1150 	 * initialized yet, setting the sort states to null here might actually
1151 	 * cause a leak.
1152 	 */
1153 	if (node->fullsort_state != NULL)
1154 	{
1155 		tuplesort_reset(node->fullsort_state);
1156 		node->fullsort_state = NULL;
1157 	}
1158 	if (node->prefixsort_state != NULL)
1159 	{
1160 		tuplesort_reset(node->prefixsort_state);
1161 		node->prefixsort_state = NULL;
1162 	}
1163 
1164 	/*
1165 	 * If chgParam of subnode is not null, then the plan will be re-scanned by
1166 	 * the first ExecProcNode.
1167 	 */
1168 	if (outerPlan->chgParam == NULL)
1169 		ExecReScan(outerPlan);
1170 }
1171 
1172 /* ----------------------------------------------------------------
1173  *						Parallel Query Support
1174  * ----------------------------------------------------------------
1175  */
1176 
1177 /* ----------------------------------------------------------------
 *		ExecIncrementalSortEstimate
1179  *
1180  *		Estimate space required to propagate sort statistics.
1181  * ----------------------------------------------------------------
1182  */
1183 void
1184 ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
1185 {
1186 	Size		size;
1187 
1188 	/* don't need this if not instrumenting or no workers */
1189 	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1190 		return;
1191 
1192 	size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
1193 	size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
1194 	shm_toc_estimate_chunk(&pcxt->estimator, size);
1195 	shm_toc_estimate_keys(&pcxt->estimator, 1);
1196 }
1197 
1198 /* ----------------------------------------------------------------
 *		ExecIncrementalSortInitializeDSM
1200  *
1201  *		Initialize DSM space for sort statistics.
1202  * ----------------------------------------------------------------
1203  */
1204 void
1205 ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
1206 {
1207 	Size		size;
1208 
1209 	/* don't need this if not instrumenting or no workers */
1210 	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1211 		return;
1212 
1213 	size = offsetof(SharedIncrementalSortInfo, sinfo)
1214 		+ pcxt->nworkers * sizeof(IncrementalSortInfo);
1215 	node->shared_info = shm_toc_allocate(pcxt->toc, size);
1216 	/* ensure any unfilled slots will contain zeroes */
1217 	memset(node->shared_info, 0, size);
1218 	node->shared_info->num_workers = pcxt->nworkers;
1219 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
1220 				   node->shared_info);
1221 }
1222 
1223 /* ----------------------------------------------------------------
 *		ExecIncrementalSortInitializeWorker
1225  *
1226  *		Attach worker to DSM space for sort statistics.
1227  * ----------------------------------------------------------------
1228  */
1229 void
1230 ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
1231 {
1232 	node->shared_info =
1233 		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
1234 	node->am_worker = true;
1235 }
1236 
1237 /* ----------------------------------------------------------------
 *		ExecIncrementalSortRetrieveInstrumentation
1239  *
1240  *		Transfer sort statistics from DSM to private memory.
1241  * ----------------------------------------------------------------
1242  */
1243 void
1244 ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
1245 {
1246 	Size		size;
1247 	SharedIncrementalSortInfo *si;
1248 
1249 	if (node->shared_info == NULL)
1250 		return;
1251 
1252 	size = offsetof(SharedIncrementalSortInfo, sinfo)
1253 		+ node->shared_info->num_workers * sizeof(IncrementalSortInfo);
1254 	si = palloc(size);
1255 	memcpy(si, node->shared_info, size);
1256 	node->shared_info = si;
1257 }
1258