1 #include "aggregate_plan.h"
2 #include "reducer.h"
3 #include "expr/expression.h"
4 #include <util/arr.h>
5 #include <ctype.h>
6
/* Translate a plan step type into the short label printed by AGPLN_Dump().
 * PLN_T_INVALID and any value not listed below map to "<UNKNOWN>". */
static const char *steptypeToString(PLN_StepType type) {
  const char *label = "<UNKNOWN>";
  switch (type) {
    case PLN_T_APPLY:
      label = "APPLY";
      break;
    case PLN_T_FILTER:
      label = "FILTER";
      break;
    case PLN_T_ARRANGE:
      label = "LIMIT/MAX/SORTBY";
      break;
    case PLN_T_ROOT:
      label = "<ROOT>";
      break;
    case PLN_T_GROUP:
      label = "GROUPBY";
      break;
    case PLN_T_LOAD:
      label = "LOAD";
      break;
    case PLN_T_DISTRIBUTE:
      label = "DISTRIBUTE";
      break;
    case PLN_T_INVALID:
    default:
      break;
  }
  return label;
}
28
29 /* add a step to the plan at its end (before the dummy tail) */
AGPLN_AddStep(AGGPlan * plan,PLN_BaseStep * step)30 void AGPLN_AddStep(AGGPlan *plan, PLN_BaseStep *step) {
31 RS_LOG_ASSERT(step->type > PLN_T_INVALID, "Step type connot be PLN_T_INVALID");
32 dllist_append(&plan->steps, &step->llnodePln);
33 plan->steptypes |= (1 << (step->type - 1));
34 }
35
/* Test whether a step of type `t` has been recorded in the plan's `steptypes`
 * bitmask. Returns the masked bit itself (non-zero when set), not a
 * normalized 0/1 value. */
int AGPLN_HasStep(const AGGPlan *pln, PLN_StepType t) {
  const int typeBit = 1 << (t - 1);
  return pln->steptypes & typeBit;
}
39
/**
 * Insert `newstp` into the plan immediately before `posstp`.
 *
 * @param pln    the plan to modify
 * @param posstp the step to insert before; when NULL, or when it is already
 *               the first step of the plan, `newstp` is placed at the head
 * @param newstp the step to insert; its type must be greater than PLN_T_INVALID
 *
 * NOTE(review): unlike AGPLN_AddStep(), this path does not touch
 * `pln->steptypes`, so AGPLN_HasStep() will not report steps added here.
 * Left unchanged; confirm whether callers rely on that.
 */
void AGPLN_AddBefore(AGGPlan *pln, PLN_BaseStep *posstp, PLN_BaseStep *newstp) {
  RS_LOG_ASSERT(newstp->type > PLN_T_INVALID, "Step type connot be PLN_T_INVALID");
  if (posstp == NULL || DLLIST_IS_FIRST(&pln->steps, &posstp->llnodePln)) {
    // Link the NEW step at the head. Passing posstp's node here would form a
    // pointer from NULL when posstp is NULL, re-link an already linked node
    // otherwise, and in both cases never insert newstp.
    dllist_prepend(&pln->steps, &newstp->llnodePln);
  } else {
    dllist_insert(posstp->llnodePln.prev, &posstp->llnodePln, &newstp->llnodePln);
  }
}
48
/* Insert `newstp` immediately after `posstp`. When `posstp` is NULL or is the
 * last step, this is a plain append via AGPLN_AddStep().
 *
 * NOTE(review): only the append path goes through AGPLN_AddStep() and thus
 * updates `pln->steptypes`; the mid-list insertion below does not. */
void AGPLN_AddAfter(AGGPlan *pln, PLN_BaseStep *posstp, PLN_BaseStep *newstp) {
  RS_LOG_ASSERT(newstp->type > PLN_T_INVALID, "Step type connot be PLN_T_INVALID");
  if (posstp != NULL && !(DLLIST_IS_LAST(&pln->steps, &posstp->llnodePln))) {
    /* There is a successor: splice between posstp and the node after it. */
    dllist_insert(&posstp->llnodePln, posstp->llnodePln.next, &newstp->llnodePln);
    return;
  }
  AGPLN_AddStep(pln, newstp);
}
57
/* Link `newstp` at the head of the plan's step list. No type assertion is
 * made and `pln->steptypes` is not updated here. */
