1 #include <aggregate/reducer.h>
2 #include <util/block_alloc.h>
3 #include <util/khash.h>
4 #include <util/fnv.h>
5 #include <dep/hll/hll.h>
6 #include <rmutil/sds.h>
7
// Bit count handed to hll_init() when an approximate-distinct instance is created.
#define HLL_PRECISION_BITS 8
// Multiplied by the instance size to form the block-size argument of
// BlkAlloc_Alloc() for the exact distinct counter.
#define INSTANCE_BLOCK_NUM 1024

// `khid` is the name under which the khash set of 64-bit integers is
// instantiated; the kh_*(khid, ...) calls below refer to it by that name.
// NOTE(review): the integer value 35 does not appear to be read anywhere in
// the visible code - confirm before relying on it.
static const int khid = 35;
KHASH_SET_INIT_INT64(khid);
13
14 typedef struct {
15 size_t count;
16 const RLookupKey *srckey;
17 khash_t(khid) * dedup;
18 } distinctCounter;
19
/**
 * Create the per-group state for the exact distinct counter.
 *
 * The instance is taken from the reducer's block allocator; the dedup set is
 * created separately with kh_init().
 *
 * @param r  reducer supplying the allocator and the source key
 * @return the new distinctCounter, with a zero count and an empty set
 */
static void *distinctNewInstance(Reducer *r) {
  const size_t blockSize = INSTANCE_BLOCK_NUM * sizeof(distinctCounter);
  distinctCounter *dc = BlkAlloc_Alloc(&r->alloc, sizeof(*dc), blockSize);
  dc->srckey = r->srckey;
  dc->dedup = kh_init(khid);
  dc->count = 0;
  return dc;
}
29
/**
 * Feed one row into the exact distinct counter.
 *
 * The value stored under the instance's source key is hashed to 64 bits and
 * the hash is inserted into the dedup set; the count grows only when the hash
 * was not already in the set. Missing and null values are ignored.
 *
 * @param r       owning reducer (not used here)
 * @param ctx     the distinctCounter for this group
 * @param srcrow  row to read the value from
 * @return always 1
 */
static int distinctAdd(Reducer *r, void *ctx, const RLookupRow *srcrow) {
  distinctCounter *ctr = ctx;
  const RSValue *val = RLookup_GetItem(ctr->srckey, srcrow);
  if (!val || val->t == RSValue_Null) {
    return 1;
  }

  uint64_t hval = RSValue_Hash(val, 0);

  // kh_put() looks the key up and inserts it when absent, so one probe is
  // enough. `absent` is 0 when the hash was already in the set, positive when
  // a new slot was taken, and negative when the insertion failed - in which
  // case nothing was recorded and the count must not grow.
  int absent = 0;
  kh_put(khid, ctr->dedup, hval, &absent);
  if (absent > 0) {
    ctr->count++;
  }
  return 1;
}
47
distinctFinalize(Reducer * parent,void * ctx)48 static RSValue *distinctFinalize(Reducer *parent, void *ctx) {
49 distinctCounter *ctr = ctx;
50 return RS_NumVal(ctr->count);
51 }
52
/**
 * Release what a distinctCounter owns.
 *
 * Only the dedup hash set is destroyed; the counter object itself comes from
 * the block allocator and is not freed individually.
 */
static void distinctFreeInstance(Reducer *r, void *p) {
  distinctCounter *dc = p;
  kh_destroy(khid, dc->dedup);
}
59
RDCRCountDistinct_New(const ReducerOptions * options)60 Reducer *RDCRCountDistinct_New(const ReducerOptions *options) {
61 Reducer *r = rm_calloc(1, sizeof(*r));
62 if (!ReducerOpts_GetKey(options, &r->srckey)) {
63 rm_free(r);
64 return NULL;
65 }
66 r->Add = distinctAdd;
67 r->Finalize = distinctFinalize;
68 r->Free = Reducer_GenericFree;
69 r->FreeInstance = distinctFreeInstance;
70 r->NewInstance = distinctNewInstance;
71 r->reducerId = REDUCER_T_DISTINCT;
72 return r;
73 }
74
75 typedef struct {
76 struct HLL hll;
77 const RLookupKey *key;
78 } distinctishCounter;
79
/**
 * Create the per-group state for the approximate distinct counter.
 *
 * @param parent  reducer supplying the allocator and the source key
 * @return a distinctishCounter whose sketch was set up with
 *         HLL_PRECISION_BITS
 */
