1 #include <aggregate/reducer.h>
2 #include <util/block_alloc.h>
3 #include <util/khash.h>
4 #include <util/fnv.h>
5 #include <dep/hll/hll.h>
6 #include <rmutil/sds.h>
7 
/** Precision passed to hll_init for COUNT_DISTINCTISH / HLL sketches (register count is 1 << bits). */
#define HLL_PRECISION_BITS 8
/** Each BlkAlloc block is sized to hold this many reducer instances. */
#define INSTANCE_BLOCK_NUM 1024

// Hash set of 64-bit value hashes used by COUNT_DISTINCT. `khid` is only a name token that
// khash pastes into the generated types/functions (kh_khid_t, kh_get_khid, ...); no variable
// of that name is needed.
KHASH_SET_INIT_INT64(khid);
13 
14 typedef struct {
15   size_t count;
16   const RLookupKey *srckey;
17   khash_t(khid) * dedup;
18 } distinctCounter;
19 
/**
 * Creates a COUNT_DISTINCT group instance.
 *
 * The instance is carved out of the reducer's block allocator (so it is never freed
 * individually); only its hash set is heap-allocated.
 */
static void *distinctNewInstance(Reducer *r) {
  const size_t blockBytes = sizeof(distinctCounter) * INSTANCE_BLOCK_NUM;
  distinctCounter *inst = BlkAlloc_Alloc(&r->alloc, sizeof(distinctCounter), blockBytes);
  inst->count = 0;
  inst->dedup = kh_init(khid);
  inst->srckey = r->srckey;
  return inst;
}
29 
/**
 * Adds one row's value to a COUNT_DISTINCT group.
 *
 * Missing and NULL values are ignored. Otherwise the value is hashed (seed 0) and the
 * hash is inserted into the group's set; `count` grows only when the hash was not
 * already present.
 *
 * Always returns 1.
 */
static int distinctAdd(Reducer *r, void *ctx, const RLookupRow *srcrow) {
  distinctCounter *ctr = ctx;
  const RSValue *val = RLookup_GetItem(ctr->srckey, srcrow);
  if (!val || val->t == RSValue_Null) {
    return 1;
  }

  uint64_t hval = RSValue_Hash(val, 0);

  // A single kh_put both probes and inserts. Its return code is:
  //   -1 on failure, 0 if the key was already present, 1 or 2 if newly inserted.
  // Counting only on ret > 0 keeps `count` equal to the number of stored hashes,
  // even if an insertion fails.
  int ret;
  kh_put(khid, ctr->dedup, hval, &ret);
  if (ret > 0) {
    ctr->count++;
  }
  return 1;
}
47 
distinctFinalize(Reducer * parent,void * ctx)48 static RSValue *distinctFinalize(Reducer *parent, void *ctx) {
49   distinctCounter *ctr = ctx;
50   return RS_NumVal(ctr->count);
51 }
52 
/**
 * Releases a COUNT_DISTINCT group instance.
 *
 * Only the hash set is destroyed; the instance memory itself belongs to the reducer's
 * block allocator and is not freed here.
 */
static void distinctFreeInstance(Reducer *r, void *p) {
  distinctCounter *counter = (distinctCounter *)p;
  kh_destroy(khid, counter->dedup);
}
59 
RDCRCountDistinct_New(const ReducerOptions * options)60 Reducer *RDCRCountDistinct_New(const ReducerOptions *options) {
61   Reducer *r = rm_calloc(1, sizeof(*r));
62   if (!ReducerOpts_GetKey(options, &r->srckey)) {
63     rm_free(r);
64     return NULL;
65   }
66   r->Add = distinctAdd;
67   r->Finalize = distinctFinalize;
68   r->Free = Reducer_GenericFree;
69   r->FreeInstance = distinctFreeInstance;
70   r->NewInstance = distinctNewInstance;
71   r->reducerId = REDUCER_T_DISTINCT;
72   return r;
73 }
74 
75 typedef struct {
76   struct HLL hll;
77   const RLookupKey *key;
78 } distinctishCounter;
79 
/**
 * Creates a group instance for the HLL-based reducers.
 *
 * The instance is carved out of the reducer's block allocator, using the same
 * INSTANCE_BLOCK_NUM block sizing as the other reducers in this file. Its HLL is
 * initialised with HLL_PRECISION_BITS.
 */