void AGPLN_Prepend(AGGPlan *pln, PLN_BaseStep *newstp) {
  DLLIST_node *node = &newstp->llnodePln;
  dllist_prepend(&pln->steps, node);
}
61
/* Unlink `step` from whatever list it is on. The step is not destroyed (its
 * dtor is not called) and the plan's `steptypes` bitmask is left as is. */
void AGPLN_PopStep(AGGPlan *pln, PLN_BaseStep *step) {
  (void)pln; /* the plan itself is not needed to unlink the node */
  dllist_delete(&step->llnodePln);
}
66
/* Destructor for the root step: cleans up its lookup table. The step memory
 * is not freed here; AGPLN_Init() installs this dtor on the `firstStep_s`
 * member embedded in the plan. */
static void rootStepDtor(PLN_BaseStep *bstp) {
  RLookup_Cleanup(&((PLN_FirstStep *)bstp)->lookup);
}
rootStepLookup(PLN_BaseStep * bstp)71 static RLookup *rootStepLookup(PLN_BaseStep *bstp) {
72 return &((PLN_FirstStep *)bstp)->lookup;
73 }
74
/* Reset `plan` to an empty state: zero the whole structure, set up the
 * embedded root step (type, destructor, lookup accessor) and make it the only
 * element of the step list. */
void AGPLN_Init(AGGPlan *plan) {
  memset(plan, 0, sizeof(*plan));

  PLN_BaseStep *root = &plan->firstStep_s.base;
  root->type = PLN_T_ROOT;
  root->dtor = rootStepDtor;
  root->getLookup = rootStepLookup;

  dllist_init(&plan->steps);
  dllist_append(&plan->steps, &root->llnodePln);
}
83
lookupFromNode(const DLLIST_node * nn)84 static RLookup *lookupFromNode(const DLLIST_node *nn) {
85 PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
86 if (stp->getLookup) {
87 return stp->getLookup(stp);
88 } else {
89 return NULL;
90 }
91 }
92
/* Scan the half-open range [begin, end) for the first step matching `type`.
 *
 * `begin` defaults to the first step of the plan and `end` to the list head
 * (i.e. scan to the end) when passed as NULL. A step matches when its type
 * equals `type`, or when `type` is PLANTYPE_ANY_REDUCER and PLN_IsReduce()
 * accepts the step. Returns the matching step, or NULL if none is found. */
const PLN_BaseStep *AGPLN_FindStep(const AGGPlan *pln, const PLN_BaseStep *begin,
                                   const PLN_BaseStep *end, PLN_StepType type) {
  const PLN_BaseStep *cur = begin;
  const PLN_BaseStep *stop = end;
  if (cur == NULL) {
    cur = DLLIST_ITEM(pln->steps.next, PLN_BaseStep, llnodePln);
  }
  if (stop == NULL) {
    stop = DLLIST_ITEM(&pln->steps, PLN_BaseStep, llnodePln);
  }

  while (cur != stop) {
    if (cur->type == type || (type == PLANTYPE_ANY_REDUCER && PLN_IsReduce(cur))) {
      return cur;
    }
    cur = DLLIST_ITEM(cur->llnodePln.next, PLN_BaseStep, llnodePln);
  }
  return NULL;
}
112
/* Destructor for a heap-allocated arrange step: releases the sort-key array
 * (when present), the `sortkeysLK` buffer, and finally the step itself. */
