1 /*-------------------------------------------------------------------------
2 *
3 * nodeMergeAppend.c
4 * routines to handle MergeAppend nodes.
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeMergeAppend.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 /* INTERFACE ROUTINES
16 * ExecInitMergeAppend - initialize the MergeAppend node
17 * ExecMergeAppend - retrieve the next tuple from the node
18 * ExecEndMergeAppend - shut down the MergeAppend node
19 * ExecReScanMergeAppend - rescan the MergeAppend node
20 *
21 * NOTES
22 * A MergeAppend node contains a list of one or more subplans.
23 * These are each expected to deliver tuples that are sorted according
24 * to a common sort key. The MergeAppend node merges these streams
25 * to produce output sorted the same way.
26 *
27 * MergeAppend nodes don't make use of their left and right
28 * subtrees, rather they maintain a list of subplans so
29 * a typical MergeAppend node looks like this in the plan tree:
30 *
31 * ...
32 * /
33 * MergeAppend---+------+------+--- nil
34 * / \ | | |
35 * nil nil ... ... ...
36 * subplans
37 */
38
39 #include "postgres.h"
40
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
43 #include "lib/binaryheap.h"
44 #include "miscadmin.h"
45
46 /*
47 * We have one slot for each item in the heap array. We use SlotNumber
48 * to store slot indexes. This doesn't actually provide any formal
49 * type-safety, but it makes the code more self-documenting.
50 */
51 typedef int32 SlotNumber;
52
53 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
54 static int heap_compare_slots(Datum a, Datum b, void *arg);
55
56
57 /* ----------------------------------------------------------------
58 * ExecInitMergeAppend
59 *
60 * Begin all of the subscans of the MergeAppend node.
61 * ----------------------------------------------------------------
62 */
63 MergeAppendState *
ExecInitMergeAppend(MergeAppend * node,EState * estate,int eflags)64 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
65 {
66 MergeAppendState *mergestate = makeNode(MergeAppendState);
67 PlanState **mergeplanstates;
68 int nplans;
69 int i;
70 ListCell *lc;
71
72 /* check for unsupported flags */
73 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
74
75 /*
76 * Lock the non-leaf tables in the partition tree controlled by this node.
77 * It's a no-op for non-partitioned parent tables.
78 */
79 ExecLockNonLeafAppendTables(node->partitioned_rels, estate);
80
81 /*
82 * Set up empty vector of subplan states
83 */
84 nplans = list_length(node->mergeplans);
85
86 mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
87
88 /*
89 * create new MergeAppendState for our node
90 */
91 mergestate->ps.plan = (Plan *) node;
92 mergestate->ps.state = estate;
93 mergestate->ps.ExecProcNode = ExecMergeAppend;
94 mergestate->mergeplans = mergeplanstates;
95 mergestate->ms_nplans = nplans;
96
97 mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
98 mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
99 mergestate);
100
101 /*
102 * Miscellaneous initialization
103 *
104 * MergeAppend plans don't have expression contexts because they never
105 * call ExecQual or ExecProject.
106 */
107
108 /*
109 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
110 * so we have to initialize them.
111 */
112 ExecInitResultTupleSlot(estate, &mergestate->ps);
113
114 /*
115 * call ExecInitNode on each of the plans to be executed and save the
116 * results into the array "mergeplans".
117 */
118 i = 0;
119 foreach(lc, node->mergeplans)
120 {
121 Plan *initNode = (Plan *) lfirst(lc);
122
123 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
124 i++;
125 }
126
127 /*
128 * initialize output tuple type
129 */
130 ExecAssignResultTypeFromTL(&mergestate->ps);
131 mergestate->ps.ps_ProjInfo = NULL;
132
133 /*
134 * initialize sort-key information
135 */
136 mergestate->ms_nkeys = node->numCols;
137 mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
138
139 for (i = 0; i < node->numCols; i++)
140 {
141 SortSupport sortKey = mergestate->ms_sortkeys + i;
142
143 sortKey->ssup_cxt = CurrentMemoryContext;
144 sortKey->ssup_collation = node->collations[i];
145 sortKey->ssup_nulls_first = node->nullsFirst[i];
146 sortKey->ssup_attno = node->sortColIdx[i];
147
148 /*
149 * It isn't feasible to perform abbreviated key conversion, since
150 * tuples are pulled into mergestate's binary heap as needed. It
151 * would likely be counter-productive to convert tuples into an
152 * abbreviated representation as they're pulled up, so opt out of that
153 * additional optimization entirely.
154 */
155 sortKey->abbreviate = false;
156
157 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
158 }
159
160 /*
161 * initialize to show we have not run the subplans yet
162 */
163 mergestate->ms_initialized = false;
164
165 return mergestate;
166 }
167
168 /* ----------------------------------------------------------------
169 * ExecMergeAppend
170 *
171 * Handles iteration over multiple subplans.
172 * ----------------------------------------------------------------
173 */
174 static TupleTableSlot *
ExecMergeAppend(PlanState * pstate)175 ExecMergeAppend(PlanState *pstate)
176 {
177 MergeAppendState *node = castNode(MergeAppendState, pstate);
178 TupleTableSlot *result;
179 SlotNumber i;
180
181 CHECK_FOR_INTERRUPTS();
182
183 if (!node->ms_initialized)
184 {
185 /*
186 * First time through: pull the first tuple from each subplan, and set
187 * up the heap.
188 */
189 for (i = 0; i < node->ms_nplans; i++)
190 {
191 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
192 if (!TupIsNull(node->ms_slots[i]))
193 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
194 }
195 binaryheap_build(node->ms_heap);
196 node->ms_initialized = true;
197 }
198 else
199 {
200 /*
201 * Otherwise, pull the next tuple from whichever subplan we returned
202 * from last time, and reinsert the subplan index into the heap,
203 * because it might now compare differently against the existing
204 * elements of the heap. (We could perhaps simplify the logic a bit
205 * by doing this before returning from the prior call, but it's better
206 * to not pull tuples until necessary.)
207 */
208 i = DatumGetInt32(binaryheap_first(node->ms_heap));
209 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
210 if (!TupIsNull(node->ms_slots[i]))
211 binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
212 else
213 (void) binaryheap_remove_first(node->ms_heap);
214 }
215
216 if (binaryheap_empty(node->ms_heap))
217 {
218 /* All the subplans are exhausted, and so is the heap */
219 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
220 }
221 else
222 {
223 i = DatumGetInt32(binaryheap_first(node->ms_heap));
224 result = node->ms_slots[i];
225 }
226
227 return result;
228 }
229
230 /*
231 * Compare the tuples in the two given slots.
232 */
233 static int32
heap_compare_slots(Datum a,Datum b,void * arg)234 heap_compare_slots(Datum a, Datum b, void *arg)
235 {
236 MergeAppendState *node = (MergeAppendState *) arg;
237 SlotNumber slot1 = DatumGetInt32(a);
238 SlotNumber slot2 = DatumGetInt32(b);
239
240 TupleTableSlot *s1 = node->ms_slots[slot1];
241 TupleTableSlot *s2 = node->ms_slots[slot2];
242 int nkey;
243
244 Assert(!TupIsNull(s1));
245 Assert(!TupIsNull(s2));
246
247 for (nkey = 0; nkey < node->ms_nkeys; nkey++)
248 {
249 SortSupport sortKey = node->ms_sortkeys + nkey;
250 AttrNumber attno = sortKey->ssup_attno;
251 Datum datum1,
252 datum2;
253 bool isNull1,
254 isNull2;
255 int compare;
256
257 datum1 = slot_getattr(s1, attno, &isNull1);
258 datum2 = slot_getattr(s2, attno, &isNull2);
259
260 compare = ApplySortComparator(datum1, isNull1,
261 datum2, isNull2,
262 sortKey);
263 if (compare != 0)
264 {
265 INVERT_COMPARE_RESULT(compare);
266 return compare;
267 }
268 }
269 return 0;
270 }
271
272 /* ----------------------------------------------------------------
273 * ExecEndMergeAppend
274 *
275 * Shuts down the subscans of the MergeAppend node.
276 *
277 * Returns nothing of interest.
278 * ----------------------------------------------------------------
279 */
280 void
ExecEndMergeAppend(MergeAppendState * node)281 ExecEndMergeAppend(MergeAppendState *node)
282 {
283 PlanState **mergeplans;
284 int nplans;
285 int i;
286
287 /*
288 * get information from the node
289 */
290 mergeplans = node->mergeplans;
291 nplans = node->ms_nplans;
292
293 /*
294 * shut down each of the subscans
295 */
296 for (i = 0; i < nplans; i++)
297 ExecEndNode(mergeplans[i]);
298 }
299
300 void
ExecReScanMergeAppend(MergeAppendState * node)301 ExecReScanMergeAppend(MergeAppendState *node)
302 {
303 int i;
304
305 for (i = 0; i < node->ms_nplans; i++)
306 {
307 PlanState *subnode = node->mergeplans[i];
308
309 /*
310 * ExecReScan doesn't know about my subplans, so I have to do
311 * changed-parameter signaling myself.
312 */
313 if (node->ps.chgParam != NULL)
314 UpdateChangedParamSet(subnode, node->ps.chgParam);
315
316 /*
317 * If chgParam of subnode is not null then plan will be re-scanned by
318 * first ExecProcNode.
319 */
320 if (subnode->chgParam == NULL)
321 ExecReScan(subnode);
322 }
323 binaryheap_reset(node->ms_heap);
324 node->ms_initialized = false;
325 }
326