1 #include "aggregate_plan.h"
2 #include "reducer.h"
3 #include "expr/expression.h"
4 #include <commands.h>
5 #include <util/arr.h>
6 #include <ctype.h>
7 
/* Return the label used for a step type when dumping a plan.
 * PLN_T_INVALID and any value not listed below yield "<UNKNOWN>". */
static const char *steptypeToString(PLN_StepType type) {
  const char *label = "<UNKNOWN>";
  switch (type) {
    case PLN_T_APPLY:
      label = "APPLY";
      break;
    case PLN_T_FILTER:
      label = "FILTER";
      break;
    case PLN_T_ARRANGE:
      label = "LIMIT/MAX/SORTBY";
      break;
    case PLN_T_ROOT:
      label = "<ROOT>";
      break;
    case PLN_T_GROUP:
      label = "GROUPBY";
      break;
    case PLN_T_LOAD:
      label = "LOAD";
      break;
    case PLN_T_DISTRIBUTE:
      label = "DISTRIBUTE";
      break;
    case PLN_T_INVALID:
    default:
      break;
  }
  return label;
}
29 
30 /* add a step to the plan at its end (before the dummy tail) */
AGPLN_AddStep(AGGPlan * plan,PLN_BaseStep * step)31 void AGPLN_AddStep(AGGPlan *plan, PLN_BaseStep *step) {
32   assert(step->type > PLN_T_INVALID);
33   dllist_append(&plan->steps, &step->llnodePln);
34   plan->steptypes |= (1 << (step->type - 1));
35 }
36 
/* Return non-zero when the plan's step-type bitmask has the bit for `t` set
 * (bit index is t - 1, matching AGPLN_AddStep). */
int AGPLN_HasStep(const AGGPlan *pln, PLN_StepType t) {
  const int typeBit = 1 << (t - 1);
  return pln->steptypes & typeBit;
}
40 
/*
 * Insert `newstp` into the plan immediately before `posstp`.
 *
 * When `posstp` is NULL, or is already the first step in the list, `newstp`
 * is linked in at the head of the plan instead.
 *
 * NOTE(review): unlike AGPLN_AddStep(), this function does not update
 * pln->steptypes -- confirm whether callers rely on AGPLN_HasStep() for
 * steps inserted this way.
 */
void AGPLN_AddBefore(AGGPlan *pln, PLN_BaseStep *posstp, PLN_BaseStep *newstp) {
  assert(newstp->type > PLN_T_INVALID);
  if (posstp == NULL || DLLIST_IS_FIRST(&pln->steps, &posstp->llnodePln)) {
    /* Link the NEW step. Prepending posstp here would re-link a node that is
     * already on the list, never insert newstp, and dereference a NULL
     * posstp. */
    dllist_prepend(&pln->steps, &newstp->llnodePln);
  } else {
    dllist_insert(posstp->llnodePln.prev, &posstp->llnodePln, &newstp->llnodePln);
  }
}
49 
/*
 * Insert `newstp` into the plan immediately after `posstp`.
 *
 * When `posstp` is NULL or is the last step, this is a plain append through
 * AGPLN_AddStep(); otherwise the new step is linked between `posstp` and its
 * successor.
 *
 * NOTE(review): only the append path goes through AGPLN_AddStep(), so
 * pln->steptypes is updated there but not on a mid-list insert -- confirm
 * whether that difference is intended.
 */
void AGPLN_AddAfter(AGGPlan *pln, PLN_BaseStep *posstp, PLN_BaseStep *newstp) {
  assert(newstp->type > PLN_T_INVALID);

  const int atTail = (posstp == NULL) || DLLIST_IS_LAST(&pln->steps, &posstp->llnodePln);
  if (atTail) {
    AGPLN_AddStep(pln, newstp);
    return;
  }

  dllist_insert(&posstp->llnodePln, posstp->llnodePln.next, &newstp->llnodePln);
}
58 
/* Link `newstp` in at the head of the plan's step list.
 * The step-type bitmask is not modified here. */