static void arrangeDtor(PLN_BaseStep *bstp) {
  PLN_ArrangeStep *arrange = (PLN_ArrangeStep *)bstp;
  if (arrange->sortKeys != NULL) {
    array_free(arrange->sortKeys);
  }
  rm_free(arrange->sortkeysLK);
  rm_free(bstp);
}
121
AGPLN_GetArrangeStep(AGGPlan * pln)122 PLN_ArrangeStep *AGPLN_GetArrangeStep(AGGPlan *pln) {
123 // Go backwards.. and stop at the cutoff
124 for (const DLLIST_node *nn = pln->steps.prev; nn != &pln->steps; nn = nn->prev) {
125 const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
126 if (PLN_IsReduce(stp)) {
127 break;
128 } else if (stp->type == PLN_T_ARRANGE) {
129 return (PLN_ArrangeStep *)stp;
130 }
131 }
132 return NULL;
133 }
134
AGPLN_GetOrCreateArrangeStep(AGGPlan * pln)135 PLN_ArrangeStep *AGPLN_GetOrCreateArrangeStep(AGGPlan *pln) {
136 PLN_ArrangeStep *ret = AGPLN_GetArrangeStep(pln);
137 if (ret) {
138 return ret;
139 }
140 ret = rm_calloc(1, sizeof(*ret));
141 ret->base.type = PLN_T_ARRANGE;
142 ret->base.dtor = arrangeDtor;
143 AGPLN_AddStep(pln, &ret->base);
144 return ret;
145 }
146
AGPLN_GetLookup(const AGGPlan * pln,const PLN_BaseStep * bstp,AGPLNGetLookupMode mode)147 RLookup *AGPLN_GetLookup(const AGGPlan *pln, const PLN_BaseStep *bstp, AGPLNGetLookupMode mode) {
148 const DLLIST_node *first = NULL, *last = NULL;
149 int isReverse = 0;
150
151 switch (mode) {
152 case AGPLN_GETLOOKUP_FIRST:
153 first = pln->steps.next;
154 last = bstp ? &bstp->llnodePln : &pln->steps;
155 break;
156 case AGPLN_GETLOOKUP_PREV:
157 first = &pln->steps;
158 last = bstp->llnodePln.prev;
159 isReverse = 1;
160 break;
161 case AGPLN_GETLOOKUP_NEXT:
162 first = bstp->llnodePln.next;
163 last = &pln->steps;
164 break;
165 case AGPLN_GETLOOKUP_LAST:
166 first = bstp ? &bstp->llnodePln : &pln->steps;
167 last = pln->steps.prev;
168 isReverse = 1;
169 }
170
171 if (isReverse) {
172 for (const DLLIST_node *nn = last; nn && nn != first; nn = nn->prev) {
173 RLookup *lk = lookupFromNode(nn);
174 if (lk) {
175 return lk;
176 }
177 }
178 } else {
179 for (const DLLIST_node *nn = first; nn && nn != last; nn = nn->next) {
180 RLookup *lk = lookupFromNode(nn);
181 if (lk) {
182 return lk;
183 }
184 }
185 return NULL;
186 }
187 return NULL;
188 }
189
/* Invoke the destructor of every step in the plan that has one. The list
 * links themselves are not reset by this function. */
