1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *	  routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *		ExecInitMergeAppend		- initialize the MergeAppend node
17  *		ExecMergeAppend			- retrieve the next tuple from the node
18  *		ExecEndMergeAppend		- shut down the MergeAppend node
19  *		ExecReScanMergeAppend	- rescan the MergeAppend node
20  *
21  *	 NOTES
22  *		A MergeAppend node contains a list of one or more subplans.
23  *		These are each expected to deliver tuples that are sorted according
24  *		to a common sort key.  The MergeAppend node merges these streams
25  *		to produce output sorted the same way.
26  *
27  *		MergeAppend nodes don't make use of their left and right
28  *		subtrees, rather they maintain a list of subplans so
29  *		a typical MergeAppend node looks like this in the plan tree:
30  *
31  *				   ...
32  *				   /
33  *				MergeAppend---+------+------+--- nil
34  *				/	\		  |		 |		|
35  *			  nil	nil		 ...    ...    ...
36  *								 subplans
37  */
38 
39 #include "postgres.h"
40 
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
43 #include "lib/binaryheap.h"
44 #include "miscadmin.h"
45 
46 /*
47  * We have one slot for each item in the heap array.  We use SlotNumber
48  * to store slot indexes.  This doesn't actually provide any formal
49  * type-safety, but it makes the code more self-documenting.
50  */
typedef int32 SlotNumber;

/* ExecProcNode callback for this node type; installed by ExecInitMergeAppend. */
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
/*
 * Comparator handed to binaryheap_allocate().  "a" and "b" are SlotNumbers
 * wrapped as Datums; "arg" is the owning MergeAppendState.
 */
