1 /*-------------------------------------------------------------------------
2 *
3 * nodeMergeAppend.c
4 * routines to handle MergeAppend nodes.
5 *
6 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeMergeAppend.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 /* INTERFACE ROUTINES
16 * ExecInitMergeAppend - initialize the MergeAppend node
17 * ExecMergeAppend - retrieve the next tuple from the node
18 * ExecEndMergeAppend - shut down the MergeAppend node
19 * ExecReScanMergeAppend - rescan the MergeAppend node
20 *
21 * NOTES
22 * A MergeAppend node contains a list of one or more subplans.
23 * These are each expected to deliver tuples that are sorted according
24 * to a common sort key. The MergeAppend node merges these streams
25 * to produce output sorted the same way.
26 *
27 * MergeAppend nodes don't make use of their left and right
28 * subtrees, rather they maintain a list of subplans so
29 * a typical MergeAppend node looks like this in the plan tree:
30 *
31 * ...
32 * /
33 * MergeAppend---+------+------+--- nil
34 * / \ | | |
35 * nil nil ... ... ...
36 * subplans
37 */
38
39 #include "postgres.h"
40
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
43
44 #include "lib/binaryheap.h"
45
46 /*
47 * We have one slot for each item in the heap array. We use SlotNumber
48 * to store slot indexes. This doesn't actually provide any formal
49 * type-safety, but it makes the code more self-documenting.
50 */
51 typedef int32 SlotNumber;
52
53 static int heap_compare_slots(Datum a, Datum b, void *arg);
54
55
56 /* ----------------------------------------------------------------
57 * ExecInitMergeAppend
58 *
59 * Begin all of the subscans of the MergeAppend node.
60 * ----------------------------------------------------------------
61 */
62 MergeAppendState *
ExecInitMergeAppend(MergeAppend * node,EState * estate,int eflags)63 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
64 {
65 MergeAppendState *mergestate = makeNode(MergeAppendState);
66 PlanState **mergeplanstates;
67 int nplans;
68 int i;
69 ListCell *lc;
70
71 /* check for unsupported flags */
72 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
73
74 /*
75 * Set up empty vector of subplan states
76 */
77 nplans = list_length(node->mergeplans);
78
79 mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
80
81 /*
82 * create new MergeAppendState for our node
83 */
84 mergestate->ps.plan = (Plan *) node;
85 mergestate->ps.state = estate;
86 mergestate->mergeplans = mergeplanstates;
87 mergestate->ms_nplans = nplans;
88
89 mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
90 mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
91 mergestate);
92
93 /*
94 * Miscellaneous initialization
95 *
96 * MergeAppend plans don't have expression contexts because they never
97 * call ExecQual or ExecProject.
98 */
99
100 /*
101 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
102 * so we have to initialize them.
103 */
104 ExecInitResultTupleSlot(estate, &mergestate->ps);
105
106 /*
107 * call ExecInitNode on each of the plans to be executed and save the
108 * results into the array "mergeplans".
109 */
110 i = 0;
111 foreach(lc, node->mergeplans)
112 {
113 Plan *initNode = (Plan *) lfirst(lc);
114
115 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
116 i++;
117 }
118
119 /*
120 * initialize output tuple type
121 */
122 ExecAssignResultTypeFromTL(&mergestate->ps);
123 mergestate->ps.ps_ProjInfo = NULL;
124
125 /*
126 * initialize sort-key information
127 */
128 mergestate->ms_nkeys = node->numCols;
129 mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
130
131 for (i = 0; i < node->numCols; i++)
132 {
133 SortSupport sortKey = mergestate->ms_sortkeys + i;
134
135 sortKey->ssup_cxt = CurrentMemoryContext;
136 sortKey->ssup_collation = node->collations[i];
137 sortKey->ssup_nulls_first = node->nullsFirst[i];
138 sortKey->ssup_attno = node->sortColIdx[i];
139
140 /*
141 * It isn't feasible to perform abbreviated key conversion, since
142 * tuples are pulled into mergestate's binary heap as needed. It
143 * would likely be counter-productive to convert tuples into an
144 * abbreviated representation as they're pulled up, so opt out of that
145 * additional optimization entirely.
146 */
147 sortKey->abbreviate = false;
148
149 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
150 }
151
152 /*
153 * initialize to show we have not run the subplans yet
154 */
155 mergestate->ms_initialized = false;
156
157 return mergestate;
158 }
159
160 /* ----------------------------------------------------------------
161 * ExecMergeAppend
162 *
163 * Handles iteration over multiple subplans.
164 * ----------------------------------------------------------------
165 */
166 TupleTableSlot *
ExecMergeAppend(MergeAppendState * node)167 ExecMergeAppend(MergeAppendState *node)
168 {
169 TupleTableSlot *result;
170 SlotNumber i;
171
172 if (!node->ms_initialized)
173 {
174 /*
175 * First time through: pull the first tuple from each subplan, and set
176 * up the heap.
177 */
178 for (i = 0; i < node->ms_nplans; i++)
179 {
180 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
181 if (!TupIsNull(node->ms_slots[i]))
182 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
183 }
184 binaryheap_build(node->ms_heap);
185 node->ms_initialized = true;
186 }
187 else
188 {
189 /*
190 * Otherwise, pull the next tuple from whichever subplan we returned
191 * from last time, and reinsert the subplan index into the heap,
192 * because it might now compare differently against the existing
193 * elements of the heap. (We could perhaps simplify the logic a bit
194 * by doing this before returning from the prior call, but it's better
195 * to not pull tuples until necessary.)
196 */
197 i = DatumGetInt32(binaryheap_first(node->ms_heap));
198 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
199 if (!TupIsNull(node->ms_slots[i]))
200 binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
201 else
202 (void) binaryheap_remove_first(node->ms_heap);
203 }
204
205 if (binaryheap_empty(node->ms_heap))
206 {
207 /* All the subplans are exhausted, and so is the heap */
208 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
209 }
210 else
211 {
212 i = DatumGetInt32(binaryheap_first(node->ms_heap));
213 result = node->ms_slots[i];
214 }
215
216 return result;
217 }
218
219 /*
220 * Compare the tuples in the two given slots.
221 */
222 static int32
heap_compare_slots(Datum a,Datum b,void * arg)223 heap_compare_slots(Datum a, Datum b, void *arg)
224 {
225 MergeAppendState *node = (MergeAppendState *) arg;
226 SlotNumber slot1 = DatumGetInt32(a);
227 SlotNumber slot2 = DatumGetInt32(b);
228
229 TupleTableSlot *s1 = node->ms_slots[slot1];
230 TupleTableSlot *s2 = node->ms_slots[slot2];
231 int nkey;
232
233 Assert(!TupIsNull(s1));
234 Assert(!TupIsNull(s2));
235
236 for (nkey = 0; nkey < node->ms_nkeys; nkey++)
237 {
238 SortSupport sortKey = node->ms_sortkeys + nkey;
239 AttrNumber attno = sortKey->ssup_attno;
240 Datum datum1,
241 datum2;
242 bool isNull1,
243 isNull2;
244 int compare;
245
246 datum1 = slot_getattr(s1, attno, &isNull1);
247 datum2 = slot_getattr(s2, attno, &isNull2);
248
249 compare = ApplySortComparator(datum1, isNull1,
250 datum2, isNull2,
251 sortKey);
252 if (compare != 0)
253 {
254 INVERT_COMPARE_RESULT(compare);
255 return compare;
256 }
257 }
258 return 0;
259 }
260
261 /* ----------------------------------------------------------------
262 * ExecEndMergeAppend
263 *
264 * Shuts down the subscans of the MergeAppend node.
265 *
266 * Returns nothing of interest.
267 * ----------------------------------------------------------------
268 */
269 void
ExecEndMergeAppend(MergeAppendState * node)270 ExecEndMergeAppend(MergeAppendState *node)
271 {
272 PlanState **mergeplans;
273 int nplans;
274 int i;
275
276 /*
277 * get information from the node
278 */
279 mergeplans = node->mergeplans;
280 nplans = node->ms_nplans;
281
282 /*
283 * shut down each of the subscans
284 */
285 for (i = 0; i < nplans; i++)
286 ExecEndNode(mergeplans[i]);
287 }
288
289 void
ExecReScanMergeAppend(MergeAppendState * node)290 ExecReScanMergeAppend(MergeAppendState *node)
291 {
292 int i;
293
294 for (i = 0; i < node->ms_nplans; i++)
295 {
296 PlanState *subnode = node->mergeplans[i];
297
298 /*
299 * ExecReScan doesn't know about my subplans, so I have to do
300 * changed-parameter signaling myself.
301 */
302 if (node->ps.chgParam != NULL)
303 UpdateChangedParamSet(subnode, node->ps.chgParam);
304
305 /*
306 * If chgParam of subnode is not null then plan will be re-scanned by
307 * first ExecProcNode.
308 */
309 if (subnode->chgParam == NULL)
310 ExecReScan(subnode);
311 }
312 binaryheap_reset(node->ms_heap);
313 node->ms_initialized = false;
314 }
315