1 #include "aggregate_plan.h"
2 #include "reducer.h"
3 #include "expr/expression.h"
4 #include <util/arr.h>
5 #include <ctype.h>
6 
/* Translate a plan step type into the short label printed by AGPLN_Dump().
 * Any type without a dedicated label (including PLN_T_INVALID) yields
 * "<UNKNOWN>". The returned string is a literal and must not be freed. */
static const char *steptypeToString(PLN_StepType type) {
  const char *label = "<UNKNOWN>";
  switch (type) {
    case PLN_T_APPLY:
      label = "APPLY";
      break;
    case PLN_T_FILTER:
      label = "FILTER";
      break;
    case PLN_T_ARRANGE:
      label = "LIMIT/MAX/SORTBY";
      break;
    case PLN_T_ROOT:
      label = "<ROOT>";
      break;
    case PLN_T_GROUP:
      label = "GROUPBY";
      break;
    case PLN_T_LOAD:
      label = "LOAD";
      break;
    case PLN_T_DISTRIBUTE:
      label = "DISTRIBUTE";
      break;
    case PLN_T_INVALID:
    default:
      break;
  }
  return label;
}
28 
29 /* add a step to the plan at its end (before the dummy tail) */
AGPLN_AddStep(AGGPlan * plan,PLN_BaseStep * step)30 void AGPLN_AddStep(AGGPlan *plan, PLN_BaseStep *step) {
31   RS_LOG_ASSERT(step->type > PLN_T_INVALID, "Step type connot be PLN_T_INVALID");
32   dllist_append(&plan->steps, &step->llnodePln);
33   plan->steptypes |= (1 << (step->type - 1));
34 }
35 
/* Test whether the bit for step type `t` is set in the plan's `steptypes`
 * mask. Returns the masked value itself (non-zero when set, 0 otherwise),
 * not a normalized 0/1. */
int AGPLN_HasStep(const AGGPlan *pln, PLN_StepType t) {
  const int typeBit = 1 << (t - 1);
  return (pln->steptypes & typeBit);
}
39 
/* Insert `newstp` into the plan immediately before `posstp`.
 *
 * When `posstp` is NULL, or is already the first node of the list, `newstp`
 * is linked at the head of the list. Otherwise it is linked between
 * `posstp`'s predecessor and `posstp`.
 *
 * The type of `newstp` is recorded in `pln->steptypes`, the same way
 * AGPLN_AddStep() does, so AGPLN_HasStep() reports the step regardless of
 * which insertion function was used. */
void AGPLN_AddBefore(AGGPlan *pln, PLN_BaseStep *posstp, PLN_BaseStep *newstp) {
  RS_LOG_ASSERT(newstp->type > PLN_T_INVALID, "Step type connot be PLN_T_INVALID");
  if (posstp == NULL || DLLIST_IS_FIRST(&pln->steps, &posstp->llnodePln)) {
    // Link the *new* step at the head. Linking `posstp` here would never
    // insert `newstp`, and would form an address from NULL when posstp is NULL.
    dllist_prepend(&pln->steps, &newstp->llnodePln);
  } else {
    dllist_insert(posstp->llnodePln.prev, &posstp->llnodePln, &newstp->llnodePln);
  }
  pln->steptypes |= (1 << (newstp->type - 1));
}
48 
/* Insert `newstp` into the plan immediately after `posstp`.
 *
 * When `posstp` is NULL, or is already the last node of the list, this is a
 * plain append via AGPLN_AddStep(). Otherwise `newstp` is linked between
 * `posstp` and its successor.
 *
 * In both cases the type of `newstp` ends up recorded in `pln->steptypes`,
 * so AGPLN_HasStep() does not depend on where the step was inserted. */
void AGPLN_AddAfter(AGGPlan *pln, PLN_BaseStep *posstp, PLN_BaseStep *newstp) {
  RS_LOG_ASSERT(newstp->type > PLN_T_INVALID, "Step type connot be PLN_T_INVALID");
  if (posstp == NULL || DLLIST_IS_LAST(&pln->steps, &posstp->llnodePln)) {
    // AGPLN_AddStep() appends and also records the type bit.
    AGPLN_AddStep(pln, newstp);
    return;
  }
  dllist_insert(&posstp->llnodePln, posstp->llnodePln.next, &newstp->llnodePln);
  // Mid-list insertion bypasses AGPLN_AddStep(), so record the type here.
  pln->steptypes |= (1 << (newstp->type - 1));
}
57 
/* Link `newstp` at the head of the plan's step list.
 * Note: this only touches the list; `pln->steptypes` is left unchanged. */
