1 #include <aggregate/reducer.h>
2
3 typedef struct {
4 TrieMap *values;
5 const RLookupKey *srckey;
6 } tolistCtx;
7
tolistNewInstance(Reducer * rbase)8 static void *tolistNewInstance(Reducer *rbase) {
9 tolistCtx *ctx = Reducer_BlkAlloc(rbase, sizeof(*ctx), 100 * sizeof(*ctx));
10 ctx->values = NewTrieMap();
11 ctx->srckey = rbase->srckey;
12 return ctx;
13 }
14
tolistAdd(Reducer * rbase,void * ctx,const RLookupRow * srcrow)15 static int tolistAdd(Reducer *rbase, void *ctx, const RLookupRow *srcrow) {
16 tolistCtx *tlc = ctx;
17 RSValue *v = RLookup_GetItem(tlc->srckey, srcrow);
18 if (!v) {
19 return 1;
20 }
21
22 // for non array values we simply add the value to the list */
23 if (v->t != RSValue_Array) {
24 uint64_t hval = RSValue_Hash(v, 0);
25 if (TrieMap_Find(tlc->values, (char *)&hval, sizeof(hval)) == TRIEMAP_NOTFOUND) {
26
27 TrieMap_Add(tlc->values, (char *)&hval, sizeof(hval),
28 RSValue_IncrRef(RSValue_MakePersistent(v)), NULL);
29 }
30 } else { // For array values we add each distinct element to the list
31 uint32_t len = RSValue_ArrayLen(v);
32 for (uint32_t i = 0; i < len; i++) {
33 RSValue *av = RSValue_ArrayItem(v, i);
34 uint64_t hval = RSValue_Hash(av, 0);
35 if (TrieMap_Find(tlc->values, (char *)&hval, sizeof(hval)) == TRIEMAP_NOTFOUND) {
36
37 TrieMap_Add(tlc->values, (char *)&hval, sizeof(hval),
38 RSValue_IncrRef(RSValue_MakePersistent(av)), NULL);
39 }
40 }
41 }
42 return 1;
43 }
44
tolistFinalize(Reducer * rbase,void * ctx)45 static RSValue *tolistFinalize(Reducer *rbase, void *ctx) {
46 tolistCtx *tlc = ctx;
47 TrieMapIterator *it = TrieMap_Iterate(tlc->values, "", 0);
48 char *c;
49 tm_len_t l;
50 void *ptr;
51 RSValue **arr = rm_calloc(tlc->values->cardinality, sizeof(RSValue));
52 size_t i = 0;
53 while (TrieMapIterator_Next(it, &c, &l, &ptr)) {
54 if (ptr) {
55 arr[i++] = ptr;
56 }
57 }
58
59 RSValue *ret = RSValue_NewArrayEx(arr, i, RSVAL_ARRAY_ALLOC);
60 TrieMapIterator_Free(it);
61 return ret;
62 }
63
freeValues(void * ptr)64 static void freeValues(void *ptr) {
65 RSValue_Decref((RSValue *)ptr);
66 }
67
tolistFreeInstance(Reducer * parent,void * p)68 static void tolistFreeInstance(Reducer *parent, void *p) {
69 tolistCtx *tlc = p;
70 TrieMap_Free(tlc->values, freeValues);
71 }
72
RDCRToList_New(const ReducerOptions * opts)73 Reducer *RDCRToList_New(const ReducerOptions *opts) {
74 Reducer *r = rm_calloc(1, sizeof(*r));
75 if (!ReducerOptions_GetKey(opts, &r->srckey)) {
76 rm_free(r);
77 return NULL;
78 }
79 r->Add = tolistAdd;
80 r->Finalize = tolistFinalize;
81 r->Free = Reducer_GenericFree;
82 r->FreeInstance = tolistFreeInstance;
83 r->NewInstance = tolistNewInstance;
84 return r;
85 }
86