void AGPLN_FreeSteps(AGGPlan *pln) {
  DLLIST_node *following = NULL;
  for (DLLIST_node *cur = pln->steps.next; cur && cur != &pln->steps; cur = following) {
    /* Fetch the successor first: a dtor may free the step (arrangeDtor does). */
    following = cur->next;
    PLN_BaseStep *bstp = DLLIST_ITEM(cur, PLN_BaseStep, llnodePln);
    if (bstp->dtor) {
      bstp->dtor(bstp);
    }
  }
}
200
AGPLN_Dump(const AGGPlan * pln)201 void AGPLN_Dump(const AGGPlan *pln) {
202 for (const DLLIST_node *nn = pln->steps.next; nn && nn != &pln->steps; nn = nn->next) {
203 const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
204 printf("STEP: [T=%s. P=%p]\n", steptypeToString(stp->type), stp);
205 const RLookup *lk = lookupFromNode(nn);
206 if (lk) {
207 printf(" NEW LOOKUP: %p\n", lk);
208 for (const RLookupKey *kk = lk->head; kk; kk = kk->next) {
209 printf(" %s @%p: FLAGS=0x%x\n", kk->name, kk, kk->flags);
210 }
211 }
212
213 switch (stp->type) {
214 case PLN_T_APPLY:
215 case PLN_T_FILTER:
216 printf(" EXPR:%s\n", ((PLN_MapFilterStep *)stp)->rawExpr);
217 if (stp->alias) {
218 printf(" AS:%s\n", stp->alias);
219 }
220 break;
221 case PLN_T_ARRANGE: {
222 const PLN_ArrangeStep *astp = (PLN_ArrangeStep *)stp;
223 if (astp->offset || astp->limit) {
224 printf(" OFFSET:%lu LIMIT:%lu\n", (unsigned long)astp->offset,
225 (unsigned long)astp->limit);
226 }
227 if (astp->sortKeys) {
228 printf(" SORT:\n");
229 for (size_t ii = 0; ii < array_len(astp->sortKeys); ++ii) {
230 const char *dir = SORTASCMAP_GETASC(astp->sortAscMap, ii) ? "ASC" : "DESC";
231 printf(" %s:%s\n", astp->sortKeys[ii], dir);
232 }
233 }
234 break;
235 }
236 case PLN_T_LOAD: {
237 const PLN_LoadStep *lstp = (PLN_LoadStep *)stp;
238 for (size_t ii = 0; ii < lstp->args.argc; ++ii) {
239 printf(" %s\n", (char *)lstp->args.objs[ii]);
240 }
241 break;
242 }
243 case PLN_T_GROUP: {
244 const PLN_GroupStep *gstp = (PLN_GroupStep *)stp;
245 printf(" BY:\n");
246 for (size_t ii = 0; ii < gstp->nproperties; ++ii) {
247 printf(" %s\n", gstp->properties[ii]);
248 }
249 for (size_t ii = 0; ii < array_len(gstp->reducers); ++ii) {
250 const PLN_Reducer *r = gstp->reducers + ii;
251 printf(" REDUCE: %s AS %s\n", r->name, r->alias);
252 if (r->args.argc) {
253 printf(" ARGS:[");
254 }
255 for (size_t jj = 0; jj < r->args.argc; ++jj) {
256 printf("%s ", (char *)r->args.objs[jj]);
257 }
258 printf("]\n");
259 }
260 break;
261 }
262 case PLN_T_ROOT:
263 case PLN_T_DISTRIBUTE:
264 case PLN_T_INVALID:
265 case PLN_T__MAX:
266 break;
267 }
268 }
269 }
270
/* Array of C strings built up by the serialize helpers below; elements are
 * added with array_append(). */
typedef char **myArgArray_t;

/* Push a duplicate of `src` (made with rm_strdup) onto `*arr`. `*arr` is
 * reassigned to whatever array_append() returns. */
static inline void append_string(myArgArray_t *arr, const char *src) {
  char *copy = rm_strdup(src);
  *arr = array_append(*arr, copy);
}
/* Append the decimal text form of `ll` to `*arr`. */
static inline void append_uint(myArgArray_t *arr, unsigned long long ll) {
  char digits[64] = {0};
  snprintf(digits, sizeof(digits), "%llu", ll);
  append_string(arr, digits);
}
/* Append a copy of each of the `ac->argc` string arguments of `ac`, in order. */
static inline void append_ac(myArgArray_t *arr, const ArgsCursor *ac) {
  size_t idx = 0;
  while (idx < ac->argc) {
    append_string(arr, AC_StringArg(ac, idx));
    ++idx;
  }
}
287
/* Serialize an APPLY or FILTER step as: <APPLY|FILTER> <rawExpr> [AS <alias>].
 * Any step whose type is not PLN_T_APPLY is written with the FILTER keyword. */
