1 #include "aggregate_plan.h"
2 #include "reducer.h"
3 #include "expr/expression.h"
4 #include <commands.h>
5 #include <util/arr.h>
6 #include <ctype.h>
7
/* Map a plan step type to a short display name used by AGPLN_Dump.
 * Any value without a dedicated name (including PLN_T_INVALID) yields
 * "<UNKNOWN>". */
static const char *steptypeToString(PLN_StepType type) {
  const char *name = "<UNKNOWN>";
  switch (type) {
    case PLN_T_ROOT:
      name = "<ROOT>";
      break;
    case PLN_T_GROUP:
      name = "GROUPBY";
      break;
    case PLN_T_DISTRIBUTE:
      name = "DISTRIBUTE";
      break;
    case PLN_T_FILTER:
      name = "FILTER";
      break;
    case PLN_T_APPLY:
      name = "APPLY";
      break;
    case PLN_T_ARRANGE:
      name = "LIMIT/MAX/SORTBY";
      break;
    case PLN_T_LOAD:
      name = "LOAD";
      break;
    case PLN_T_INVALID:
    default:
      break;
  }
  return name;
}
29
30 /* add a step to the plan at its end (before the dummy tail) */
AGPLN_AddStep(AGGPlan * plan,PLN_BaseStep * step)31 void AGPLN_AddStep(AGGPlan *plan, PLN_BaseStep *step) {
32 assert(step->type > PLN_T_INVALID);
33 dllist_append(&plan->steps, &step->llnodePln);
34 plan->steptypes |= (1 << (step->type - 1));
35 }
36
/* Return non-zero if a step of type `t` has been recorded in the plan's
 * `steptypes` mask, 0 otherwise.
 *
 * Types at or below PLN_T_INVALID never have a bit in the mask (AGPLN_AddStep
 * asserts type > PLN_T_INVALID). They are rejected up front because
 * `1 << (t - 1)` could otherwise be a negative shift count, which is undefined
 * behavior.
 *
 * Note: AGPLN_PopStep does not clear bits, so a removed step's type may still
 * be reported here. */
int AGPLN_HasStep(const AGGPlan *pln, PLN_StepType t) {
  if (t <= PLN_T_INVALID) {
    return 0;
  }
  return (pln->steptypes & (1 << (t - 1)));
}
40
/* Insert `newstp` immediately before `posstp` in the plan.
 * If `posstp` is NULL, or is the first step of the plan, `newstp` becomes the
 * new first step. As with AGPLN_AddStep, the new step's type is recorded in
 * `pln->steptypes`. `newstp->type` must be greater than PLN_T_INVALID. */
void AGPLN_AddBefore(AGGPlan *pln, PLN_BaseStep *posstp, PLN_BaseStep *newstp) {
  assert(newstp->type > PLN_T_INVALID);
  if (posstp == NULL || DLLIST_IS_FIRST(&pln->steps, &posstp->llnodePln)) {
    // Link the *new* step at the head; `posstp` may be NULL here and must not
    // be dereferenced.
    dllist_prepend(&pln->steps, &newstp->llnodePln);
  } else {
    dllist_insert(posstp->llnodePln.prev, &posstp->llnodePln, &newstp->llnodePln);
  }
  pln->steptypes |= (1 << (newstp->type - 1));
}
49
/* Insert `newstp` immediately after `posstp` in the plan.
 * If `posstp` is NULL, or is the last step of the plan, `newstp` is appended
 * at the end via AGPLN_AddStep. In every case the new step's type is recorded
 * in `pln->steptypes`. `newstp->type` must be greater than PLN_T_INVALID. */
