1 #include <aggregate/reducer.h>
2 
3 typedef struct {
4   TrieMap *values;
5   const RLookupKey *srckey;
6 } tolistCtx;
7 
/*
 * Create the per-group context for the TOLIST reducer.
 *
 * The context is obtained from Reducer_BlkAlloc() (element size of one
 * context, with the size of 100 contexts as the third argument -- presumably
 * the block size; confirm against Reducer_BlkAlloc). It starts out with a
 * fresh, empty TrieMap and the reducer's source key.
 *
 * Returns the new context as an opaque pointer.
 */
static void *tolistNewInstance(Reducer *rbase) {
  const size_t ctxSize = sizeof(tolistCtx);
  tolistCtx *inst = Reducer_BlkAlloc(rbase, ctxSize, 100 * ctxSize);

  inst->srckey = rbase->srckey;
  inst->values = NewTrieMap();
  return inst;
}
14 
tolistAdd(Reducer * rbase,void * ctx,const RLookupRow * srcrow)15 static int tolistAdd(Reducer *rbase, void *ctx, const RLookupRow *srcrow) {
16   tolistCtx *tlc = ctx;
17   RSValue *v = RLookup_GetItem(tlc->srckey, srcrow);
18   if (!v) {
19     return 1;
20   }
21 
22   // for non array values we simply add the value to the list */
23   if (v->t != RSValue_Array) {
24     uint64_t hval = RSValue_Hash(v, 0);
25     if (TrieMap_Find(tlc->values, (char *)&hval, sizeof(hval)) == TRIEMAP_NOTFOUND) {
26 
27       TrieMap_Add(tlc->values, (char *)&hval, sizeof(hval),
28                   RSValue_IncrRef(RSValue_MakePersistent(v)), NULL);
29     }
30   } else {  // For array values we add each distinct element to the list
31     uint32_t len = RSValue_ArrayLen(v);
32     for (uint32_t i = 0; i < len; i++) {
33       RSValue *av = RSValue_ArrayItem(v, i);
34       uint64_t hval = RSValue_Hash(av, 0);
35       if (TrieMap_Find(tlc->values, (char *)&hval, sizeof(hval)) == TRIEMAP_NOTFOUND) {
36 
37         TrieMap_Add(tlc->values, (char *)&hval, sizeof(hval),
38                     RSValue_IncrRef(RSValue_MakePersistent(av)), NULL);
39       }
40     }
41   }
42   return 1;
43 }
44 
tolistFinalize(Reducer * rbase,void * ctx)45 static RSValue *tolistFinalize(Reducer *rbase, void *ctx) {
46   tolistCtx *tlc = ctx;
47   TrieMapIterator *it = TrieMap_Iterate(tlc->values, "", 0);
48   char *c;
49   tm_len_t l;
50   void *ptr;
51   RSValue **arr = rm_calloc(tlc->values->cardinality, sizeof(RSValue));
52   size_t i = 0;
53   while (TrieMapIterator_Next(it, &c, &l, &ptr)) {
54     if (ptr) {
55       arr[i++] = ptr;
56     }
57   }
58 
59   RSValue *ret = RSValue_NewArrayEx(arr, i, RSVAL_ARRAY_ALLOC);
60   TrieMapIterator_Free(it);
61   return ret;
62 }
63 
freeValues(void * ptr)64 static void freeValues(void *ptr) {
65   RSValue_Decref((RSValue *)ptr);
66 }
67 
/*
 * Release the resources owned by a per-group context: frees the value map,
 * passing freeValues as the per-value callback. The context memory itself is
 * not freed here.
 */
static void tolistFreeInstance(Reducer *parent, void *p) {
  TrieMap *collected = ((tolistCtx *)p)->values;
  TrieMap_Free(collected, freeValues);
}
72 
RDCRToList_New(const ReducerOptions * opts)73 Reducer *RDCRToList_New(const ReducerOptions *opts) {
74   Reducer *r = rm_calloc(1, sizeof(*r));
75   if (!ReducerOptions_GetKey(opts, &r->srckey)) {
76     rm_free(r);
77     return NULL;
78   }
79   r->Add = tolistAdd;
80   r->Finalize = tolistFinalize;
81   r->Free = Reducer_GenericFree;
82   r->FreeInstance = tolistFreeInstance;
83   r->NewInstance = tolistNewInstance;
84   return r;
85 }
86