void AGPLN_Prepend(AGGPlan *pln, PLN_BaseStep *newstp) {
  DLLIST_node *node = &newstp->llnodePln;
  dllist_prepend(&pln->steps, node);
}
62 
/* Remove `step` from the list it is linked on. The step's destructor is not
 * called here, and `pln` is currently unused. */
void AGPLN_PopStep(AGGPlan *pln, PLN_BaseStep *step) {
  (void)pln;
  dllist_delete(&step->llnodePln);
}
67 
/* Destructor installed on the root step: cleans up the root's lookup.
 * The step structure itself is not freed here. */
static void rootStepDtor(PLN_BaseStep *bstp) {
  PLN_FirstStep *root = (PLN_FirstStep *)bstp;
  RLookup_Cleanup(&root->lookup);
}
rootStepLookup(PLN_BaseStep * bstp)72 static RLookup *rootStepLookup(PLN_BaseStep *bstp) {
73   return &((PLN_FirstStep *)bstp)->lookup;
74 }
75 
/* Reset `plan` to an empty state: zero the whole structure, initialise the
 * step list, and link the embedded root step in as its only element. */
void AGPLN_Init(AGGPlan *plan) {
  memset(plan, 0, sizeof(*plan));

  /* Configure the root step embedded in the plan. */
  plan->firstStep_s.base.type = PLN_T_ROOT;
  plan->firstStep_s.base.dtor = rootStepDtor;
  plan->firstStep_s.base.getLookup = rootStepLookup;

  dllist_init(&plan->steps);
  dllist_append(&plan->steps, &plan->firstStep_s.base.llnodePln);
}
84 
lookupFromNode(const DLLIST_node * nn)85 static RLookup *lookupFromNode(const DLLIST_node *nn) {
86   PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
87   if (stp->getLookup) {
88     return stp->getLookup(stp);
89   } else {
90     return NULL;
91   }
92 }
93 
/*
 * Find the first step of the given type in the half-open range [begin, end).
 *
 * A NULL `begin` means the first step of the plan; a NULL `end` means the
 * list head, i.e. scan to the end of the plan. When `type` is
 * PLANTYPE_ANY_REDUCER, any step for which PLN_IsReduce() is true also
 * matches. Returns NULL when nothing matches.
 */
const PLN_BaseStep *AGPLN_FindStep(const AGGPlan *pln, const PLN_BaseStep *begin,
                                   const PLN_BaseStep *end, PLN_StepType type) {
  if (begin == NULL) {
    begin = DLLIST_ITEM(pln->steps.next, PLN_BaseStep, llnodePln);
  }
  if (end == NULL) {
    end = DLLIST_ITEM(&pln->steps, PLN_BaseStep, llnodePln);
  }

  const PLN_BaseStep *cur = begin;
  while (cur != end) {
    if (cur->type == type || (type == PLANTYPE_ANY_REDUCER && PLN_IsReduce(cur))) {
      return cur;
    }
    cur = DLLIST_ITEM(cur->llnodePln.next, PLN_BaseStep, llnodePln);
  }
  return NULL;
}
113 
/* Destructor for arrange steps: releases the sort-key array (if any), the
 * sortkeysLK buffer, and finally the step itself. */