void AGPLN_AddAfter(AGGPlan *pln, PLN_BaseStep *posstp, PLN_BaseStep *newstp) {
  assert(newstp->type > PLN_T_INVALID);
  if (posstp == NULL || DLLIST_IS_LAST(&pln->steps, &posstp->llnodePln)) {
    // AGPLN_AddStep also updates the step-type mask.
    AGPLN_AddStep(pln, newstp);
  } else {
    dllist_insert(&posstp->llnodePln, posstp->llnodePln.next, &newstp->llnodePln);
    // Keep the mask consistent with the tail-insertion path above.
    pln->steptypes |= (1 << (newstp->type - 1));
  }
}
58
/* Insert `newstp` as the very first step of the plan (ahead of every existing
 * step) and record its type in `pln->steptypes`, matching AGPLN_AddStep.
 * `newstp->type` must be greater than PLN_T_INVALID; otherwise the mask shift
 * below would be out of range. */
void AGPLN_Prepend(AGGPlan *pln, PLN_BaseStep *newstp) {
  assert(newstp->type > PLN_T_INVALID);
  dllist_prepend(&pln->steps, &newstp->llnodePln);
  pln->steptypes |= (1 << (newstp->type - 1));
}
62
/* Unlink `step` from whatever list it is in. The step is not destroyed (its
 * dtor is not called), and its type bit in `pln->steptypes` is left as-is.
 * `pln` is accepted for API symmetry but is not used. */
void AGPLN_PopStep(AGGPlan *pln, PLN_BaseStep *step) {
  (void)pln;
  DLLIST_node *node = &step->llnodePln;
  dllist_delete(node);
}
67
/* Destructor for the root step embedded in AGGPlan: releases the lookup's
 * contents only. The step itself lives inside the plan and is not freed. */
static void rootStepDtor(PLN_BaseStep *bstp) {
  RLookup_Cleanup(&((PLN_FirstStep *)bstp)->lookup);
}
rootStepLookup(PLN_BaseStep * bstp)72 static RLookup *rootStepLookup(PLN_BaseStep *bstp) {
73 return &((PLN_FirstStep *)bstp)->lookup;
74 }
75
/* Reset `plan` to an empty plan containing only its embedded root step
 * (type PLN_T_ROOT), whose dtor/getLookup callbacks manage the root lookup. */
void AGPLN_Init(AGGPlan *plan) {
  memset(plan, 0, sizeof(*plan));

  PLN_BaseStep *root = &plan->firstStep_s.base;
  root->type = PLN_T_ROOT;
  root->dtor = rootStepDtor;
  root->getLookup = rootStepLookup;

  dllist_init(&plan->steps);
  dllist_append(&plan->steps, &root->llnodePln);
}
84
lookupFromNode(const DLLIST_node * nn)85 static RLookup *lookupFromNode(const DLLIST_node *nn) {
86 PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
87 if (stp->getLookup) {
88 return stp->getLookup(stp);
89 } else {
90 return NULL;
91 }
92 }
93
/* Search forward from `begin` (inclusive) to `end` (exclusive) for the first
 * step whose type equals `type`. Passing PLANTYPE_ANY_REDUCER also matches any
 * step for which PLN_IsReduce() is true.
 * A NULL `begin` means the first step of the plan. A NULL `end` means the list
 * head, i.e. search to the end. Returns NULL when nothing matches. */
const PLN_BaseStep *AGPLN_FindStep(const AGGPlan *pln, const PLN_BaseStep *begin,
                                   const PLN_BaseStep *end, PLN_StepType type) {
  const PLN_BaseStep *cur = begin ? begin : DLLIST_ITEM(pln->steps.next, PLN_BaseStep, llnodePln);
  const PLN_BaseStep *stop = end ? end : DLLIST_ITEM(&pln->steps, PLN_BaseStep, llnodePln);
  const int wantAnyReducer = (type == PLANTYPE_ANY_REDUCER);

  while (cur != stop) {
    if (cur->type == type || (wantAnyReducer && PLN_IsReduce(cur))) {
      return cur;
    }
    cur = DLLIST_ITEM(cur->llnodePln.next, PLN_BaseStep, llnodePln);
  }
  return NULL;
}
113
/* Destructor for heap-allocated arrange (LIMIT/SORTBY) steps. It frees the
 * sort-key array (if any), the sort-key lookup buffer, and the step itself. */
