1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *	  routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *		ExecInitMergeAppend		- initialize the MergeAppend node
17  *		ExecMergeAppend			- retrieve the next tuple from the node
18  *		ExecEndMergeAppend		- shut down the MergeAppend node
19  *		ExecReScanMergeAppend	- rescan the MergeAppend node
20  *
21  *	 NOTES
22  *		A MergeAppend node contains a list of one or more subplans.
23  *		These are each expected to deliver tuples that are sorted according
24  *		to a common sort key.  The MergeAppend node merges these streams
25  *		to produce output sorted the same way.
26  *
27  *		MergeAppend nodes don't make use of their left and right
28  *		subtrees, rather they maintain a list of subplans so
29  *		a typical MergeAppend node looks like this in the plan tree:
30  *
31  *				   ...
32  *				   /
33  *				MergeAppend---+------+------+--- nil
34  *				/	\		  |		 |		|
35  *			  nil	nil		 ...    ...    ...
36  *								 subplans
37  */
38 
39 #include "postgres.h"
40 
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
43 
44 #include "lib/binaryheap.h"
45 
46 /*
47  * We have one slot for each item in the heap array.  We use SlotNumber
48  * to store slot indexes.  This doesn't actually provide any formal
49  * type-safety, but it makes the code more self-documenting.
50  */
51 typedef int32 SlotNumber;
52 
53 static int	heap_compare_slots(Datum a, Datum b, void *arg);
54 
55 
56 /* ----------------------------------------------------------------
57  *		ExecInitMergeAppend
58  *
59  *		Begin all of the subscans of the MergeAppend node.
60  * ----------------------------------------------------------------
61  */
62 MergeAppendState *
ExecInitMergeAppend(MergeAppend * node,EState * estate,int eflags)63 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
64 {
65 	MergeAppendState *mergestate = makeNode(MergeAppendState);
66 	PlanState **mergeplanstates;
67 	int			nplans;
68 	int			i;
69 	ListCell   *lc;
70 
71 	/* check for unsupported flags */
72 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
73 
74 	/*
75 	 * Set up empty vector of subplan states
76 	 */
77 	nplans = list_length(node->mergeplans);
78 
79 	mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
80 
81 	/*
82 	 * create new MergeAppendState for our node
83 	 */
84 	mergestate->ps.plan = (Plan *) node;
85 	mergestate->ps.state = estate;
86 	mergestate->mergeplans = mergeplanstates;
87 	mergestate->ms_nplans = nplans;
88 
89 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
90 	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
91 											  mergestate);
92 
93 	/*
94 	 * Miscellaneous initialization
95 	 *
96 	 * MergeAppend plans don't have expression contexts because they never
97 	 * call ExecQual or ExecProject.
98 	 */
99 
100 	/*
101 	 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
102 	 * so we have to initialize them.
103 	 */
104 	ExecInitResultTupleSlot(estate, &mergestate->ps);
105 
106 	/*
107 	 * call ExecInitNode on each of the plans to be executed and save the
108 	 * results into the array "mergeplans".
109 	 */
110 	i = 0;
111 	foreach(lc, node->mergeplans)
112 	{
113 		Plan	   *initNode = (Plan *) lfirst(lc);
114 
115 		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
116 		i++;
117 	}
118 
119 	/*
120 	 * initialize output tuple type
121 	 */
122 	ExecAssignResultTypeFromTL(&mergestate->ps);
123 	mergestate->ps.ps_ProjInfo = NULL;
124 
125 	/*
126 	 * initialize sort-key information
127 	 */
128 	mergestate->ms_nkeys = node->numCols;
129 	mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
130 
131 	for (i = 0; i < node->numCols; i++)
132 	{
133 		SortSupport sortKey = mergestate->ms_sortkeys + i;
134 
135 		sortKey->ssup_cxt = CurrentMemoryContext;
136 		sortKey->ssup_collation = node->collations[i];
137 		sortKey->ssup_nulls_first = node->nullsFirst[i];
138 		sortKey->ssup_attno = node->sortColIdx[i];
139 
140 		/*
141 		 * It isn't feasible to perform abbreviated key conversion, since
142 		 * tuples are pulled into mergestate's binary heap as needed.  It
143 		 * would likely be counter-productive to convert tuples into an
144 		 * abbreviated representation as they're pulled up, so opt out of that
145 		 * additional optimization entirely.
146 		 */
147 		sortKey->abbreviate = false;
148 
149 		PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
150 	}
151 
152 	/*
153 	 * initialize to show we have not run the subplans yet
154 	 */
155 	mergestate->ms_initialized = false;
156 
157 	return mergestate;
158 }
159 
160 /* ----------------------------------------------------------------
161  *	   ExecMergeAppend
162  *
163  *		Handles iteration over multiple subplans.
164  * ----------------------------------------------------------------
165  */
166 TupleTableSlot *
ExecMergeAppend(MergeAppendState * node)167 ExecMergeAppend(MergeAppendState *node)
168 {
169 	TupleTableSlot *result;
170 	SlotNumber	i;
171 
172 	if (!node->ms_initialized)
173 	{
174 		/*
175 		 * First time through: pull the first tuple from each subplan, and set
176 		 * up the heap.
177 		 */
178 		for (i = 0; i < node->ms_nplans; i++)
179 		{
180 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
181 			if (!TupIsNull(node->ms_slots[i]))
182 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
183 		}
184 		binaryheap_build(node->ms_heap);
185 		node->ms_initialized = true;
186 	}
187 	else
188 	{
189 		/*
190 		 * Otherwise, pull the next tuple from whichever subplan we returned
191 		 * from last time, and reinsert the subplan index into the heap,
192 		 * because it might now compare differently against the existing
193 		 * elements of the heap.  (We could perhaps simplify the logic a bit
194 		 * by doing this before returning from the prior call, but it's better
195 		 * to not pull tuples until necessary.)
196 		 */
197 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
198 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
199 		if (!TupIsNull(node->ms_slots[i]))
200 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
201 		else
202 			(void) binaryheap_remove_first(node->ms_heap);
203 	}
204 
205 	if (binaryheap_empty(node->ms_heap))
206 	{
207 		/* All the subplans are exhausted, and so is the heap */
208 		result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
209 	}
210 	else
211 	{
212 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
213 		result = node->ms_slots[i];
214 	}
215 
216 	return result;
217 }
218 
219 /*
220  * Compare the tuples in the two given slots.
221  */
222 static int32
heap_compare_slots(Datum a,Datum b,void * arg)223 heap_compare_slots(Datum a, Datum b, void *arg)
224 {
225 	MergeAppendState *node = (MergeAppendState *) arg;
226 	SlotNumber	slot1 = DatumGetInt32(a);
227 	SlotNumber	slot2 = DatumGetInt32(b);
228 
229 	TupleTableSlot *s1 = node->ms_slots[slot1];
230 	TupleTableSlot *s2 = node->ms_slots[slot2];
231 	int			nkey;
232 
233 	Assert(!TupIsNull(s1));
234 	Assert(!TupIsNull(s2));
235 
236 	for (nkey = 0; nkey < node->ms_nkeys; nkey++)
237 	{
238 		SortSupport sortKey = node->ms_sortkeys + nkey;
239 		AttrNumber	attno = sortKey->ssup_attno;
240 		Datum		datum1,
241 					datum2;
242 		bool		isNull1,
243 					isNull2;
244 		int			compare;
245 
246 		datum1 = slot_getattr(s1, attno, &isNull1);
247 		datum2 = slot_getattr(s2, attno, &isNull2);
248 
249 		compare = ApplySortComparator(datum1, isNull1,
250 									  datum2, isNull2,
251 									  sortKey);
252 		if (compare != 0)
253 		{
254 			INVERT_COMPARE_RESULT(compare);
255 			return compare;
256 		}
257 	}
258 	return 0;
259 }
260 
261 /* ----------------------------------------------------------------
262  *		ExecEndMergeAppend
263  *
264  *		Shuts down the subscans of the MergeAppend node.
265  *
266  *		Returns nothing of interest.
267  * ----------------------------------------------------------------
268  */
269 void
ExecEndMergeAppend(MergeAppendState * node)270 ExecEndMergeAppend(MergeAppendState *node)
271 {
272 	PlanState **mergeplans;
273 	int			nplans;
274 	int			i;
275 
276 	/*
277 	 * get information from the node
278 	 */
279 	mergeplans = node->mergeplans;
280 	nplans = node->ms_nplans;
281 
282 	/*
283 	 * shut down each of the subscans
284 	 */
285 	for (i = 0; i < nplans; i++)
286 		ExecEndNode(mergeplans[i]);
287 }
288 
289 void
ExecReScanMergeAppend(MergeAppendState * node)290 ExecReScanMergeAppend(MergeAppendState *node)
291 {
292 	int			i;
293 
294 	for (i = 0; i < node->ms_nplans; i++)
295 	{
296 		PlanState  *subnode = node->mergeplans[i];
297 
298 		/*
299 		 * ExecReScan doesn't know about my subplans, so I have to do
300 		 * changed-parameter signaling myself.
301 		 */
302 		if (node->ps.chgParam != NULL)
303 			UpdateChangedParamSet(subnode, node->ps.chgParam);
304 
305 		/*
306 		 * If chgParam of subnode is not null then plan will be re-scanned by
307 		 * first ExecProcNode.
308 		 */
309 		if (subnode->chgParam == NULL)
310 			ExecReScan(subnode);
311 	}
312 	binaryheap_reset(node->ms_heap);
313 	node->ms_initialized = false;
314 }
315