static void *distinctishNewInstance(Reducer *parent) {
  // 1024 instances per block - the same value as INSTANCE_BLOCK_NUM.
  const size_t blockSize = 1024 * sizeof(distinctishCounter);
  distinctishCounter *dc = BlkAlloc_Alloc(&parent->alloc, sizeof(*dc), blockSize);
  dc->key = parent->srckey;
  hll_init(&dc->hll, HLL_PRECISION_BITS);
  return dc;
}
88
/**
 * Feed one row into the approximate distinct counter.
 *
 * Missing and null values are skipped. Any other value is hashed to 64 bits
 * and the hash, folded to 32 bits, is added to the sketch.
 *
 * @param parent    owning reducer (not used here)
 * @param instance  the distinctishCounter for this group
 * @param srcrow    row to read the value from
 * @return always 1
 */
static int distinctishAdd(Reducer *parent, void *instance, const RLookupRow *srcrow) {
  distinctishCounter *dc = instance;
  const RSValue *item = RLookup_GetItem(dc->key, srcrow);
  if (item != NULL && item->t != RSValue_Null) {
    // Fold the 64-bit hash into 32 bits by XOR-ing its two halves.
    const uint64_t h64 = RSValue_Hash(item, 0x5f61767a);
    hll_add_hash(&dc->hll, (uint32_t)(h64 ^ (h64 >> 32)));
  }
  return 1;
}
101
distinctishFinalize(Reducer * parent,void * instance)102 static RSValue *distinctishFinalize(Reducer *parent, void *instance) {
103 distinctishCounter *ctr = instance;
104 return RS_NumVal((uint64_t)hll_count(&ctr->hll));
105 }
106
/**
 * Release what a distinctishCounter owns: its sketch is passed to
 * hll_destroy(). The counter object itself is not freed here.
 */
static void distinctishFreeInstance(Reducer *r, void *p) {
  struct HLL *sketch = &((distinctishCounter *)p)->hll;
  hll_destroy(sketch);
}
111
/**
 * Serialized HLL format: this header, followed directly by the register
 * bytes.
 *
 * The struct is packed, so the header is exactly five bytes with no padding.
 * The register count is not stored because it is always 1 << bits.
 */
typedef struct __attribute__((packed)) {
  uint32_t flags;  // Currently unused
  uint8_t bits;    // Bit count of the serialized sketch
} HLLSerializedHeader;
118
hllFinalize(Reducer * parent,void * ctx)119 static RSValue *hllFinalize(Reducer *parent, void *ctx) {
120 distinctishCounter *ctr = ctx;
121
122 // Serialize field map.
123 HLLSerializedHeader hdr = {.flags = 0, .bits = ctr->hll.bits};
124 char *str = rm_malloc(sizeof(hdr) + ctr->hll.size);
125 size_t hdrsize = sizeof(hdr);
126 memcpy(str, &hdr, hdrsize);
127 memcpy(str + hdrsize, ctr->hll.registers, ctr->hll.size);
128 RSValue *ret = RS_StringVal(str, sizeof(hdr) + ctr->hll.size);
129 return ret;
130 }
131
newHllCommon(const ReducerOptions * options,int isRaw)132 static Reducer *newHllCommon(const ReducerOptions *options, int isRaw) {
133 Reducer *r = rm_calloc(1, sizeof(*r));
134 if (!ReducerOpts_GetKey(options, &r->srckey)) {
135 rm_free(r);
136 return NULL;
137 }
138 r->Add = distinctishAdd;
139 r->Free = Reducer_GenericFree;
140 r->FreeInstance = distinctishFreeInstance;
141 r->NewInstance = distinctishNewInstance;
142
143 if (isRaw) {
144 r->reducerId = REDUCER_T_HLL;
145 r->Finalize = hllFinalize;
146 } else {
147 r->reducerId = REDUCER_T_DISTINCTISH;
148 r->Finalize = distinctishFinalize;
149 }
150 return r;
151 }
152
RDCRCountDistinctish_New(const ReducerOptions * options)153 Reducer *RDCRCountDistinctish_New(const ReducerOptions *options) {
154 return newHllCommon(options, 0);
155 }
156
RDCRHLL_New(const ReducerOptions * options)157 Reducer *RDCRHLL_New(const ReducerOptions *options) {
158 return newHllCommon(options, 1);
159 }
160
161 typedef struct {
162 const RLookupKey *srckey;
163 struct HLL hll;
164 } hllSumCtx;
165
/**
 * Merge one serialized HLL blob into the group's accumulated sketch.
 *
 * The value under the source key must be a string laid out as an
 * HLLSerializedHeader followed by exactly 1 << bits register bytes. The first
 * accepted blob initialises the accumulator; every later blob must carry the
 * same `bits` and is merged into it.
 *
 * The blob is treated as untrusted: its length and bit count are validated
 * before any register byte is touched.
 *
 * @param r       owning reducer (not used here)
 * @param ctx     the hllSumCtx for this group
 * @param srcrow  row to read the blob from
 * @return 1 when the blob was accepted; 0 when the value is missing, is not a
 *         string, is malformed, does not match the accumulator's bit count,
 *         or the accumulator could not be initialised or merged
 */