static void arrangeDtor(PLN_BaseStep *bstp) {
  PLN_ArrangeStep *arrange = (PLN_ArrangeStep *)bstp;
  if (arrange->sortKeys != NULL) {
    array_free(arrange->sortKeys);
  }
  rm_free(arrange->sortkeysLK);
  rm_free(bstp);
}
122
AGPLN_GetArrangeStep(AGGPlan * pln)123 PLN_ArrangeStep *AGPLN_GetArrangeStep(AGGPlan *pln) {
124 // Go backwards.. and stop at the cutoff
125 for (const DLLIST_node *nn = pln->steps.prev; nn != &pln->steps; nn = nn->prev) {
126 const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
127 if (PLN_IsReduce(stp)) {
128 break;
129 } else if (stp->type == PLN_T_ARRANGE) {
130 return (PLN_ArrangeStep *)stp;
131 }
132 }
133 return NULL;
134 }
135
AGPLN_GetOrCreateArrangeStep(AGGPlan * pln)136 PLN_ArrangeStep *AGPLN_GetOrCreateArrangeStep(AGGPlan *pln) {
137 PLN_ArrangeStep *ret = AGPLN_GetArrangeStep(pln);
138 if (ret) {
139 return ret;
140 }
141 ret = rm_calloc(1, sizeof(*ret));
142 ret->base.type = PLN_T_ARRANGE;
143 ret->base.dtor = arrangeDtor;
144 AGPLN_AddStep(pln, &ret->base);
145 return ret;
146 }
147
AGPLN_GetLookup(const AGGPlan * pln,const PLN_BaseStep * bstp,AGPLNGetLookupMode mode)148 RLookup *AGPLN_GetLookup(const AGGPlan *pln, const PLN_BaseStep *bstp, AGPLNGetLookupMode mode) {
149 const DLLIST_node *first = NULL, *last = NULL;
150 int isReverse = 0;
151
152 switch (mode) {
153 case AGPLN_GETLOOKUP_FIRST:
154 first = pln->steps.next;
155 last = bstp ? &bstp->llnodePln : &pln->steps;
156 break;
157 case AGPLN_GETLOOKUP_PREV:
158 first = &pln->steps;
159 last = bstp->llnodePln.prev;
160 isReverse = 1;
161 break;
162 case AGPLN_GETLOOKUP_NEXT:
163 first = bstp->llnodePln.next;
164 last = &pln->steps;
165 break;
166 case AGPLN_GETLOOKUP_LAST:
167 first = bstp ? &bstp->llnodePln : &pln->steps;
168 last = pln->steps.prev;
169 isReverse = 1;
170 }
171
172 if (isReverse) {
173 for (const DLLIST_node *nn = last; nn && nn != first; nn = nn->prev) {
174 RLookup *lk = lookupFromNode(nn);
175 if (lk) {
176 return lk;
177 }
178 }
179 } else {
180 for (const DLLIST_node *nn = first; nn && nn != last; nn = nn->next) {
181 RLookup *lk = lookupFromNode(nn);
182 if (lk) {
183 return lk;
184 }
185 }
186 return NULL;
187 }
188 return NULL;
189 }
190
/* Invoke the dtor of every step in the plan that has one. The list head itself
 * is not reset. */