void AGPLN_Prepend(AGGPlan *pln, PLN_BaseStep *newstp) {
  DLLIST_node *node = &newstp->llnodePln;
  dllist_prepend(&pln->steps, node);
}
61 
/* Unlink `step` from whatever list it is on. The step is not destroyed and
 * the plan's `steptypes` mask is not modified. */
void AGPLN_PopStep(AGGPlan *pln, PLN_BaseStep *step) {
  (void)pln;  // unused: unlinking only needs the node itself
  DLLIST_node *node = &step->llnodePln;
  dllist_delete(node);
}
66 
/* Destructor for the root step: cleans up its lookup table. The step itself
 * is not freed here (the root step is a member of the AGGPlan struct). */
static void rootStepDtor(PLN_BaseStep *bstp) {
  RLookup_Cleanup(&((PLN_FirstStep *)bstp)->lookup);
}
rootStepLookup(PLN_BaseStep * bstp)71 static RLookup *rootStepLookup(PLN_BaseStep *bstp) {
72   return &((PLN_FirstStep *)bstp)->lookup;
73 }
74 
/* Reset `plan` to an empty state: zero the whole struct, initialize the step
 * list, and install the embedded root step (type PLN_T_ROOT, with its
 * destructor and lookup callbacks) as the list's only element. */
void AGPLN_Init(AGGPlan *plan) {
  memset(plan, 0, sizeof(*plan));

  // Configure the embedded root step.
  PLN_BaseStep *root = &plan->firstStep_s.base;
  root->type = PLN_T_ROOT;
  root->dtor = rootStepDtor;
  root->getLookup = rootStepLookup;

  // Build the list with the root step as its sole member.
  dllist_init(&plan->steps);
  dllist_append(&plan->steps, &root->llnodePln);
}
83 
lookupFromNode(const DLLIST_node * nn)84 static RLookup *lookupFromNode(const DLLIST_node *nn) {
85   PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
86   if (stp->getLookup) {
87     return stp->getLookup(stp);
88   } else {
89     return NULL;
90   }
91 }
92 
/* Scan the half-open range [begin, end) of plan steps, in list order, for the
 * first step matching `type`.
 *
 * - `begin` NULL means "start at the first step of the plan".
 * - `end` NULL means "stop at the list head", i.e. scan to the end of the plan.
 * - `type` PLANTYPE_ANY_REDUCER additionally matches any step for which
 *   PLN_IsReduce() is true.
 *
 * Returns the matching step, or NULL when none is found. */
const PLN_BaseStep *AGPLN_FindStep(const AGGPlan *pln, const PLN_BaseStep *begin,
                                   const PLN_BaseStep *end, PLN_StepType type) {
  const PLN_BaseStep *cur = begin;
  if (cur == NULL) {
    cur = DLLIST_ITEM(pln->steps.next, PLN_BaseStep, llnodePln);
  }
  const PLN_BaseStep *stop = end;
  if (stop == NULL) {
    // The list head acts as the terminator; it is only compared, never read.
    stop = DLLIST_ITEM(&pln->steps, PLN_BaseStep, llnodePln);
  }

  const int anyReducer = (type == PLANTYPE_ANY_REDUCER);
  while (cur != stop) {
    if (cur->type == type || (anyReducer && PLN_IsReduce(cur))) {
      return cur;
    }
    cur = DLLIST_ITEM(cur->llnodePln.next, PLN_BaseStep, llnodePln);
  }
  return NULL;
}
112 
/* Destructor for arrange steps: releases the sort-key array (if any), the
 * `sortkeysLK` buffer, and finally the step allocation itself. */