static int hllsumAdd(Reducer *r, void *ctx, const RLookupRow *srcrow) {
  hllSumCtx *ctr = ctx;
  const RSValue *val = RLookup_GetItem(ctr->srckey, srcrow);

  if (val == NULL || !RSValue_IsString(val)) {
    // Not a string!
    return 0;
  }

  size_t len;
  const char *buf = RSValue_StringPtrLen(val, &len);

  // Need at least the header size. This is checked before any pointer past
  // the header is formed.
  if (len < sizeof(HLLSerializedHeader)) {
    return 0;
  }

  // Copy the header out instead of reading it through a cast pointer.
  HLLSerializedHeader hdr;
  memcpy(&hdr, buf, sizeof(hdr));
  const char *registers = buf + sizeof(hdr);
  const size_t regsz = len - sizeof(hdr);

  // Can't be an insane bit value: shifting a 64-bit one by 64 or more is
  // undefined, so reject those before computing the expected size.
  if (hdr.bits >= 64) {
    return 0;
  }

  // Expected length is determined by bits. The shift is done in 64 bits so it
  // is well defined for every value that passed the check above.
  if ((uint64_t)regsz != ((uint64_t)1 << hdr.bits)) {
    return 0;
  }

  if (ctr->hll.bits) {
    if (hdr.bits != ctr->hll.bits) {
      return 0;
    }
    // Merge! The temporary sketch only borrows the blob's register bytes;
    // regsz was verified above to equal 1 << bits.
    struct HLL tmphll = {.bits = hdr.bits, .size = regsz, .registers = (uint8_t *)registers};
    if (hll_merge(&ctr->hll, &tmphll) != 0) {
      return 0;
    }
  } else {
    // Not yet initialized - make this our first register set and continue.
    // A bit count the HLL library refuses must not fall through to the copy
    // below, because the registers would not have been allocated.
    // NOTE(review): assumes hll_init() reports success with 0, like
    // hll_merge() - confirm against the hll header.
    if (hll_init(&ctr->hll, hdr.bits) != 0) {
      return 0;
    }
    memcpy(ctr->hll.registers, registers, regsz);
  }
  return 1;
}
216
hllsumFinalize(Reducer * parent,void * ctx)217 static RSValue *hllsumFinalize(Reducer *parent, void *ctx) {
218 hllSumCtx *ctr = ctx;
219 return RS_NumVal(ctr->hll.bits ? (uint64_t)hll_count(&ctr->hll) : 0);
220 }
221
/**
 * Create the per-group state for the HLL-sum reducer.
 *
 * The sketch starts out uninitialised (bits 0, no registers); hllsumAdd()
 * sets it up from the first blob it accepts.
 *
 * @param r  reducer supplying the allocator and the source key
 * @return the new hllSumCtx
 */
static void *hllsumNewInstance(Reducer *r) {
  // 1024 instances per block - the same value as INSTANCE_BLOCK_NUM.
  const size_t blockSize = 1024 * sizeof(hllSumCtx);
  hllSumCtx *sum = BlkAlloc_Alloc(&r->alloc, sizeof(*sum), blockSize);
  sum->srckey = r->srckey;
  sum->hll.registers = NULL;
  sum->hll.bits = 0;
  return sum;
}
229
/**
 * Release what an hllSumCtx owns: its sketch is passed to hll_destroy(). The
 * context object itself is not freed here.
 */
static void hllsumFreeInstance(Reducer *r, void *p) {
  struct HLL *sketch = &((hllSumCtx *)p)->hll;
  hll_destroy(sketch);
}
234
RDCRHLLSum_New(const ReducerOptions * options)235 Reducer *RDCRHLLSum_New(const ReducerOptions *options) {
236 Reducer *r = rm_calloc(1, sizeof(*r));
237 if (!ReducerOpts_GetKey(options, &r->srckey)) {
238 rm_free(r);
239 return NULL;
240 }
241 r->reducerId = REDUCER_T_HLLSUM;
242 r->Add = hllsumAdd;
243 r->Finalize = hllsumFinalize;
244 r->NewInstance = hllsumNewInstance;
245 r->FreeInstance = hllsumFreeInstance;
246 r->Free = Reducer_GenericFree;
247 return r;
248 }
249