void AGPLN_FreeSteps(AGGPlan *pln) {
  DLLIST_node *next = NULL;
  for (DLLIST_node *cur = pln->steps.next; cur && cur != &pln->steps; cur = next) {
    PLN_BaseStep *step = DLLIST_ITEM(cur, PLN_BaseStep, llnodePln);
    // Read the successor before the dtor runs: some dtors (e.g. arrangeDtor)
    // free the step, node included.
    next = cur->next;
    if (step->dtor) {
      step->dtor(step);
    }
  }
}
201
AGPLN_Dump(const AGGPlan * pln)202 void AGPLN_Dump(const AGGPlan *pln) {
203 for (const DLLIST_node *nn = pln->steps.next; nn && nn != &pln->steps; nn = nn->next) {
204 const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
205 printf("STEP: [T=%s. P=%p]\n", steptypeToString(stp->type), stp);
206 const RLookup *lk = lookupFromNode(nn);
207 if (lk) {
208 printf(" NEW LOOKUP: %p\n", lk);
209 for (const RLookupKey *kk = lk->head; kk; kk = kk->next) {
210 printf(" %s @%p: FLAGS=0x%x\n", kk->name, kk, kk->flags);
211 }
212 }
213
214 switch (stp->type) {
215 case PLN_T_APPLY:
216 case PLN_T_FILTER:
217 printf(" EXPR:%s\n", ((PLN_MapFilterStep *)stp)->rawExpr);
218 if (stp->alias) {
219 printf(" AS:%s\n", stp->alias);
220 }
221 break;
222 case PLN_T_ARRANGE: {
223 const PLN_ArrangeStep *astp = (PLN_ArrangeStep *)stp;
224 if (astp->offset || astp->limit) {
225 printf(" OFFSET:%lu LIMIT:%lu\n", (unsigned long)astp->offset,
226 (unsigned long)astp->limit);
227 }
228 if (astp->sortKeys) {
229 printf(" SORT:\n");
230 for (size_t ii = 0; ii < array_len(astp->sortKeys); ++ii) {
231 const char *dir = SORTASCMAP_GETASC(astp->sortAscMap, ii) ? "ASC" : "DESC";
232 printf(" %s:%s\n", astp->sortKeys[ii], dir);
233 }
234 }
235 break;
236 }
237 case PLN_T_LOAD: {
238 const PLN_LoadStep *lstp = (PLN_LoadStep *)stp;
239 for (size_t ii = 0; ii < lstp->args.argc; ++ii) {
240 printf(" %s\n", (char *)lstp->args.objs[ii]);
241 }
242 break;
243 }
244 case PLN_T_GROUP: {
245 const PLN_GroupStep *gstp = (PLN_GroupStep *)stp;
246 printf(" BY:\n");
247 for (size_t ii = 0; ii < gstp->nproperties; ++ii) {
248 printf(" %s\n", gstp->properties[ii]);
249 }
250 for (size_t ii = 0; ii < array_len(gstp->reducers); ++ii) {
251 const PLN_Reducer *r = gstp->reducers + ii;
252 printf(" REDUCE: %s AS %s\n", r->name, r->alias);
253 if (r->args.argc) {
254 printf(" ARGS:[");
255 }
256 for (size_t jj = 0; jj < r->args.argc; ++jj) {
257 printf("%s ", (char *)r->args.objs[jj]);
258 }
259 printf("]\n");
260 }
261 break;
262 }
263 case PLN_T_ROOT:
264 case PLN_T_DISTRIBUTE:
265 case PLN_T_INVALID:
266 case PLN_T__MAX:
267 break;
268 }
269 }
270 }
271
/* Dynamic array (util/arr.h) of heap-allocated argument strings, built up by
 * the serializers below. */
typedef char **myArgArray_t;

/* Append a heap copy of `src` (via rm_strdup) to `*arr`. The result of
 * array_append is stored back through `arr` because it may return a different
 * array pointer. */
static inline void append_string(myArgArray_t *arr, const char *src) {
  char *copy = rm_strdup(src);
  myArgArray_t grown = array_append(*arr, copy);
  *arr = grown;
}
/* Append the decimal representation of `ll` to `*arr` as a newly allocated
 * string. */
static inline void append_uint(myArgArray_t *arr, unsigned long long ll) {
  // ULLONG_MAX fits in far fewer than 64 characters. snprintf still bounds the
  // write, so the buffer can never overflow even if that assumption changes.
  char s[64];
  snprintf(s, sizeof s, "%llu", ll);
  append_string(arr, s);
}
/* Append a copy of every argument in `ac` to `*arr`, in order. */
static inline void append_ac(myArgArray_t *arr, const ArgsCursor *ac) {
  size_t idx = 0;
  while (idx < ac->argc) {
    append_string(arr, AC_StringArg(ac, idx));
    idx++;
  }
}
288
/* Serialize an APPLY or FILTER step as
 * `APPLY|FILTER <expr> [AS <alias>]`. */