static void arrangeDtor(PLN_BaseStep *bstp) {
  PLN_ArrangeStep *arrange = (PLN_ArrangeStep *)bstp;
  if (arrange->sortKeys != NULL) {
    array_free(arrange->sortKeys);
  }
  rm_free(arrange->sortkeysLK);
  rm_free(bstp);
}
122 
AGPLN_GetArrangeStep(AGGPlan * pln)123 PLN_ArrangeStep *AGPLN_GetArrangeStep(AGGPlan *pln) {
124   // Go backwards.. and stop at the cutoff
125   for (const DLLIST_node *nn = pln->steps.prev; nn != &pln->steps; nn = nn->prev) {
126     const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
127     if (PLN_IsReduce(stp)) {
128       break;
129     } else if (stp->type == PLN_T_ARRANGE) {
130       return (PLN_ArrangeStep *)stp;
131     }
132   }
133   return NULL;
134 }
135 
AGPLN_GetOrCreateArrangeStep(AGGPlan * pln)136 PLN_ArrangeStep *AGPLN_GetOrCreateArrangeStep(AGGPlan *pln) {
137   PLN_ArrangeStep *ret = AGPLN_GetArrangeStep(pln);
138   if (ret) {
139     return ret;
140   }
141   ret = rm_calloc(1, sizeof(*ret));
142   ret->base.type = PLN_T_ARRANGE;
143   ret->base.dtor = arrangeDtor;
144   AGPLN_AddStep(pln, &ret->base);
145   return ret;
146 }
147 
AGPLN_GetLookup(const AGGPlan * pln,const PLN_BaseStep * bstp,AGPLNGetLookupMode mode)148 RLookup *AGPLN_GetLookup(const AGGPlan *pln, const PLN_BaseStep *bstp, AGPLNGetLookupMode mode) {
149   const DLLIST_node *first = NULL, *last = NULL;
150   int isReverse = 0;
151 
152   switch (mode) {
153     case AGPLN_GETLOOKUP_FIRST:
154       first = pln->steps.next;
155       last = bstp ? &bstp->llnodePln : &pln->steps;
156       break;
157     case AGPLN_GETLOOKUP_PREV:
158       first = &pln->steps;
159       last = bstp->llnodePln.prev;
160       isReverse = 1;
161       break;
162     case AGPLN_GETLOOKUP_NEXT:
163       first = bstp->llnodePln.next;
164       last = &pln->steps;
165       break;
166     case AGPLN_GETLOOKUP_LAST:
167       first = bstp ? &bstp->llnodePln : &pln->steps;
168       last = pln->steps.prev;
169       isReverse = 1;
170   }
171 
172   if (isReverse) {
173     for (const DLLIST_node *nn = last; nn && nn != first; nn = nn->prev) {
174       RLookup *lk = lookupFromNode(nn);
175       if (lk) {
176         return lk;
177       }
178     }
179   } else {
180     for (const DLLIST_node *nn = first; nn && nn != last; nn = nn->next) {
181       RLookup *lk = lookupFromNode(nn);
182       if (lk) {
183         return lk;
184       }
185     }
186     return NULL;
187   }
188   return NULL;
189 }
190 
/* Invoke the destructor of every step in the plan that has one.
 * Nodes are not unlinked from the list by this function. */