static void arrangeDtor(PLN_BaseStep *bstp) {
  PLN_ArrangeStep *arrange = (PLN_ArrangeStep *)bstp;
  if (arrange->sortKeys != NULL) {
    array_free(arrange->sortKeys);
  }
  rm_free(arrange->sortkeysLK);
  rm_free(bstp);
}
121 
AGPLN_GetArrangeStep(AGGPlan * pln)122 PLN_ArrangeStep *AGPLN_GetArrangeStep(AGGPlan *pln) {
123   // Go backwards.. and stop at the cutoff
124   for (const DLLIST_node *nn = pln->steps.prev; nn != &pln->steps; nn = nn->prev) {
125     const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
126     if (PLN_IsReduce(stp)) {
127       break;
128     } else if (stp->type == PLN_T_ARRANGE) {
129       return (PLN_ArrangeStep *)stp;
130     }
131   }
132   return NULL;
133 }
134 
AGPLN_GetOrCreateArrangeStep(AGGPlan * pln)135 PLN_ArrangeStep *AGPLN_GetOrCreateArrangeStep(AGGPlan *pln) {
136   PLN_ArrangeStep *ret = AGPLN_GetArrangeStep(pln);
137   if (ret) {
138     return ret;
139   }
140   ret = rm_calloc(1, sizeof(*ret));
141   ret->base.type = PLN_T_ARRANGE;
142   ret->base.dtor = arrangeDtor;
143   AGPLN_AddStep(pln, &ret->base);
144   return ret;
145 }
146 
AGPLN_GetLookup(const AGGPlan * pln,const PLN_BaseStep * bstp,AGPLNGetLookupMode mode)147 RLookup *AGPLN_GetLookup(const AGGPlan *pln, const PLN_BaseStep *bstp, AGPLNGetLookupMode mode) {
148   const DLLIST_node *first = NULL, *last = NULL;
149   int isReverse = 0;
150 
151   switch (mode) {
152     case AGPLN_GETLOOKUP_FIRST:
153       first = pln->steps.next;
154       last = bstp ? &bstp->llnodePln : &pln->steps;
155       break;
156     case AGPLN_GETLOOKUP_PREV:
157       first = &pln->steps;
158       last = bstp->llnodePln.prev;
159       isReverse = 1;
160       break;
161     case AGPLN_GETLOOKUP_NEXT:
162       first = bstp->llnodePln.next;
163       last = &pln->steps;
164       break;
165     case AGPLN_GETLOOKUP_LAST:
166       first = bstp ? &bstp->llnodePln : &pln->steps;
167       last = pln->steps.prev;
168       isReverse = 1;
169   }
170 
171   if (isReverse) {
172     for (const DLLIST_node *nn = last; nn && nn != first; nn = nn->prev) {
173       RLookup *lk = lookupFromNode(nn);
174       if (lk) {
175         return lk;
176       }
177     }
178   } else {
179     for (const DLLIST_node *nn = first; nn && nn != last; nn = nn->next) {
180       RLookup *lk = lookupFromNode(nn);
181       if (lk) {
182         return lk;
183       }
184     }
185     return NULL;
186   }
187   return NULL;
188 }
189 
/* Invoke the destructor of every step in the plan, in list order. Steps
 * without a destructor are skipped. The list links themselves are not reset. */