static void serializeMapFilter(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const char *keyword = (stp->type == PLN_T_APPLY) ? "APPLY" : "FILTER";
  const PLN_MapFilterStep *mapFilter = (const PLN_MapFilterStep *)stp;

  append_string(arr, keyword);
  append_string(arr, mapFilter->rawExpr);
  if (stp->alias == NULL) {
    return;
  }
  append_string(arr, "AS");
  append_string(arr, stp->alias);
}
302
/* Serialize an arrange step as an optional `LIMIT <offset> <limit>` (emitted
 * when either value is non-zero), followed by an optional
 * `SORTBY <2*nkeys> @key ASC|DESC ...` (emitted when sort keys exist). */
static void serializeArrange(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_ArrangeStep *arrange = (const PLN_ArrangeStep *)stp;

  const int hasPaging = arrange->limit || arrange->offset;
  if (hasPaging) {
    append_string(arr, "LIMIT");
    append_uint(arr, arrange->offset);
    append_uint(arr, arrange->limit);
  }

  if (!arrange->sortKeys) {
    return;
  }

  const size_t nkeys = array_len(arrange->sortKeys);
  append_string(arr, "SORTBY");
  // Each key contributes two tokens: the @field and its direction.
  append_uint(arr, nkeys * 2);
  for (size_t idx = 0; idx < nkeys; ++idx) {
    char *field = NULL;
    rm_asprintf(&field, "@%s", arrange->sortKeys[idx]);
    // rm_asprintf already allocated `field`; hand ownership to the array directly.
    *arr = array_append(*arr, field);
    append_string(arr, SORTASCMAP_GETASC(arrange->sortAscMap, idx) ? "ASC" : "DESC");
  }
}
/* Serialize a LOAD step as `LOAD <nargs> <arg>...`. Nothing is emitted when
 * the step has no arguments. */
static void serializeLoad(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_LoadStep *load = (const PLN_LoadStep *)stp;
  if (load->args.argc == 0) {
    return;
  }
  append_string(arr, "LOAD");
  append_uint(arr, load->args.argc);
  append_ac(arr, &load->args);
}
334
/* Serialize a GROUPBY step as `GROUPBY <nprops> <prop>...`, followed by one
 * `REDUCE <name> <nargs> <arg>... [AS <alias>]` per reducer. */
static void serializeGroup(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_GroupStep *group = (const PLN_GroupStep *)stp;

  append_string(arr, "GROUPBY");
  append_uint(arr, group->nproperties);
  for (size_t p = 0; p < group->nproperties; p++) {
    append_string(arr, group->properties[p]);
  }

  const size_t count = array_len(group->reducers);
  for (size_t idx = 0; idx < count; idx++) {
    const PLN_Reducer *red = &group->reducers[idx];
    append_string(arr, "REDUCE");
    append_string(arr, red->name);
    append_uint(arr, red->args.argc);
    append_ac(arr, &red->args);
    if (red->alias != NULL) {
      append_string(arr, "AS");
      append_string(arr, red->alias);
    }
  }
}
355
AGPLN_Serialize(const AGGPlan * pln)356 array_t AGPLN_Serialize(const AGGPlan *pln) {
357 char **arr = array_new(char *, 1);
358 for (const DLLIST_node *nn = pln->steps.next; nn != &pln->steps; nn = nn->next) {
359 const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
360 switch (stp->type) {
361 case PLN_T_APPLY:
362 case PLN_T_FILTER:
363 serializeMapFilter(&arr, stp);
364 break;
365 case PLN_T_ARRANGE:
366 serializeArrange(&arr, stp);
367 break;
368 case PLN_T_LOAD:
369 serializeLoad(&arr, stp);
370 break;
371 case PLN_T_GROUP:
372 serializeGroup(&arr, stp);
373 break;
374 case PLN_T_INVALID:
375 case PLN_T_ROOT:
376 case PLN_T_DISTRIBUTE:
377 case PLN_T__MAX:
378 break;
379 }
380 }
381 return arr;
382 }
383