1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *	  routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *		ExecInitMergeAppend		- initialize the MergeAppend node
17  *		ExecMergeAppend			- retrieve the next tuple from the node
18  *		ExecEndMergeAppend		- shut down the MergeAppend node
19  *		ExecReScanMergeAppend	- rescan the MergeAppend node
20  *
21  *	 NOTES
22  *		A MergeAppend node contains a list of one or more subplans.
23  *		These are each expected to deliver tuples that are sorted according
24  *		to a common sort key.  The MergeAppend node merges these streams
25  *		to produce output sorted the same way.
26  *
27  *		MergeAppend nodes don't make use of their left and right
28  *		subtrees, rather they maintain a list of subplans so
29  *		a typical MergeAppend node looks like this in the plan tree:
30  *
31  *				   ...
32  *				   /
33  *				MergeAppend---+------+------+--- nil
34  *				/	\		  |		 |		|
35  *			  nil	nil		 ...    ...    ...
36  *								 subplans
37  */
38 
39 #include "postgres.h"
40 
41 #include "executor/execdebug.h"
42 #include "executor/execPartition.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "lib/binaryheap.h"
45 #include "miscadmin.h"
46 
/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 */
typedef int32 SlotNumber;

/* Tuple-fetch routine; ExecInitMergeAppend installs it as ps.ExecProcNode. */
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
/* Comparator passed to binaryheap_allocate; arg is the MergeAppendState. */
static int	heap_compare_slots(Datum a, Datum b, void *arg);
56 
57 
58 /* ----------------------------------------------------------------
59  *		ExecInitMergeAppend
60  *
61  *		Begin all of the subscans of the MergeAppend node.
62  * ----------------------------------------------------------------
63  */
64 MergeAppendState *
ExecInitMergeAppend(MergeAppend * node,EState * estate,int eflags)65 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 {
67 	MergeAppendState *mergestate = makeNode(MergeAppendState);
68 	PlanState **mergeplanstates;
69 	Bitmapset  *validsubplans;
70 	int			nplans;
71 	int			i,
72 				j;
73 
74 	/* check for unsupported flags */
75 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76 
77 	/*
78 	 * create new MergeAppendState for our node
79 	 */
80 	mergestate->ps.plan = (Plan *) node;
81 	mergestate->ps.state = estate;
82 	mergestate->ps.ExecProcNode = ExecMergeAppend;
83 
84 	/* If run-time partition pruning is enabled, then set that up now */
85 	if (node->part_prune_info != NULL)
86 	{
87 		PartitionPruneState *prunestate;
88 
89 		/* We may need an expression context to evaluate partition exprs */
90 		ExecAssignExprContext(estate, &mergestate->ps);
91 
92 		prunestate = ExecCreatePartitionPruneState(&mergestate->ps,
93 												   node->part_prune_info);
94 		mergestate->ms_prune_state = prunestate;
95 
96 		/* Perform an initial partition prune, if required. */
97 		if (prunestate->do_initial_prune)
98 		{
99 			/* Determine which subplans survive initial pruning */
100 			validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
101 															list_length(node->mergeplans));
102 
103 			nplans = bms_num_members(validsubplans);
104 		}
105 		else
106 		{
107 			/* We'll need to initialize all subplans */
108 			nplans = list_length(node->mergeplans);
109 			Assert(nplans > 0);
110 			validsubplans = bms_add_range(NULL, 0, nplans - 1);
111 		}
112 
113 		/*
114 		 * When no run-time pruning is required and there's at least one
115 		 * subplan, we can fill as_valid_subplans immediately, preventing
116 		 * later calls to ExecFindMatchingSubPlans.
117 		 */
118 		if (!prunestate->do_exec_prune && nplans > 0)
119 			mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
120 	}
121 	else
122 	{
123 		nplans = list_length(node->mergeplans);
124 
125 		/*
126 		 * When run-time partition pruning is not enabled we can just mark all
127 		 * subplans as valid; they must also all be initialized.
128 		 */
129 		Assert(nplans > 0);
130 		mergestate->ms_valid_subplans = validsubplans =
131 			bms_add_range(NULL, 0, nplans - 1);
132 		mergestate->ms_prune_state = NULL;
133 	}
134 
135 	mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
136 	mergestate->mergeplans = mergeplanstates;
137 	mergestate->ms_nplans = nplans;
138 
139 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
140 	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
141 											  mergestate);
142 
143 	/*
144 	 * Miscellaneous initialization
145 	 *
146 	 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
147 	 * so we have to initialize them.  FIXME
148 	 */
149 	ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
150 
151 	/* node returns slots from each of its subnodes, therefore not fixed */
152 	mergestate->ps.resultopsset = true;
153 	mergestate->ps.resultopsfixed = false;
154 
155 	/*
156 	 * call ExecInitNode on each of the valid plans to be executed and save
157 	 * the results into the mergeplanstates array.
158 	 */
159 	j = 0;
160 	i = -1;
161 	while ((i = bms_next_member(validsubplans, i)) >= 0)
162 	{
163 		Plan	   *initNode = (Plan *) list_nth(node->mergeplans, i);
164 
165 		mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
166 	}
167 
168 	mergestate->ps.ps_ProjInfo = NULL;
169 
170 	/*
171 	 * initialize sort-key information
172 	 */
173 	mergestate->ms_nkeys = node->numCols;
174 	mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
175 
176 	for (i = 0; i < node->numCols; i++)
177 	{
178 		SortSupport sortKey = mergestate->ms_sortkeys + i;
179 
180 		sortKey->ssup_cxt = CurrentMemoryContext;
181 		sortKey->ssup_collation = node->collations[i];
182 		sortKey->ssup_nulls_first = node->nullsFirst[i];
183 		sortKey->ssup_attno = node->sortColIdx[i];
184 
185 		/*
186 		 * It isn't feasible to perform abbreviated key conversion, since
187 		 * tuples are pulled into mergestate's binary heap as needed.  It
188 		 * would likely be counter-productive to convert tuples into an
189 		 * abbreviated representation as they're pulled up, so opt out of that
190 		 * additional optimization entirely.
191 		 */
192 		sortKey->abbreviate = false;
193 
194 		PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
195 	}
196 
197 	/*
198 	 * initialize to show we have not run the subplans yet
199 	 */
200 	mergestate->ms_initialized = false;
201 
202 	return mergestate;
203 }
204 
205 /* ----------------------------------------------------------------
206  *	   ExecMergeAppend
207  *
208  *		Handles iteration over multiple subplans.
209  * ----------------------------------------------------------------
210  */
211 static TupleTableSlot *
ExecMergeAppend(PlanState * pstate)212 ExecMergeAppend(PlanState *pstate)
213 {
214 	MergeAppendState *node = castNode(MergeAppendState, pstate);
215 	TupleTableSlot *result;
216 	SlotNumber	i;
217 
218 	CHECK_FOR_INTERRUPTS();
219 
220 	if (!node->ms_initialized)
221 	{
222 		/* Nothing to do if all subplans were pruned */
223 		if (node->ms_nplans == 0)
224 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
225 
226 		/*
227 		 * If we've yet to determine the valid subplans then do so now.  If
228 		 * run-time pruning is disabled then the valid subplans will always be
229 		 * set to all subplans.
230 		 */
231 		if (node->ms_valid_subplans == NULL)
232 			node->ms_valid_subplans =
233 				ExecFindMatchingSubPlans(node->ms_prune_state);
234 
235 		/*
236 		 * First time through: pull the first tuple from each valid subplan,
237 		 * and set up the heap.
238 		 */
239 		i = -1;
240 		while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
241 		{
242 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
243 			if (!TupIsNull(node->ms_slots[i]))
244 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
245 		}
246 		binaryheap_build(node->ms_heap);
247 		node->ms_initialized = true;
248 	}
249 	else
250 	{
251 		/*
252 		 * Otherwise, pull the next tuple from whichever subplan we returned
253 		 * from last time, and reinsert the subplan index into the heap,
254 		 * because it might now compare differently against the existing
255 		 * elements of the heap.  (We could perhaps simplify the logic a bit
256 		 * by doing this before returning from the prior call, but it's better
257 		 * to not pull tuples until necessary.)
258 		 */
259 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
260 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
261 		if (!TupIsNull(node->ms_slots[i]))
262 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
263 		else
264 			(void) binaryheap_remove_first(node->ms_heap);
265 	}
266 
267 	if (binaryheap_empty(node->ms_heap))
268 	{
269 		/* All the subplans are exhausted, and so is the heap */
270 		result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
271 	}
272 	else
273 	{
274 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
275 		result = node->ms_slots[i];
276 	}
277 
278 	return result;
279 }
280 
281 /*
282  * Compare the tuples in the two given slots.
283  */
284 static int32
heap_compare_slots(Datum a,Datum b,void * arg)285 heap_compare_slots(Datum a, Datum b, void *arg)
286 {
287 	MergeAppendState *node = (MergeAppendState *) arg;
288 	SlotNumber	slot1 = DatumGetInt32(a);
289 	SlotNumber	slot2 = DatumGetInt32(b);
290 
291 	TupleTableSlot *s1 = node->ms_slots[slot1];
292 	TupleTableSlot *s2 = node->ms_slots[slot2];
293 	int			nkey;
294 
295 	Assert(!TupIsNull(s1));
296 	Assert(!TupIsNull(s2));
297 
298 	for (nkey = 0; nkey < node->ms_nkeys; nkey++)
299 	{
300 		SortSupport sortKey = node->ms_sortkeys + nkey;
301 		AttrNumber	attno = sortKey->ssup_attno;
302 		Datum		datum1,
303 					datum2;
304 		bool		isNull1,
305 					isNull2;
306 		int			compare;
307 
308 		datum1 = slot_getattr(s1, attno, &isNull1);
309 		datum2 = slot_getattr(s2, attno, &isNull2);
310 
311 		compare = ApplySortComparator(datum1, isNull1,
312 									  datum2, isNull2,
313 									  sortKey);
314 		if (compare != 0)
315 		{
316 			INVERT_COMPARE_RESULT(compare);
317 			return compare;
318 		}
319 	}
320 	return 0;
321 }
322 
323 /* ----------------------------------------------------------------
324  *		ExecEndMergeAppend
325  *
326  *		Shuts down the subscans of the MergeAppend node.
327  *
328  *		Returns nothing of interest.
329  * ----------------------------------------------------------------
330  */
331 void
ExecEndMergeAppend(MergeAppendState * node)332 ExecEndMergeAppend(MergeAppendState *node)
333 {
334 	PlanState **mergeplans;
335 	int			nplans;
336 	int			i;
337 
338 	/*
339 	 * get information from the node
340 	 */
341 	mergeplans = node->mergeplans;
342 	nplans = node->ms_nplans;
343 
344 	/*
345 	 * shut down each of the subscans
346 	 */
347 	for (i = 0; i < nplans; i++)
348 		ExecEndNode(mergeplans[i]);
349 }
350 
351 void
ExecReScanMergeAppend(MergeAppendState * node)352 ExecReScanMergeAppend(MergeAppendState *node)
353 {
354 	int			i;
355 
356 	/*
357 	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
358 	 * we'd better unset the valid subplans so that they are reselected for
359 	 * the new parameter values.
360 	 */
361 	if (node->ms_prune_state &&
362 		bms_overlap(node->ps.chgParam,
363 					node->ms_prune_state->execparamids))
364 	{
365 		bms_free(node->ms_valid_subplans);
366 		node->ms_valid_subplans = NULL;
367 	}
368 
369 	for (i = 0; i < node->ms_nplans; i++)
370 	{
371 		PlanState  *subnode = node->mergeplans[i];
372 
373 		/*
374 		 * ExecReScan doesn't know about my subplans, so I have to do
375 		 * changed-parameter signaling myself.
376 		 */
377 		if (node->ps.chgParam != NULL)
378 			UpdateChangedParamSet(subnode, node->ps.chgParam);
379 
380 		/*
381 		 * If chgParam of subnode is not null then plan will be re-scanned by
382 		 * first ExecProcNode.
383 		 */
384 		if (subnode->chgParam == NULL)
385 			ExecReScan(subnode);
386 	}
387 	binaryheap_reset(node->ms_heap);
388 	node->ms_initialized = false;
389 }
390