1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *	  routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *		ExecInitMergeAppend		- initialize the MergeAppend node
17  *		ExecMergeAppend			- retrieve the next tuple from the node
18  *		ExecEndMergeAppend		- shut down the MergeAppend node
19  *		ExecReScanMergeAppend	- rescan the MergeAppend node
20  *
21  *	 NOTES
22  *		A MergeAppend node contains a list of one or more subplans.
23  *		These are each expected to deliver tuples that are sorted according
24  *		to a common sort key.  The MergeAppend node merges these streams
25  *		to produce output sorted the same way.
26  *
27  *		MergeAppend nodes don't make use of their left and right
28  *		subtrees, rather they maintain a list of subplans so
29  *		a typical MergeAppend node looks like this in the plan tree:
get(context.Context)30  *
31  *				   ...
32  *				   /
33  *				MergeAppend---+------+------+--- nil
34  *				/	\		  |		 |		|
35  *			  nil	nil		 ...    ...    ...
36  *								 subplans
37  */
38 
39 #include "postgres.h"
40 
41 #include "executor/execdebug.h"
42 #include "executor/execPartition.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "lib/binaryheap.h"
45 #include "miscadmin.h"
46 
47 /*
48  * We have one slot for each item in the heap array.  We use SlotNumber
49  * to store slot indexes.  This doesn't actually provide any formal
50  * type-safety, but it makes the code more self-documenting.
51  */
52 typedef int32 SlotNumber;
53 
54 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
55 static int	heap_compare_slots(Datum a, Datum b, void *arg);
56 
57 
58 /* ----------------------------------------------------------------
59  *		ExecInitMergeAppend
60  *
61  *		Begin all of the subscans of the MergeAppend node.
62  * ----------------------------------------------------------------
63  */
64 MergeAppendState *
65 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 {
67 	MergeAppendState *mergestate = makeNode(MergeAppendState);
68 	PlanState **mergeplanstates;
69 	Bitmapset  *validsubplans;
70 	int			nplans;
71 	int			i,
72 				j;
73 	ListCell   *lc;
74 
75 	/* check for unsupported flags */
76 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
77 
78 	/*
79 	 * create new MergeAppendState for our node
80 	 */
81 	mergestate->ps.plan = (Plan *) node;
82 	mergestate->ps.state = estate;
83 	mergestate->ps.ExecProcNode = ExecMergeAppend;
84 	mergestate->ms_noopscan = false;
85 
86 	/* If run-time partition pruning is enabled, then set that up now */
87 	if (node->part_prune_info != NULL)
88 	{
89 		PartitionPruneState *prunestate;
90 
91 		/* We may need an expression context to evaluate partition exprs */
92 		ExecAssignExprContext(estate, &mergestate->ps);
93 
94 		prunestate = ExecCreatePartitionPruneState(&mergestate->ps,
95 												   node->part_prune_info);
96 		mergestate->ms_prune_state = prunestate;
97 
98 		/* Perform an initial partition prune, if required. */
99 		if (prunestate->do_initial_prune)
100 		{
101 			/* Determine which subplans survive initial pruning */
102 			validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
103 															list_length(node->mergeplans));
104 
105 			/*
106 			 * The case where no subplans survive pruning must be handled
107 			 * specially.  The problem here is that code in explain.c requires
108 			 * a MergeAppend to have at least one subplan in order for it to
109 			 * properly determine the Vars in that subplan's targetlist.  We
110 			 * sidestep this issue by just initializing the first subplan and
111 			 * setting ms_noopscan to true to indicate that we don't really
112 			 * need to scan any subnodes.
113 			 */
114 			if (bms_is_empty(validsubplans))
115 			{
116 				mergestate->ms_noopscan = true;
117 
118 				/* Mark the first as valid so that it's initialized below */
119 				validsubplans = bms_make_singleton(0);
120 			}
121 
122 			nplans = bms_num_members(validsubplans);
123 		}
124 		else
125 		{
126 			/* We'll need to initialize all subplans */
127 			nplans = list_length(node->mergeplans);
128 			Assert(nplans > 0);
129 			validsubplans = bms_add_range(NULL, 0, nplans - 1);
130 		}
131 
132 		/*
133 		 * If no runtime pruning is required, we can fill ms_valid_subplans
134 		 * immediately, preventing later calls to ExecFindMatchingSubPlans.
135 		 */
136 		if (!prunestate->do_exec_prune)
137 		{
138 			Assert(nplans > 0);
139 			mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
140 		}
141 	}
142 	else
143 	{
144 		nplans = list_length(node->mergeplans);
145 
146 		/*
147 		 * When run-time partition pruning is not enabled we can just mark all
148 		 * subplans as valid; they must also all be initialized.
149 		 */
150 		Assert(nplans > 0);
151 		mergestate->ms_valid_subplans = validsubplans =
152 			bms_add_range(NULL, 0, nplans - 1);
153 		mergestate->ms_prune_state = NULL;
154 	}
155 
156 	mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
157 	mergestate->mergeplans = mergeplanstates;
158 	mergestate->ms_nplans = nplans;
159 
160 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
161 	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
162 											  mergestate);
163 
164 	/*
165 	 * Miscellaneous initialization
166 	 *
167 	 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
168 	 * so we have to initialize them.  FIXME
169 	 */
170 	ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
171 
172 	/* node returns slots from each of its subnodes, therefore not fixed */
173 	mergestate->ps.resultopsset = true;
174 	mergestate->ps.resultopsfixed = false;
175 
176 	/*
177 	 * call ExecInitNode on each of the valid plans to be executed and save
178 	 * the results into the mergeplanstates array.
179 	 */
180 	j = i = 0;
181 	foreach(lc, node->mergeplans)
182 	{
183 		if (bms_is_member(i, validsubplans))
184 		{
185 			Plan	   *initNode = (Plan *) lfirst(lc);
186 
187 			mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
188 		}
189 		i++;
190 	}
191 
192 	mergestate->ps.ps_ProjInfo = NULL;
193 
194 	/*
195 	 * initialize sort-key information
196 	 */
197 	mergestate->ms_nkeys = node->numCols;
198 	mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
199 
200 	for (i = 0; i < node->numCols; i++)
201 	{
202 		SortSupport sortKey = mergestate->ms_sortkeys + i;
203 
204 		sortKey->ssup_cxt = CurrentMemoryContext;
205 		sortKey->ssup_collation = node->collations[i];
206 		sortKey->ssup_nulls_first = node->nullsFirst[i];
207 		sortKey->ssup_attno = node->sortColIdx[i];
208 
209 		/*
210 		 * It isn't feasible to perform abbreviated key conversion, since
211 		 * tuples are pulled into mergestate's binary heap as needed.  It
212 		 * would likely be counter-productive to convert tuples into an
213 		 * abbreviated representation as they're pulled up, so opt out of that
214 		 * additional optimization entirely.
215 		 */
216 		sortKey->abbreviate = false;
217 
218 		PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
219 	}
220 
221 	/*
222 	 * initialize to show we have not run the subplans yet
223 	 */
224 	mergestate->ms_initialized = false;
225 
226 	return mergestate;
227 }
228 
229 /* ----------------------------------------------------------------
230  *	   ExecMergeAppend
231  *
232  *		Handles iteration over multiple subplans.
233  * ----------------------------------------------------------------
234  */
235 static TupleTableSlot *
236 ExecMergeAppend(PlanState *pstate)
237 {
238 	MergeAppendState *node = castNode(MergeAppendState, pstate);
239 	TupleTableSlot *result;
240 	SlotNumber	i;
241 
242 	CHECK_FOR_INTERRUPTS();
243 
244 	if (!node->ms_initialized)
245 	{
246 		/* Nothing to do if all subplans were pruned */
247 		if (node->ms_noopscan)
248 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
249 
250 		/*
251 		 * If we've yet to determine the valid subplans then do so now.  If
252 		 * run-time pruning is disabled then the valid subplans will always be
253 		 * set to all subplans.
254 		 */
255 		if (node->ms_valid_subplans == NULL)
256 			node->ms_valid_subplans =
257 				ExecFindMatchingSubPlans(node->ms_prune_state);
258 
259 		/*
260 		 * First time through: pull the first tuple from each valid subplan,
261 		 * and set up the heap.
262 		 */
263 		i = -1;
264 		while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
265 		{
266 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
267 			if (!TupIsNull(node->ms_slots[i]))
268 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
269 		}
270 		binaryheap_build(node->ms_heap);
271 		node->ms_initialized = true;
272 	}
273 	else
274 	{
275 		/*
276 		 * Otherwise, pull the next tuple from whichever subplan we returned
277 		 * from last time, and reinsert the subplan index into the heap,
278 		 * because it might now compare differently against the existing
279 		 * elements of the heap.  (We could perhaps simplify the logic a bit
280 		 * by doing this before returning from the prior call, but it's better
281 		 * to not pull tuples until necessary.)
282 		 */
283 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
284 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
285 		if (!TupIsNull(node->ms_slots[i]))
286 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
287 		else
288 			(void) binaryheap_remove_first(node->ms_heap);
289 	}
290 
291 	if (binaryheap_empty(node->ms_heap))
292 	{
293 		/* All the subplans are exhausted, and so is the heap */
294 		result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
295 	}
296 	else
297 	{
298 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
299 		result = node->ms_slots[i];
300 	}
301 
302 	return result;
303 }
304 
305 /*
306  * Compare the tuples in the two given slots.
307  */
308 static int32
309 heap_compare_slots(Datum a, Datum b, void *arg)
310 {
311 	MergeAppendState *node = (MergeAppendState *) arg;
312 	SlotNumber	slot1 = DatumGetInt32(a);
313 	SlotNumber	slot2 = DatumGetInt32(b);
314 
315 	TupleTableSlot *s1 = node->ms_slots[slot1];
316 	TupleTableSlot *s2 = node->ms_slots[slot2];
317 	int			nkey;
318 
319 	Assert(!TupIsNull(s1));
320 	Assert(!TupIsNull(s2));
321 
322 	for (nkey = 0; nkey < node->ms_nkeys; nkey++)
323 	{
324 		SortSupport sortKey = node->ms_sortkeys + nkey;
325 		AttrNumber	attno = sortKey->ssup_attno;
326 		Datum		datum1,
327 					datum2;
328 		bool		isNull1,
329 					isNull2;
330 		int			compare;
331 
332 		datum1 = slot_getattr(s1, attno, &isNull1);
333 		datum2 = slot_getattr(s2, attno, &isNull2);
334 
335 		compare = ApplySortComparator(datum1, isNull1,
336 									  datum2, isNull2,
337 									  sortKey);
338 		if (compare != 0)
339 		{
340 			INVERT_COMPARE_RESULT(compare);
341 			return compare;
342 		}
343 	}
344 	return 0;
345 }
346 
347 /* ----------------------------------------------------------------
348  *		ExecEndMergeAppend
349  *
350  *		Shuts down the subscans of the MergeAppend node.
351  *
352  *		Returns nothing of interest.
353  * ----------------------------------------------------------------
354  */
355 void
356 ExecEndMergeAppend(MergeAppendState *node)
357 {
358 	PlanState **mergeplans;
359 	int			nplans;
360 	int			i;
361 
362 	/*
363 	 * get information from the node
364 	 */
365 	mergeplans = node->mergeplans;
366 	nplans = node->ms_nplans;
367 
368 	/*
369 	 * shut down each of the subscans
370 	 */
371 	for (i = 0; i < nplans; i++)
372 		ExecEndNode(mergeplans[i]);
373 }
374 
375 void
376 ExecReScanMergeAppend(MergeAppendState *node)
377 {
378 	int			i;
379 
380 	/*
381 	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
382 	 * we'd better unset the valid subplans so that they are reselected for
383 	 * the new parameter values.
384 	 */
385 	if (node->ms_prune_state &&
386 		bms_overlap(node->ps.chgParam,
387 					node->ms_prune_state->execparamids))
388 	{
389 		bms_free(node->ms_valid_subplans);
390 		node->ms_valid_subplans = NULL;
391 	}
392 
393 	for (i = 0; i < node->ms_nplans; i++)
394 	{
395 		PlanState  *subnode = node->mergeplans[i];
396 
397 		/*
398 		 * ExecReScan doesn't know about my subplans, so I have to do
399 		 * changed-parameter signaling myself.
400 		 */
401 		if (node->ps.chgParam != NULL)
402 			UpdateChangedParamSet(subnode, node->ps.chgParam);
403 
404 		/*
405 		 * If chgParam of subnode is not null then plan will be re-scanned by
406 		 * first ExecProcNode.
407 		 */
408 		if (subnode->chgParam == NULL)
409 			ExecReScan(subnode);
410 	}
411 	binaryheap_reset(node->ms_heap);
412 	node->ms_initialized = false;
413 }
414