1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *	  routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *		ExecInitMergeAppend		- initialize the MergeAppend node
17  *		ExecMergeAppend			- retrieve the next tuple from the node
18  *		ExecEndMergeAppend		- shut down the MergeAppend node
19  *		ExecReScanMergeAppend	- rescan the MergeAppend node
20  *
21  *	 NOTES
22  *		A MergeAppend node contains a list of one or more subplans.
23  *		These are each expected to deliver tuples that are sorted according
24  *		to a common sort key.  The MergeAppend node merges these streams
25  *		to produce output sorted the same way.
26  *
27  *		MergeAppend nodes don't make use of their left and right
28  *		subtrees, rather they maintain a list of subplans so
29  *		a typical MergeAppend node looks like this in the plan tree:
30  *
31  *				   ...
32  *				   /
33  *				MergeAppend---+------+------+--- nil
34  *				/	\		  |		 |		|
35  *			  nil	nil		 ...    ...    ...
36  *								 subplans
37  */
38 
39 #include "postgres.h"
40 
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
43 #include "lib/binaryheap.h"
44 #include "miscadmin.h"
45 
46 /*
47  * We have one slot for each item in the heap array.  We use SlotNumber
48  * to store slot indexes.  This doesn't actually provide any formal
49  * type-safety, but it makes the code more self-documenting.
50  */
51 typedef int32 SlotNumber;
52 
53 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
54 static int	heap_compare_slots(Datum a, Datum b, void *arg);
55 
56 
57 /* ----------------------------------------------------------------
58  *		ExecInitMergeAppend
59  *
60  *		Begin all of the subscans of the MergeAppend node.
61  * ----------------------------------------------------------------
62  */
63 MergeAppendState *
ExecInitMergeAppend(MergeAppend * node,EState * estate,int eflags)64 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
65 {
66 	MergeAppendState *mergestate = makeNode(MergeAppendState);
67 	PlanState **mergeplanstates;
68 	int			nplans;
69 	int			i;
70 	ListCell   *lc;
71 
72 	/* check for unsupported flags */
73 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
74 
75 	/*
76 	 * Lock the non-leaf tables in the partition tree controlled by this node.
77 	 * It's a no-op for non-partitioned parent tables.
78 	 */
79 	ExecLockNonLeafAppendTables(node->partitioned_rels, estate);
80 
81 	/*
82 	 * Set up empty vector of subplan states
83 	 */
84 	nplans = list_length(node->mergeplans);
85 
86 	mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
87 
88 	/*
89 	 * create new MergeAppendState for our node
90 	 */
91 	mergestate->ps.plan = (Plan *) node;
92 	mergestate->ps.state = estate;
93 	mergestate->ps.ExecProcNode = ExecMergeAppend;
94 	mergestate->mergeplans = mergeplanstates;
95 	mergestate->ms_nplans = nplans;
96 
97 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
98 	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
99 											  mergestate);
100 
101 	/*
102 	 * Miscellaneous initialization
103 	 *
104 	 * MergeAppend plans don't have expression contexts because they never
105 	 * call ExecQual or ExecProject.
106 	 */
107 
108 	/*
109 	 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
110 	 * so we have to initialize them.
111 	 */
112 	ExecInitResultTupleSlot(estate, &mergestate->ps);
113 
114 	/*
115 	 * call ExecInitNode on each of the plans to be executed and save the
116 	 * results into the array "mergeplans".
117 	 */
118 	i = 0;
119 	foreach(lc, node->mergeplans)
120 	{
121 		Plan	   *initNode = (Plan *) lfirst(lc);
122 
123 		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
124 		i++;
125 	}
126 
127 	/*
128 	 * initialize output tuple type
129 	 */
130 	ExecAssignResultTypeFromTL(&mergestate->ps);
131 	mergestate->ps.ps_ProjInfo = NULL;
132 
133 	/*
134 	 * initialize sort-key information
135 	 */
136 	mergestate->ms_nkeys = node->numCols;
137 	mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
138 
139 	for (i = 0; i < node->numCols; i++)
140 	{
141 		SortSupport sortKey = mergestate->ms_sortkeys + i;
142 
143 		sortKey->ssup_cxt = CurrentMemoryContext;
144 		sortKey->ssup_collation = node->collations[i];
145 		sortKey->ssup_nulls_first = node->nullsFirst[i];
146 		sortKey->ssup_attno = node->sortColIdx[i];
147 
148 		/*
149 		 * It isn't feasible to perform abbreviated key conversion, since
150 		 * tuples are pulled into mergestate's binary heap as needed.  It
151 		 * would likely be counter-productive to convert tuples into an
152 		 * abbreviated representation as they're pulled up, so opt out of that
153 		 * additional optimization entirely.
154 		 */
155 		sortKey->abbreviate = false;
156 
157 		PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
158 	}
159 
160 	/*
161 	 * initialize to show we have not run the subplans yet
162 	 */
163 	mergestate->ms_initialized = false;
164 
165 	return mergestate;
166 }
167 
168 /* ----------------------------------------------------------------
169  *	   ExecMergeAppend
170  *
171  *		Handles iteration over multiple subplans.
172  * ----------------------------------------------------------------
173  */
174 static TupleTableSlot *
ExecMergeAppend(PlanState * pstate)175 ExecMergeAppend(PlanState *pstate)
176 {
177 	MergeAppendState *node = castNode(MergeAppendState, pstate);
178 	TupleTableSlot *result;
179 	SlotNumber	i;
180 
181 	CHECK_FOR_INTERRUPTS();
182 
183 	if (!node->ms_initialized)
184 	{
185 		/*
186 		 * First time through: pull the first tuple from each subplan, and set
187 		 * up the heap.
188 		 */
189 		for (i = 0; i < node->ms_nplans; i++)
190 		{
191 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
192 			if (!TupIsNull(node->ms_slots[i]))
193 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
194 		}
195 		binaryheap_build(node->ms_heap);
196 		node->ms_initialized = true;
197 	}
198 	else
199 	{
200 		/*
201 		 * Otherwise, pull the next tuple from whichever subplan we returned
202 		 * from last time, and reinsert the subplan index into the heap,
203 		 * because it might now compare differently against the existing
204 		 * elements of the heap.  (We could perhaps simplify the logic a bit
205 		 * by doing this before returning from the prior call, but it's better
206 		 * to not pull tuples until necessary.)
207 		 */
208 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
209 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
210 		if (!TupIsNull(node->ms_slots[i]))
211 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
212 		else
213 			(void) binaryheap_remove_first(node->ms_heap);
214 	}
215 
216 	if (binaryheap_empty(node->ms_heap))
217 	{
218 		/* All the subplans are exhausted, and so is the heap */
219 		result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
220 	}
221 	else
222 	{
223 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
224 		result = node->ms_slots[i];
225 	}
226 
227 	return result;
228 }
229 
230 /*
231  * Compare the tuples in the two given slots.
232  */
233 static int32
heap_compare_slots(Datum a,Datum b,void * arg)234 heap_compare_slots(Datum a, Datum b, void *arg)
235 {
236 	MergeAppendState *node = (MergeAppendState *) arg;
237 	SlotNumber	slot1 = DatumGetInt32(a);
238 	SlotNumber	slot2 = DatumGetInt32(b);
239 
240 	TupleTableSlot *s1 = node->ms_slots[slot1];
241 	TupleTableSlot *s2 = node->ms_slots[slot2];
242 	int			nkey;
243 
244 	Assert(!TupIsNull(s1));
245 	Assert(!TupIsNull(s2));
246 
247 	for (nkey = 0; nkey < node->ms_nkeys; nkey++)
248 	{
249 		SortSupport sortKey = node->ms_sortkeys + nkey;
250 		AttrNumber	attno = sortKey->ssup_attno;
251 		Datum		datum1,
252 					datum2;
253 		bool		isNull1,
254 					isNull2;
255 		int			compare;
256 
257 		datum1 = slot_getattr(s1, attno, &isNull1);
258 		datum2 = slot_getattr(s2, attno, &isNull2);
259 
260 		compare = ApplySortComparator(datum1, isNull1,
261 									  datum2, isNull2,
262 									  sortKey);
263 		if (compare != 0)
264 		{
265 			INVERT_COMPARE_RESULT(compare);
266 			return compare;
267 		}
268 	}
269 	return 0;
270 }
271 
272 /* ----------------------------------------------------------------
273  *		ExecEndMergeAppend
274  *
275  *		Shuts down the subscans of the MergeAppend node.
276  *
277  *		Returns nothing of interest.
278  * ----------------------------------------------------------------
279  */
280 void
ExecEndMergeAppend(MergeAppendState * node)281 ExecEndMergeAppend(MergeAppendState *node)
282 {
283 	PlanState **mergeplans;
284 	int			nplans;
285 	int			i;
286 
287 	/*
288 	 * get information from the node
289 	 */
290 	mergeplans = node->mergeplans;
291 	nplans = node->ms_nplans;
292 
293 	/*
294 	 * shut down each of the subscans
295 	 */
296 	for (i = 0; i < nplans; i++)
297 		ExecEndNode(mergeplans[i]);
298 }
299 
300 void
ExecReScanMergeAppend(MergeAppendState * node)301 ExecReScanMergeAppend(MergeAppendState *node)
302 {
303 	int			i;
304 
305 	for (i = 0; i < node->ms_nplans; i++)
306 	{
307 		PlanState  *subnode = node->mergeplans[i];
308 
309 		/*
310 		 * ExecReScan doesn't know about my subplans, so I have to do
311 		 * changed-parameter signaling myself.
312 		 */
313 		if (node->ps.chgParam != NULL)
314 			UpdateChangedParamSet(subnode, node->ps.chgParam);
315 
316 		/*
317 		 * If chgParam of subnode is not null then plan will be re-scanned by
318 		 * first ExecProcNode.
319 		 */
320 		if (subnode->chgParam == NULL)
321 			ExecReScan(subnode);
322 	}
323 	binaryheap_reset(node->ms_heap);
324 	node->ms_initialized = false;
325 }
326