void AGPLN_FreeSteps(AGGPlan *pln) {
  DLLIST_node *following = NULL;
  for (DLLIST_node *cur = pln->steps.next; cur && cur != &pln->steps; cur = following) {
    /* Read the successor first: a destructor may free the step that holds
     * this node (arrangeDtor does). */
    following = cur->next;
    PLN_BaseStep *stp = DLLIST_ITEM(cur, PLN_BaseStep, llnodePln);
    if (stp->dtor) {
      stp->dtor(stp);
    }
  }
}
201 
AGPLN_Dump(const AGGPlan * pln)202 void AGPLN_Dump(const AGGPlan *pln) {
203   for (const DLLIST_node *nn = pln->steps.next; nn && nn != &pln->steps; nn = nn->next) {
204     const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
205     printf("STEP: [T=%s. P=%p]\n", steptypeToString(stp->type), stp);
206     const RLookup *lk = lookupFromNode(nn);
207     if (lk) {
208       printf("  NEW LOOKUP: %p\n", lk);
209       for (const RLookupKey *kk = lk->head; kk; kk = kk->next) {
210         printf("    %s @%p: FLAGS=0x%x\n", kk->name, kk, kk->flags);
211       }
212     }
213 
214     switch (stp->type) {
215       case PLN_T_APPLY:
216       case PLN_T_FILTER:
217         printf("  EXPR:%s\n", ((PLN_MapFilterStep *)stp)->rawExpr);
218         if (stp->alias) {
219           printf("  AS:%s\n", stp->alias);
220         }
221         break;
222       case PLN_T_ARRANGE: {
223         const PLN_ArrangeStep *astp = (PLN_ArrangeStep *)stp;
224         if (astp->offset || astp->limit) {
225           printf("  OFFSET:%lu LIMIT:%lu\n", (unsigned long)astp->offset,
226                  (unsigned long)astp->limit);
227         }
228         if (astp->sortKeys) {
229           printf("  SORT:\n");
230           for (size_t ii = 0; ii < array_len(astp->sortKeys); ++ii) {
231             const char *dir = SORTASCMAP_GETASC(astp->sortAscMap, ii) ? "ASC" : "DESC";
232             printf("    %s:%s\n", astp->sortKeys[ii], dir);
233           }
234         }
235         break;
236       }
237       case PLN_T_LOAD: {
238         const PLN_LoadStep *lstp = (PLN_LoadStep *)stp;
239         for (size_t ii = 0; ii < lstp->args.argc; ++ii) {
240           printf("  %s\n", (char *)lstp->args.objs[ii]);
241         }
242         break;
243       }
244       case PLN_T_GROUP: {
245         const PLN_GroupStep *gstp = (PLN_GroupStep *)stp;
246         printf("  BY:\n");
247         for (size_t ii = 0; ii < gstp->nproperties; ++ii) {
248           printf("    %s\n", gstp->properties[ii]);
249         }
250         for (size_t ii = 0; ii < array_len(gstp->reducers); ++ii) {
251           const PLN_Reducer *r = gstp->reducers + ii;
252           printf("  REDUCE: %s AS %s\n", r->name, r->alias);
253           if (r->args.argc) {
254             printf("    ARGS:[");
255           }
256           for (size_t jj = 0; jj < r->args.argc; ++jj) {
257             printf("%s ", (char *)r->args.objs[jj]);
258           }
259           printf("]\n");
260         }
261         break;
262       }
263       case PLN_T_ROOT:
264       case PLN_T_DISTRIBUTE:
265       case PLN_T_INVALID:
266       case PLN_T__MAX:
267         break;
268     }
269   }
270 }
271 
/* Array of C strings, grown through array_append() by the helpers below. */
typedef char **myArgArray_t;

/* Append an rm_strdup() copy of `src` to `*arr`. `*arr` is reassigned to the
 * value array_append() yields. */
static inline void append_string(myArgArray_t *arr, const char *src) {
  char *copy = rm_strdup(src);
  myArgArray_t grown = array_append(*arr, copy);
  *arr = grown;
}
/* Append the decimal text of `ll` to `*arr` as a new string element. */
static inline void append_uint(myArgArray_t *arr, unsigned long long ll) {
  char buf[64];
  /* Bounded formatting: snprintf never writes past buf and always
   * NUL-terminates it. */
  snprintf(buf, sizeof(buf), "%llu", ll);
  append_string(arr, buf);
}
/* Append a copy of each of the cursor's argc arguments, read through
 * AC_StringArg(), to `*arr` in index order. */
static inline void append_ac(myArgArray_t *arr, const ArgsCursor *ac) {
  size_t idx = 0;
  while (idx < ac->argc) {
    const char *arg = AC_StringArg(ac, idx);
    append_string(arr, arg);
    ++idx;
  }
}
288 
/* Serialize an APPLY or FILTER step: the keyword, the raw expression, and
 * "AS <alias>" when the step carries an alias. */