static void *distinctishNewInstance(Reducer *parent) {
  BlkAlloc *ba = &parent->alloc;
  distinctishCounter *ctr = BlkAlloc_Alloc(ba, sizeof(*ctr), INSTANCE_BLOCK_NUM * sizeof(*ctr));
  // NOTE(review): hll_init's return value is not checked; HLL_PRECISION_BITS is a fixed
  // compile-time precision that is presumably always accepted.
  hll_init(&ctr->hll, HLL_PRECISION_BITS);
  ctr->key = parent->srckey;
  return ctr;
}
88 
/**
 * Adds one row's value to an HLL-based group.
 *
 * Missing and NULL values are ignored. Otherwise the value is hashed to 64 bits with a
 * fixed seed (different from the seed COUNT_DISTINCT uses). The hash is folded to 32 bits
 * by XOR-ing its halves and fed to the sketch.
 *
 * Always returns 1.
 */
static int distinctishAdd(Reducer *parent, void *instance, const RLookupRow *srcrow) {
  distinctishCounter *ctr = instance;
  const RSValue *v = RLookup_GetItem(ctr->key, srcrow);
  if (v != NULL && v->t != RSValue_Null) {
    const uint64_t h64 = RSValue_Hash(v, 0x5f61767a);
    const uint32_t lo = (uint32_t)h64;
    const uint32_t hi = (uint32_t)(h64 >> 32);
    hll_add_hash(&ctr->hll, lo ^ hi);
  }
  return 1;
}
101 
distinctishFinalize(Reducer * parent,void * instance)102 static RSValue *distinctishFinalize(Reducer *parent, void *instance) {
103   distinctishCounter *ctr = instance;
104   return RS_NumVal((uint64_t)hll_count(&ctr->hll));
105 }
106 
/**
 * Releases an HLL-based group instance.
 *
 * Only the sketch's resources are released; the instance memory belongs to the block
 * allocator.
 */
static void distinctishFreeInstance(Reducer *r, void *p) {
  struct HLL *sketch = &((distinctishCounter *)p)->hll;
  hll_destroy(sketch);
}
111 
/**
 * Serialized HLL format: written by hllFinalize and read back by hllsumAdd.
 *
 * Layout (the struct is packed, so there is no padding between fields):
 *   uint32_t flags                 -- always written as 0; never read back
 *   uint8_t  bits                  -- HLL precision
 *   uint8_t  registers[1 << bits]  -- raw register bytes immediately after the header
 * The header is memcpy'd as-is, so `flags` is stored in host byte order.
 */
typedef struct __attribute__((packed)) {
  uint32_t flags;  // Currently unused (reserved; written as 0)
  uint8_t bits;
  // uint32_t size -- NOTE - not stored; the register count is always 1<<bits
} HLLSerializedHeader;
118 
hllFinalize(Reducer * parent,void * ctx)119 static RSValue *hllFinalize(Reducer *parent, void *ctx) {
120   distinctishCounter *ctr = ctx;
121 
122   // Serialize field map.
123   HLLSerializedHeader hdr = {.flags = 0, .bits = ctr->hll.bits};
124   char *str = rm_malloc(sizeof(hdr) + ctr->hll.size);
125   size_t hdrsize = sizeof(hdr);
126   memcpy(str, &hdr, hdrsize);
127   memcpy(str + hdrsize, ctr->hll.registers, ctr->hll.size);
128   RSValue *ret = RS_StringVal(str, sizeof(hdr) + ctr->hll.size);
129   return ret;
130 }
131 
newHllCommon(const ReducerOptions * options,int isRaw)132 static Reducer *newHllCommon(const ReducerOptions *options, int isRaw) {
133   Reducer *r = rm_calloc(1, sizeof(*r));
134   if (!ReducerOpts_GetKey(options, &r->srckey)) {
135     rm_free(r);
136     return NULL;
137   }
138   r->Add = distinctishAdd;
139   r->Free = Reducer_GenericFree;
140   r->FreeInstance = distinctishFreeInstance;
141   r->NewInstance = distinctishNewInstance;
142 
143   if (isRaw) {
144     r->reducerId = REDUCER_T_HLL;
145     r->Finalize = hllFinalize;
146   } else {
147     r->reducerId = REDUCER_T_DISTINCTISH;
148     r->Finalize = distinctishFinalize;
149   }
150   return r;
151 }
152 
RDCRCountDistinctish_New(const ReducerOptions * options)153 Reducer *RDCRCountDistinctish_New(const ReducerOptions *options) {
154   return newHllCommon(options, 0);
155 }
156 
RDCRHLL_New(const ReducerOptions * options)157 Reducer *RDCRHLL_New(const ReducerOptions *options) {
158   return newHllCommon(options, 1);
159 }
160 
161 typedef struct {
162   const RLookupKey *srckey;
163   struct HLL hll;
164 } hllSumCtx;
165 
/**
 * Merges one serialized HLL (in the format written by hllFinalize) into the group's sum.
 *
 * The source value must be a string consisting of an HLLSerializedHeader followed by
 * exactly 1 << bits register bytes. The value is skipped if any of these hold:
 *   - it is missing or not a string;
 *   - it is too short or has an inconsistent length;
 *   - its precision differs from the sketches already merged;
 *   - it cannot be merged or initialised.
 *
 * Returns 1 when the value was merged, 0 when it was skipped.
 */
