1 #include <aggregate/reducer.h>
2 
// Per-instance accumulator shared by the SUM and AVG reducers.
typedef struct {
  size_t count;  // incremented once per sumAdd() call
  double total;  // running sum of the numeric values added so far
} sumCtx;
7 
// Reducer object backing both SUM and AVG.
typedef struct {
  Reducer base;              // embedded Reducer; callbacks cast their Reducer* back to SumReducer*
  int isAvg;                 // non-zero: finalize yields total / count; zero: finalize yields total
  const RLookupKey *srckey;  // key handed to RLookup_GetItem() to fetch each row's input value
} SumReducer;
13 
// Size in bytes of 32 sumCtx objects; passed to BlkAlloc_Alloc() by sumNewInstance().
// The expansion is parenthesized so the macro behaves as a single value inside
// larger expressions (e.g. `x / BLOCK_SIZE`).
#define BLOCK_SIZE (32 * sizeof(sumCtx))
15 
// NewInstance callback: obtains storage for one sumCtx from the reducer's
// block allocator (r->alloc) and returns it with both fields reset to zero.
static void *sumNewInstance(Reducer *r) {
  sumCtx *acc = BlkAlloc_Alloc(&r->alloc, sizeof(sumCtx), BLOCK_SIZE);
  acc->total = 0;
  acc->count = 0;
  return acc;
}
22 
// Add callback: folds one row into the accumulator `instance`.
//
// The row's value is looked up through the reducer's srckey. A value already
// typed as a number is added directly; anything else is offered to
// RSValue_ToNumber() and added only if that conversion reports success.
// Note that `count` is bumped for every row, whether or not a value was added.
// Always returns 1.
static int sumAdd(Reducer *baseparent, void *instance, const RLookupRow *row) {
  const SumReducer *self = (const SumReducer *)baseparent;
  sumCtx *acc = instance;

  acc->count++;

  const RSValue *val = RLookup_GetItem(self->srckey, row);
  if (val && val->t == RSValue_Number) {
    // Fast path: the value is already numeric.
    acc->total += val->numval;
    return 1;
  }

  // Fallback: attempt a conversion; leave the total untouched on failure.
  double converted = 0;
  if (RSValue_ToNumber(val, &converted)) {
    acc->total += converted;
  }
  return 1;
}
38 
sumFinalize(Reducer * baseparent,void * instance)39 static RSValue *sumFinalize(Reducer *baseparent, void *instance) {
40   sumCtx *ctr = instance;
41   SumReducer *parent = (SumReducer *)baseparent;
42   double v = 0;
43   if (parent->isAvg) {
44     if (ctr->count) {
45       v = ctr->total / ctr->count;
46     }
47   } else {
48     v = ctr->total;
49   }
50   return RS_NumVal(v);
51 }
52 
newReducerCommon(const ReducerOptions * options,int isAvg)53 static Reducer *newReducerCommon(const ReducerOptions *options, int isAvg) {
54   SumReducer *r = rm_calloc(1, sizeof(*r));
55   if (!ReducerOpts_GetKey(options, &r->srckey)) {
56     rm_free(r);
57     return NULL;
58   }
59   r->base.NewInstance = sumNewInstance;
60   r->base.Add = sumAdd;
61   r->base.Finalize = sumFinalize;
62   r->base.Free = Reducer_GenericFree;
63   r->isAvg = isAvg;
64   return &r->base;
65 }
66 
RDCRSum_New(const ReducerOptions * options)67 Reducer *RDCRSum_New(const ReducerOptions *options) {
68   return newReducerCommon(options, 0);
69 }
70 
RDCRAvg_New(const ReducerOptions * options)71 Reducer *RDCRAvg_New(const ReducerOptions *options) {
72   return newReducerCommon(options, 1);
73 }
74