void AGPLN_FreeSteps(AGGPlan *pln) {
  DLLIST_node *following = NULL;
  for (DLLIST_node *cur = pln->steps.next; cur && cur != &pln->steps; cur = following) {
    // Read the successor first: a destructor may free the step (and its node).
    following = cur->next;
    PLN_BaseStep *step = DLLIST_ITEM(cur, PLN_BaseStep, llnodePln);
    if (step->dtor) {
      step->dtor(step);
    }
  }
}
200 
AGPLN_Dump(const AGGPlan * pln)201 void AGPLN_Dump(const AGGPlan *pln) {
202   for (const DLLIST_node *nn = pln->steps.next; nn && nn != &pln->steps; nn = nn->next) {
203     const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
204     printf("STEP: [T=%s. P=%p]\n", steptypeToString(stp->type), stp);
205     const RLookup *lk = lookupFromNode(nn);
206     if (lk) {
207       printf("  NEW LOOKUP: %p\n", lk);
208       for (const RLookupKey *kk = lk->head; kk; kk = kk->next) {
209         printf("    %s @%p: FLAGS=0x%x\n", kk->name, kk, kk->flags);
210       }
211     }
212 
213     switch (stp->type) {
214       case PLN_T_APPLY:
215       case PLN_T_FILTER:
216         printf("  EXPR:%s\n", ((PLN_MapFilterStep *)stp)->rawExpr);
217         if (stp->alias) {
218           printf("  AS:%s\n", stp->alias);
219         }
220         break;
221       case PLN_T_ARRANGE: {
222         const PLN_ArrangeStep *astp = (PLN_ArrangeStep *)stp;
223         if (astp->offset || astp->limit) {
224           printf("  OFFSET:%lu LIMIT:%lu\n", (unsigned long)astp->offset,
225                  (unsigned long)astp->limit);
226         }
227         if (astp->sortKeys) {
228           printf("  SORT:\n");
229           for (size_t ii = 0; ii < array_len(astp->sortKeys); ++ii) {
230             const char *dir = SORTASCMAP_GETASC(astp->sortAscMap, ii) ? "ASC" : "DESC";
231             printf("    %s:%s\n", astp->sortKeys[ii], dir);
232           }
233         }
234         break;
235       }
236       case PLN_T_LOAD: {
237         const PLN_LoadStep *lstp = (PLN_LoadStep *)stp;
238         for (size_t ii = 0; ii < lstp->args.argc; ++ii) {
239           printf("  %s\n", (char *)lstp->args.objs[ii]);
240         }
241         break;
242       }
243       case PLN_T_GROUP: {
244         const PLN_GroupStep *gstp = (PLN_GroupStep *)stp;
245         printf("  BY:\n");
246         for (size_t ii = 0; ii < gstp->nproperties; ++ii) {
247           printf("    %s\n", gstp->properties[ii]);
248         }
249         for (size_t ii = 0; ii < array_len(gstp->reducers); ++ii) {
250           const PLN_Reducer *r = gstp->reducers + ii;
251           printf("  REDUCE: %s AS %s\n", r->name, r->alias);
252           if (r->args.argc) {
253             printf("    ARGS:[");
254           }
255           for (size_t jj = 0; jj < r->args.argc; ++jj) {
256             printf("%s ", (char *)r->args.objs[jj]);
257           }
258           printf("]\n");
259         }
260         break;
261       }
262       case PLN_T_ROOT:
263       case PLN_T_DISTRIBUTE:
264       case PLN_T_INVALID:
265       case PLN_T__MAX:
266         break;
267     }
268   }
269 }
270 
/* Array of C strings (managed with the array_* helpers) that the serialize
 * functions below append their output tokens to. */
typedef char **myArgArray_t;

/* Append a duplicate of `src` to `*arr`. `*arr` is reassigned to the value
 * returned by array_append(). */
static inline void append_string(myArgArray_t *arr, const char *src) {
  char *copy = rm_strdup(src);
  *arr = array_append(*arr, copy);
}
/* Append the decimal text form of `ll` to `*arr`. */
static inline void append_uint(myArgArray_t *arr, unsigned long long ll) {
  char s[64] = {0};
  // Bounded formatting: the write can never run past the end of `s`.
  snprintf(s, sizeof(s), "%llu", ll);
  append_string(arr, s);
}
/* Append a copy of each of the `ac->argc` string arguments of `ac` to `*arr`,
 * in order, starting from index 0. */
static inline void append_ac(myArgArray_t *arr, const ArgsCursor *ac) {
  const size_t total = ac->argc;
  size_t idx = 0;
  while (idx < total) {
    append_string(arr, AC_StringArg(ac, idx));
    ++idx;
  }
}
287 
/* Serialize an APPLY or FILTER step as:
 *   APPLY|FILTER <rawExpr> [AS <alias>]
 * Any step type other than PLN_T_APPLY is emitted with the FILTER keyword. */
static void serializeMapFilter(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_MapFilterStep *mapFilter = (PLN_MapFilterStep *)stp;
  const char *keyword = (stp->type == PLN_T_APPLY) ? "APPLY" : "FILTER";

  append_string(arr, keyword);
  append_string(arr, mapFilter->rawExpr);
  if (!stp->alias) {
    return;
  }
  append_string(arr, "AS");
  append_string(arr, stp->alias);
}
301 
/* Serialize an arrange step.
 *
 * When a limit or an offset is set, emits `LIMIT 0 <offset + limit>` (the
 * offset is folded into the count; the emitted offset is always 0).
 * When sort keys are present, emits `SORTBY <2 * nkeys>` followed by an
 * `@<key> ASC|DESC` pair for each key. */