static void serializeMapFilter(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_MapFilterStep *mapStep = (PLN_MapFilterStep *)stp;
  const char *keyword = (stp->type == PLN_T_APPLY) ? "APPLY" : "FILTER";

  append_string(arr, keyword);
  append_string(arr, mapStep->rawExpr);

  if (!stp->alias) {
    return;
  }
  append_string(arr, "AS");
  append_string(arr, stp->alias);
}
302 
/*
 * Serialize an arrange step.
 *
 * Emits "LIMIT <offset> <limit>" when either value is non-zero, then, when
 * sort keys are present, "SORTBY <2 * nkeys>" followed by an "@<key>" and
 * "ASC"/"DESC" pair for every key.
 */
static void serializeArrange(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_ArrangeStep *arrange = (PLN_ArrangeStep *)stp;

  if (arrange->limit || arrange->offset) {
    append_string(arr, "LIMIT");
    append_uint(arr, arrange->offset);
    append_uint(arr, arrange->limit);
  }

  if (!arrange->sortKeys) {
    return;
  }

  size_t nkeys = array_len(arrange->sortKeys);
  append_string(arr, "SORTBY");
  /* Each key contributes two arguments: its name and its direction. */
  append_uint(arr, nkeys * 2);
  for (size_t k = 0; k < nkeys; ++k) {
    /* rm_asprintf allocates the string; it is stored in the array as-is,
     * without an extra copy. */
    char *prefixed;
    rm_asprintf(&prefixed, "@%s", arrange->sortKeys[k]);
    *arr = array_append(*arr, prefixed);
    append_string(arr, SORTASCMAP_GETASC(arrange->sortAscMap, k) ? "ASC" : "DESC");
  }
}
/* Serialize a LOAD step as "LOAD <argc> <args...>". Nothing is emitted when
 * the step has no arguments. */
static void serializeLoad(myArgArray_t *arr, const PLN_BaseStep *stp) {
  PLN_LoadStep *load = (PLN_LoadStep *)stp;
  if (!load->args.argc) {
    return;
  }
  append_string(arr, "LOAD");
  append_uint(arr, load->args.argc);
  append_ac(arr, &load->args);
}
334 
/*
 * Serialize a GROUPBY step: "GROUPBY <nproperties> <properties...>" followed,
 * for each reducer, by "REDUCE <name> <argc> <args...>" and "AS <alias>" when
 * the reducer has an alias.
 */
static void serializeGroup(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_GroupStep *group = (PLN_GroupStep *)stp;

  append_string(arr, "GROUPBY");
  append_uint(arr, group->nproperties);
  for (size_t p = 0; p < group->nproperties; ++p) {
    append_string(arr, group->properties[p]);
  }

  size_t total = array_len(group->reducers);
  for (size_t idx = 0; idx < total; ++idx) {
    const PLN_Reducer *red = &group->reducers[idx];
    append_string(arr, "REDUCE");
    append_string(arr, red->name);
    append_uint(arr, red->args.argc);
    append_ac(arr, &red->args);
    if (!red->alias) {
      continue;
    }
    append_string(arr, "AS");
    append_string(arr, red->alias);
  }
}
355 
AGPLN_Serialize(const AGGPlan * pln)356 array_t AGPLN_Serialize(const AGGPlan *pln) {
357   char **arr = array_new(char *, 1);
358   for (const DLLIST_node *nn = pln->steps.next; nn != &pln->steps; nn = nn->next) {
359     const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
360     switch (stp->type) {
361       case PLN_T_APPLY:
362       case PLN_T_FILTER:
363         serializeMapFilter(&arr, stp);
364         break;
365       case PLN_T_ARRANGE:
366         serializeArrange(&arr, stp);
367         break;
368       case PLN_T_LOAD:
369         serializeLoad(&arr, stp);
370         break;
371       case PLN_T_GROUP:
372         serializeGroup(&arr, stp);
373         break;
374       case PLN_T_INVALID:
375       case PLN_T_ROOT:
376       case PLN_T_DISTRIBUTE:
377       case PLN_T__MAX:
378         break;
379     }
380   }
381   return arr;
382 }
383