static int hllsumAdd(Reducer *r, void *ctx, const RLookupRow *srcrow) {
  hllSumCtx *ctr = ctx;
  const RSValue *val = RLookup_GetItem(ctr->srckey, srcrow);

  if (val == NULL || !RSValue_IsString(val)) {
    return 0;  // Not a serialized HLL.
  }

  size_t len;
  const char *buf = RSValue_StringPtrLen(val, &len);

  // Check the length before forming any pointer past the header.
  HLLSerializedHeader hdr;
  if (len < sizeof(hdr)) {
    return 0;
  }
  // Copy the header out rather than casting `buf`, to avoid aliasing concerns.
  memcpy(&hdr, buf, sizeof(hdr));
  const char *registers = buf + sizeof(hdr);
  size_t regsz = len - sizeof(hdr);

  // The input is untrusted: reject bit counts whose shift would be undefined (>= 64 on a
  // 64-bit operand) before computing the expected register size from them.
  if (hdr.bits >= 64 || (uint64_t)regsz != ((uint64_t)1 << hdr.bits)) {
    return 0;
  }

  if (ctr->hll.bits) {
    if (hdr.bits != ctr->hll.bits) {
      return 0;
    }
    // Wrap the incoming registers (no copy) and merge them into the running sum.
    struct HLL tmphll = {.bits = hdr.bits, .size = regsz, .registers = (uint8_t *)registers};
    if (hll_merge(&ctr->hll, &tmphll) != 0) {
      return 0;
    }
  } else {
    // First value: adopt its precision. hll_init may reject the precision; in that case
    // skip the value instead of copying into registers that were never allocated.
    if (hll_init(&ctr->hll, hdr.bits) != 0) {
      return 0;
    }
    memcpy(ctr->hll.registers, registers, regsz);
  }
  return 1;
}
216 
hllsumFinalize(Reducer * parent,void * ctx)217 static RSValue *hllsumFinalize(Reducer *parent, void *ctx) {
218   hllSumCtx *ctr = ctx;
219   return RS_NumVal(ctr->hll.bits ? (uint64_t)hll_count(&ctr->hll) : 0);
220 }
221 
/**
 * Creates an HLL_SUM group instance with an empty, uninitialised sketch (bits == 0,
 * registers == NULL).
 *
 * The instance is carved from the reducer's block allocator, using the same
 * INSTANCE_BLOCK_NUM block sizing as the other reducers in this file.
 */
static void *hllsumNewInstance(Reducer *r) {
  hllSumCtx *ctr = BlkAlloc_Alloc(&r->alloc, sizeof(*ctr), INSTANCE_BLOCK_NUM * sizeof(*ctr));
  // Block-allocated memory may not be zeroed. Clear the whole HLL, not just bits and
  // registers, so none of its fields is indeterminate before the first merge.
  *ctr = (hllSumCtx){.srckey = r->srckey};
  return ctr;
}
229 
/**
 * Releases an HLL_SUM group instance's sketch.
 *
 * The registers may still be NULL if nothing was merged. The instance memory itself
 * belongs to the block allocator.
 */
static void hllsumFreeInstance(Reducer *r, void *p) {
  struct HLL *acc = &((hllSumCtx *)p)->hll;
  hll_destroy(acc);
}
234 
RDCRHLLSum_New(const ReducerOptions * options)235 Reducer *RDCRHLLSum_New(const ReducerOptions *options) {
236   Reducer *r = rm_calloc(1, sizeof(*r));
237   if (!ReducerOpts_GetKey(options, &r->srckey)) {
238     rm_free(r);
239     return NULL;
240   }
241   r->reducerId = REDUCER_T_HLLSUM;
242   r->Add = hllsumAdd;
243   r->Finalize = hllsumFinalize;
244   r->NewInstance = hllsumNewInstance;
245   r->FreeInstance = hllsumFreeInstance;
246   r->Free = Reducer_GenericFree;
247   return r;
248 }
249