static void serializeMapFilter(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_MapFilterStep *mstp = (const PLN_MapFilterStep *)stp;
  const char *keyword = (stp->type == PLN_T_APPLY) ? "APPLY" : "FILTER";

  append_string(arr, keyword);
  append_string(arr, mstp->rawExpr);
  if (!stp->alias) {
    return;
  }
  append_string(arr, "AS");
  append_string(arr, stp->alias);
}
301
/* Serialize an arrange step.
 *
 * When an offset or limit is set, emits `LIMIT 0 <offset+limit>`: the offset
 * is folded into the limit rather than forwarded as-is.
 * When sort keys are present, emits `SORTBY <2*nkeys>` followed by
 * `@<key> ASC|DESC` for each key, with the direction taken from `sortAscMap`. */
static void serializeArrange(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_ArrangeStep *astp = (const PLN_ArrangeStep *)stp;

  if (astp->offset || astp->limit) {
    append_string(arr, "LIMIT");
    append_uint(arr, 0);
    append_uint(arr, astp->offset + astp->limit);
  }

  if (!astp->sortKeys) {
    return;
  }

  const size_t nkeys = array_len(astp->sortKeys);
  append_string(arr, "SORTBY");
  append_uint(arr, nkeys * 2);
  for (size_t k = 0; k < nkeys; ++k) {
    /* The formatted buffer is handed to the array directly (no extra copy). */
    char *prefixed;
    rm_asprintf(&prefixed, "@%s", astp->sortKeys[k]);
    *arr = array_append(*arr, prefixed);
    append_string(arr, SORTASCMAP_GETASC(astp->sortAscMap, k) ? "ASC" : "DESC");
  }
}
325
/* Serialize a LOAD step: `LOAD <argc> <args...>` when it has explicit
 * arguments, `LOAD *` when it has none but carries PLN_F_LOAD_ALL, and
 * nothing otherwise. */
static void serializeLoad(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_LoadStep *lstp = (const PLN_LoadStep *)stp;

  if (lstp->args.argc == 0) {
    if (lstp->base.flags & PLN_F_LOAD_ALL) {
      append_string(arr, "LOAD");
      append_string(arr, "*");
    }
    return;
  }

  append_string(arr, "LOAD");
  append_uint(arr, lstp->args.argc);
  append_ac(arr, &lstp->args);
}
337
/* Serialize a GROUPBY step: `GROUPBY <nproperties> <properties...>` followed,
 * for each reducer, by `REDUCE <name> <argc> <args...> [AS <alias>]`. */
static void serializeGroup(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_GroupStep *gstp = (const PLN_GroupStep *)stp;

  append_string(arr, "GROUPBY");
  append_uint(arr, gstp->nproperties);
  for (size_t p = 0; p < gstp->nproperties; ++p) {
    append_string(arr, gstp->properties[p]);
  }

  const size_t reducerCount = array_len(gstp->reducers);
  size_t idx = 0;
  while (idx < reducerCount) {
    const PLN_Reducer *reducer = &gstp->reducers[idx];
    append_string(arr, "REDUCE");
    append_string(arr, reducer->name);
    append_uint(arr, reducer->args.argc);
    append_ac(arr, &reducer->args);
    if (reducer->alias) {
      append_string(arr, "AS");
      append_string(arr, reducer->alias);
    }
    ++idx;
  }
}
358
AGPLN_Serialize(const AGGPlan * pln)359 array_t AGPLN_Serialize(const AGGPlan *pln) {
360 char **arr = array_new(char *, 1);
361 for (const DLLIST_node *nn = pln->steps.next; nn != &pln->steps; nn = nn->next) {
362 const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
363 switch (stp->type) {
364 case PLN_T_APPLY:
365 case PLN_T_FILTER:
366 serializeMapFilter(&arr, stp);
367 break;
368 case PLN_T_ARRANGE:
369 serializeArrange(&arr, stp);
370 break;
371 case PLN_T_LOAD:
372 serializeLoad(&arr, stp);
373 break;
374 case PLN_T_GROUP:
375 serializeGroup(&arr, stp);
376 break;
377 case PLN_T_INVALID:
378 case PLN_T_ROOT:
379 case PLN_T_DISTRIBUTE:
380 case PLN_T__MAX:
381 break;
382 }
383 }
384 return arr;
385 }
386