static void serializeArrange(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_ArrangeStep *arrange = (PLN_ArrangeStep *)stp;

  if (arrange->limit || arrange->offset) {
    append_string(arr, "LIMIT");
    append_uint(arr, 0);
    append_uint(arr, arrange->offset + arrange->limit);
  }

  if (!arrange->sortKeys) {
    return;
  }

  const size_t nkeys = array_len(arrange->sortKeys);
  append_string(arr, "SORTBY");
  append_uint(arr, nkeys * 2);
  for (size_t idx = 0; idx < nkeys; ++idx) {
    // The formatted string is stored in the array directly, without copying.
    char *prefixed;
    rm_asprintf(&prefixed, "@%s", arrange->sortKeys[idx]);
    *arr = array_append(*arr, prefixed);

    const char *direction = SORTASCMAP_GETASC(arrange->sortAscMap, idx) ? "ASC" : "DESC";
    append_string(arr, direction);
  }
}
325 
/* Serialize a LOAD step.
 *
 * With explicit arguments, emits `LOAD <argc> <args...>`. Without arguments
 * but with the PLN_F_LOAD_ALL flag set, emits `LOAD *`. Otherwise nothing is
 * emitted. */
static void serializeLoad(myArgArray_t *arr, const PLN_BaseStep *stp) {
  PLN_LoadStep *load = (PLN_LoadStep *)stp;
  const int hasArgs = (load->args.argc != 0);
  const int loadAll = !hasArgs && ((load->base.flags & PLN_F_LOAD_ALL) != 0);

  if (!hasArgs && !loadAll) {
    return;
  }

  append_string(arr, "LOAD");
  if (hasArgs) {
    append_uint(arr, load->args.argc);
    append_ac(arr, &load->args);
  } else {
    append_string(arr, "*");
  }
}
337 
/* Serialize a GROUPBY step as:
 *   GROUPBY <nproperties> <properties...>
 * followed, for every reducer, by:
 *   REDUCE <name> <argc> <args...> [AS <alias>] */
static void serializeGroup(myArgArray_t *arr, const PLN_BaseStep *stp) {
  const PLN_GroupStep *group = (PLN_GroupStep *)stp;

  append_string(arr, "GROUPBY");
  append_uint(arr, group->nproperties);
  size_t propIdx = 0;
  while (propIdx < group->nproperties) {
    append_string(arr, group->properties[propIdx]);
    ++propIdx;
  }

  const size_t reducerCount = array_len(group->reducers);
  for (size_t redIdx = 0; redIdx < reducerCount; ++redIdx) {
    const PLN_Reducer *reducer = &group->reducers[redIdx];
    append_string(arr, "REDUCE");
    append_string(arr, reducer->name);
    append_uint(arr, reducer->args.argc);
    append_ac(arr, &reducer->args);
    if (!reducer->alias) {
      continue;  // alias is optional
    }
    append_string(arr, "AS");
    append_string(arr, reducer->alias);
  }
}
358 
AGPLN_Serialize(const AGGPlan * pln)359 array_t AGPLN_Serialize(const AGGPlan *pln) {
360   char **arr = array_new(char *, 1);
361   for (const DLLIST_node *nn = pln->steps.next; nn != &pln->steps; nn = nn->next) {
362     const PLN_BaseStep *stp = DLLIST_ITEM(nn, PLN_BaseStep, llnodePln);
363     switch (stp->type) {
364       case PLN_T_APPLY:
365       case PLN_T_FILTER:
366         serializeMapFilter(&arr, stp);
367         break;
368       case PLN_T_ARRANGE:
369         serializeArrange(&arr, stp);
370         break;
371       case PLN_T_LOAD:
372         serializeLoad(&arr, stp);
373         break;
374       case PLN_T_GROUP:
375         serializeGroup(&arr, stp);
376         break;
377       case PLN_T_INVALID:
378       case PLN_T_ROOT:
379       case PLN_T_DISTRIBUTE:
380       case PLN_T__MAX:
381         break;
382     }
383   }
384   return arr;
385 }
386