static int	heap_compare_slots(Datum a, Datum b, void *arg);
55 
56 
57 /* ----------------------------------------------------------------
58  *		ExecInitMergeAppend
59  *
60  *		Begin all of the subscans of the MergeAppend node.
61  * ----------------------------------------------------------------
62  */
63 MergeAppendState *
ExecInitMergeAppend(MergeAppend * node,EState * estate,int eflags)64 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
65 {
66 	MergeAppendState *mergestate = makeNode(MergeAppendState);
67 	PlanState **mergeplanstates;
68 	int			nplans;
69 	int			i;
70 	ListCell   *lc;
71 
72 	/* check for unsupported flags */
73 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
74 
75 	/*
76 	 * Lock the non-leaf tables in the partition tree controlled by this node.
77 	 * It's a no-op for non-partitioned parent tables.
78 	 */
79 	ExecLockNonLeafAppendTables(node->partitioned_rels, estate);
80 
81 	/*
82 	 * Set up empty vector of subplan states
83 	 */
84 	nplans = list_length(node->mergeplans);
85 
86 	mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
87 
88 	/*
89 	 * create new MergeAppendState for our node
90 	 */
91 	mergestate->ps.plan = (Plan *) node;
92 	mergestate->ps.state = estate;
93 	mergestate->ps.ExecProcNode = ExecMergeAppend;
94 	mergestate->mergeplans = mergeplanstates;
95 	mergestate->ms_nplans = nplans;
96 
97 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
98 	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
99 											  mergestate);
100 
101 	/*
102 	 * Miscellaneous initialization
103 	 *
104 	 * MergeAppend plans don't have expression contexts because they never
105 	 * call ExecQual or ExecProject.
106 	 */
107 
108 	/*
109 	 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
110 	 * so we have to initialize them.
111 	 */
112 	ExecInitResultTupleSlotTL(estate, &mergestate->ps);
113 
114 	/*
115 	 * call ExecInitNode on each of the plans to be executed and save the
116 	 * results into the array "mergeplans".
117 	 */
118 	i = 0;
119 	foreach(lc, node->mergeplans)
120 	{
121 		Plan	   *initNode = (Plan *) lfirst(lc);
122 
123 		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
124 		i++;
125 	}
126 
127 	mergestate->ps.ps_ProjInfo = NULL;
128 
129 	/*
130 	 * initialize sort-key information
131 	 */
132 	mergestate->ms_nkeys = node->numCols;
133 	mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
134 
135 	for (i = 0; i < node->numCols; i++)
136 	{
137 		SortSupport sortKey = mergestate->ms_sortkeys + i;
138 
139 		sortKey->ssup_cxt = CurrentMemoryContext;
140 		sortKey->ssup_collation = node->collations[i];
141 		sortKey->ssup_nulls_first = node->nullsFirst[i];
142 		sortKey->ssup_attno = node->sortColIdx[i];
143 
144 		/*
145 		 * It isn't feasible to perform abbreviated key conversion, since
146 		 * tuples are pulled into mergestate's binary heap as needed.  It
147 		 * would likely be counter-productive to convert tuples into an
148 		 * abbreviated representation as they're pulled up, so opt out of that
149 		 * additional optimization entirely.
150 		 */
151 		sortKey->abbreviate = false;
152 
153 		PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
154 	}
155 
156 	/*
157 	 * initialize to show we have not run the subplans yet
158 	 */
159 	mergestate->ms_initialized = false;
160 
161 	return mergestate;
162 }
163 
164 /* ----------------------------------------------------------------
165  *	   ExecMergeAppend
166  *
167  *		Handles iteration over multiple subplans.
168  * ----------------------------------------------------------------
169  */
170 static TupleTableSlot *
ExecMergeAppend(PlanState * pstate)171 ExecMergeAppend(PlanState *pstate)
172 {
173 	MergeAppendState *node = castNode(MergeAppendState, pstate);
174 	TupleTableSlot *result;
175 	SlotNumber	i;
176 
177 	CHECK_FOR_INTERRUPTS();
178 
179 	if (!node->ms_initialized)
180 	{
181 		/*
182 		 * First time through: pull the first tuple from each subplan, and set
183 		 * up the heap.
184 		 */
185 		for (i = 0; i < node->ms_nplans; i++)
186 		{
187 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
188 			if (!TupIsNull(node->ms_slots[i]))
189 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
190 		}
191 		binaryheap_build(node->ms_heap);
192 		node->ms_initialized = true;
193 	}
194 	else
195 	{
196 		/*
197 		 * Otherwise, pull the next tuple from whichever subplan we returned
198 		 * from last time, and reinsert the subplan index into the heap,
199 		 * because it might now compare differently against the existing
200 		 * elements of the heap.  (We could perhaps simplify the logic a bit
201 		 * by doing this before returning from the prior call, but it's better
202 		 * to not pull tuples until necessary.)
203 		 */
204 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
205 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
206 		if (!TupIsNull(node->ms_slots[i]))
207 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
208 		else
209 			(void) binaryheap_remove_first(node->ms_heap);
210 	}
211 
212 	if (binaryheap_empty(node->ms_heap))
213 	{
214 		/* All the subplans are exhausted, and so is the heap */
215 		result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
216 	}
217 	else
218 	{
219 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
220 		result = node->ms_slots[i];
221 	}
222 
223 	return result;
224 }
225 
226 /*
227  * Compare the tuples in the two given slots.
228  */
229 static int32
heap_compare_slots(Datum a,Datum b,void * arg)230 heap_compare_slots(Datum a, Datum b, void *arg)
231 {
232 	MergeAppendState *node = (MergeAppendState *) arg;
233 	SlotNumber	slot1 = DatumGetInt32(a);
234 	SlotNumber	slot2 = DatumGetInt32(b);
235 
236 	TupleTableSlot *s1 = node->ms_slots[slot1];
237 	TupleTableSlot *s2 = node->ms_slots[slot2];
238 	int			nkey;
239 
240 	Assert(!TupIsNull(s1));
241 	Assert(!TupIsNull(s2));
242 
243 	for (nkey = 0; nkey < node->ms_nkeys; nkey++)
244 	{
245 		SortSupport sortKey = node->ms_sortkeys + nkey;
246 		AttrNumber	attno = sortKey->ssup_attno;
247 		Datum		datum1,
248 					datum2;
249 		bool		isNull1,
250 					isNull2;
251 		int			compare;
252 
253 		datum1 = slot_getattr(s1, attno, &isNull1);
254 		datum2 = slot_getattr(s2, attno, &isNull2);
255 
256 		compare = ApplySortComparator(datum1, isNull1,
257 									  datum2, isNull2,
258 									  sortKey);
259 		if (compare != 0)
260 		{
261 			INVERT_COMPARE_RESULT(compare);
262 			return compare;
263 		}
264 	}
265 	return 0;
266 }
267 
268 /* ----------------------------------------------------------------
269  *		ExecEndMergeAppend
270  *
271  *		Shuts down the subscans of the MergeAppend node.
272  *
273  *		Returns nothing of interest.
274  * ----------------------------------------------------------------
275  */
276 void
ExecEndMergeAppend(MergeAppendState * node)277 ExecEndMergeAppend(MergeAppendState *node)
278 {
279 	PlanState **mergeplans;
280 	int			nplans;
281 	int			i;
282 
283 	/*
284 	 * get information from the node
285 	 */
286 	mergeplans = node->mergeplans;
287 	nplans = node->ms_nplans;
288 
289 	/*
290 	 * shut down each of the subscans
291 	 */
292 	for (i = 0; i < nplans; i++)
293 		ExecEndNode(mergeplans[i]);
294 }
295 
296 void
ExecReScanMergeAppend(MergeAppendState * node)297 ExecReScanMergeAppend(MergeAppendState *node)
298 {
299 	int			i;
300 
301 	for (i = 0; i < node->ms_nplans; i++)
302 	{
303 		PlanState  *subnode = node->mergeplans[i];
304 
305 		/*
306 		 * ExecReScan doesn't know about my subplans, so I have to do
307 		 * changed-parameter signaling myself.
308 		 */
309 		if (node->ps.chgParam != NULL)
310 			UpdateChangedParamSet(subnode, node->ps.chgParam);
311 
312 		/*
313 		 * If chgParam of subnode is not null then plan will be re-scanned by
314 		 * first ExecProcNode.
315 		 */
316 		if (subnode->chgParam == NULL)
317 			ExecReScan(subnode);
318 	}
319 	binaryheap_reset(node->ms_heap);
320 	node->ms_initialized = false;
321 }
322