1// Generated by tmpl
2// https://github.com/benbjohnson/tmpl
3//
4// DO NOT EDIT!
5// Source: functions.gen.go.tmpl
6
7package query
8
9import (
10	"math/rand"
11	"sort"
12	"time"
13)
14
15// FloatPointAggregator aggregates points to produce a single point.
16type FloatPointAggregator interface {
17	AggregateFloat(p *FloatPoint)
18}
19
20// FloatBulkPointAggregator aggregates multiple points at a time.
21type FloatBulkPointAggregator interface {
22	AggregateFloatBulk(points []FloatPoint)
23}
24
25// AggregateFloatPoints feeds a slice of FloatPoint into an
26// aggregator. If the aggregator is a FloatBulkPointAggregator, it will
27// use the AggregateBulk method.
28func AggregateFloatPoints(a FloatPointAggregator, points []FloatPoint) {
29	switch a := a.(type) {
30	case FloatBulkPointAggregator:
31		a.AggregateFloatBulk(points)
32	default:
33		for _, p := range points {
34			a.AggregateFloat(&p)
35		}
36	}
37}
38
39// FloatPointEmitter produces a single point from an aggregate.
40type FloatPointEmitter interface {
41	Emit() []FloatPoint
42}
43
44// FloatReduceFunc is the function called by a FloatPoint reducer.
45type FloatReduceFunc func(prev *FloatPoint, curr *FloatPoint) (t int64, v float64, aux []interface{})
46
47// FloatFuncReducer is a reducer that reduces
48// the passed in points to a single point using a reduce function.
49type FloatFuncReducer struct {
50	prev *FloatPoint
51	fn   FloatReduceFunc
52}
53
54// NewFloatFuncReducer creates a new FloatFuncFloatReducer.
55func NewFloatFuncReducer(fn FloatReduceFunc, prev *FloatPoint) *FloatFuncReducer {
56	return &FloatFuncReducer{fn: fn, prev: prev}
57}
58
59// AggregateFloat takes a FloatPoint and invokes the reduce function with the
60// current and new point to modify the current point.
61func (r *FloatFuncReducer) AggregateFloat(p *FloatPoint) {
62	t, v, aux := r.fn(r.prev, p)
63	if r.prev == nil {
64		r.prev = &FloatPoint{}
65	}
66	r.prev.Time = t
67	r.prev.Value = v
68	r.prev.Aux = aux
69	if p.Aggregated > 1 {
70		r.prev.Aggregated += p.Aggregated
71	} else {
72		r.prev.Aggregated++
73	}
74}
75
76// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
77func (r *FloatFuncReducer) Emit() []FloatPoint {
78	return []FloatPoint{*r.prev}
79}
80
81// FloatReduceSliceFunc is the function called by a FloatPoint reducer.
82type FloatReduceSliceFunc func(a []FloatPoint) []FloatPoint
83
84// FloatSliceFuncReducer is a reducer that aggregates
85// the passed in points and then invokes the function to reduce the points when they are emitted.
86type FloatSliceFuncReducer struct {
87	points []FloatPoint
88	fn     FloatReduceSliceFunc
89}
90
91// NewFloatSliceFuncReducer creates a new FloatSliceFuncReducer.
92func NewFloatSliceFuncReducer(fn FloatReduceSliceFunc) *FloatSliceFuncReducer {
93	return &FloatSliceFuncReducer{fn: fn}
94}
95
96// AggregateFloat copies the FloatPoint into the internal slice to be passed
97// to the reduce function when Emit is called.
98func (r *FloatSliceFuncReducer) AggregateFloat(p *FloatPoint) {
99	r.points = append(r.points, *p.Clone())
100}
101
102// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
103// This is a more efficient version of calling AggregateFloat on each point.
104func (r *FloatSliceFuncReducer) AggregateFloatBulk(points []FloatPoint) {
105	r.points = append(r.points, points...)
106}
107
108// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
109// This method does not clear the points from the internal slice.
110func (r *FloatSliceFuncReducer) Emit() []FloatPoint {
111	return r.fn(r.points)
112}
113
114// FloatReduceIntegerFunc is the function called by a FloatPoint reducer.
115type FloatReduceIntegerFunc func(prev *IntegerPoint, curr *FloatPoint) (t int64, v int64, aux []interface{})
116
117// FloatFuncIntegerReducer is a reducer that reduces
118// the passed in points to a single point using a reduce function.
119type FloatFuncIntegerReducer struct {
120	prev *IntegerPoint
121	fn   FloatReduceIntegerFunc
122}
123
124// NewFloatFuncIntegerReducer creates a new FloatFuncIntegerReducer.
125func NewFloatFuncIntegerReducer(fn FloatReduceIntegerFunc, prev *IntegerPoint) *FloatFuncIntegerReducer {
126	return &FloatFuncIntegerReducer{fn: fn, prev: prev}
127}
128
129// AggregateFloat takes a FloatPoint and invokes the reduce function with the
130// current and new point to modify the current point.
131func (r *FloatFuncIntegerReducer) AggregateFloat(p *FloatPoint) {
132	t, v, aux := r.fn(r.prev, p)
133	if r.prev == nil {
134		r.prev = &IntegerPoint{}
135	}
136	r.prev.Time = t
137	r.prev.Value = v
138	r.prev.Aux = aux
139	if p.Aggregated > 1 {
140		r.prev.Aggregated += p.Aggregated
141	} else {
142		r.prev.Aggregated++
143	}
144}
145
146// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
147func (r *FloatFuncIntegerReducer) Emit() []IntegerPoint {
148	return []IntegerPoint{*r.prev}
149}
150
151// FloatReduceIntegerSliceFunc is the function called by a FloatPoint reducer.
152type FloatReduceIntegerSliceFunc func(a []FloatPoint) []IntegerPoint
153
154// FloatSliceFuncIntegerReducer is a reducer that aggregates
155// the passed in points and then invokes the function to reduce the points when they are emitted.
156type FloatSliceFuncIntegerReducer struct {
157	points []FloatPoint
158	fn     FloatReduceIntegerSliceFunc
159}
160
161// NewFloatSliceFuncIntegerReducer creates a new FloatSliceFuncIntegerReducer.
162func NewFloatSliceFuncIntegerReducer(fn FloatReduceIntegerSliceFunc) *FloatSliceFuncIntegerReducer {
163	return &FloatSliceFuncIntegerReducer{fn: fn}
164}
165
166// AggregateFloat copies the FloatPoint into the internal slice to be passed
167// to the reduce function when Emit is called.
168func (r *FloatSliceFuncIntegerReducer) AggregateFloat(p *FloatPoint) {
169	r.points = append(r.points, *p.Clone())
170}
171
172// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
173// This is a more efficient version of calling AggregateFloat on each point.
174func (r *FloatSliceFuncIntegerReducer) AggregateFloatBulk(points []FloatPoint) {
175	r.points = append(r.points, points...)
176}
177
178// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
179// This method does not clear the points from the internal slice.
180func (r *FloatSliceFuncIntegerReducer) Emit() []IntegerPoint {
181	return r.fn(r.points)
182}
183
184// FloatReduceUnsignedFunc is the function called by a FloatPoint reducer.
185type FloatReduceUnsignedFunc func(prev *UnsignedPoint, curr *FloatPoint) (t int64, v uint64, aux []interface{})
186
187// FloatFuncUnsignedReducer is a reducer that reduces
188// the passed in points to a single point using a reduce function.
189type FloatFuncUnsignedReducer struct {
190	prev *UnsignedPoint
191	fn   FloatReduceUnsignedFunc
192}
193
194// NewFloatFuncUnsignedReducer creates a new FloatFuncUnsignedReducer.
195func NewFloatFuncUnsignedReducer(fn FloatReduceUnsignedFunc, prev *UnsignedPoint) *FloatFuncUnsignedReducer {
196	return &FloatFuncUnsignedReducer{fn: fn, prev: prev}
197}
198
199// AggregateFloat takes a FloatPoint and invokes the reduce function with the
200// current and new point to modify the current point.
201func (r *FloatFuncUnsignedReducer) AggregateFloat(p *FloatPoint) {
202	t, v, aux := r.fn(r.prev, p)
203	if r.prev == nil {
204		r.prev = &UnsignedPoint{}
205	}
206	r.prev.Time = t
207	r.prev.Value = v
208	r.prev.Aux = aux
209	if p.Aggregated > 1 {
210		r.prev.Aggregated += p.Aggregated
211	} else {
212		r.prev.Aggregated++
213	}
214}
215
216// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
217func (r *FloatFuncUnsignedReducer) Emit() []UnsignedPoint {
218	return []UnsignedPoint{*r.prev}
219}
220
221// FloatReduceUnsignedSliceFunc is the function called by a FloatPoint reducer.
222type FloatReduceUnsignedSliceFunc func(a []FloatPoint) []UnsignedPoint
223
224// FloatSliceFuncUnsignedReducer is a reducer that aggregates
225// the passed in points and then invokes the function to reduce the points when they are emitted.
226type FloatSliceFuncUnsignedReducer struct {
227	points []FloatPoint
228	fn     FloatReduceUnsignedSliceFunc
229}
230
231// NewFloatSliceFuncUnsignedReducer creates a new FloatSliceFuncUnsignedReducer.
232func NewFloatSliceFuncUnsignedReducer(fn FloatReduceUnsignedSliceFunc) *FloatSliceFuncUnsignedReducer {
233	return &FloatSliceFuncUnsignedReducer{fn: fn}
234}
235
236// AggregateFloat copies the FloatPoint into the internal slice to be passed
237// to the reduce function when Emit is called.
238func (r *FloatSliceFuncUnsignedReducer) AggregateFloat(p *FloatPoint) {
239	r.points = append(r.points, *p.Clone())
240}
241
242// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
243// This is a more efficient version of calling AggregateFloat on each point.
244func (r *FloatSliceFuncUnsignedReducer) AggregateFloatBulk(points []FloatPoint) {
245	r.points = append(r.points, points...)
246}
247
248// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
249// This method does not clear the points from the internal slice.
250func (r *FloatSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
251	return r.fn(r.points)
252}
253
254// FloatReduceStringFunc is the function called by a FloatPoint reducer.
255type FloatReduceStringFunc func(prev *StringPoint, curr *FloatPoint) (t int64, v string, aux []interface{})
256
257// FloatFuncStringReducer is a reducer that reduces
258// the passed in points to a single point using a reduce function.
259type FloatFuncStringReducer struct {
260	prev *StringPoint
261	fn   FloatReduceStringFunc
262}
263
264// NewFloatFuncStringReducer creates a new FloatFuncStringReducer.
265func NewFloatFuncStringReducer(fn FloatReduceStringFunc, prev *StringPoint) *FloatFuncStringReducer {
266	return &FloatFuncStringReducer{fn: fn, prev: prev}
267}
268
269// AggregateFloat takes a FloatPoint and invokes the reduce function with the
270// current and new point to modify the current point.
271func (r *FloatFuncStringReducer) AggregateFloat(p *FloatPoint) {
272	t, v, aux := r.fn(r.prev, p)
273	if r.prev == nil {
274		r.prev = &StringPoint{}
275	}
276	r.prev.Time = t
277	r.prev.Value = v
278	r.prev.Aux = aux
279	if p.Aggregated > 1 {
280		r.prev.Aggregated += p.Aggregated
281	} else {
282		r.prev.Aggregated++
283	}
284}
285
286// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
287func (r *FloatFuncStringReducer) Emit() []StringPoint {
288	return []StringPoint{*r.prev}
289}
290
291// FloatReduceStringSliceFunc is the function called by a FloatPoint reducer.
292type FloatReduceStringSliceFunc func(a []FloatPoint) []StringPoint
293
294// FloatSliceFuncStringReducer is a reducer that aggregates
295// the passed in points and then invokes the function to reduce the points when they are emitted.
296type FloatSliceFuncStringReducer struct {
297	points []FloatPoint
298	fn     FloatReduceStringSliceFunc
299}
300
301// NewFloatSliceFuncStringReducer creates a new FloatSliceFuncStringReducer.
302func NewFloatSliceFuncStringReducer(fn FloatReduceStringSliceFunc) *FloatSliceFuncStringReducer {
303	return &FloatSliceFuncStringReducer{fn: fn}
304}
305
306// AggregateFloat copies the FloatPoint into the internal slice to be passed
307// to the reduce function when Emit is called.
308func (r *FloatSliceFuncStringReducer) AggregateFloat(p *FloatPoint) {
309	r.points = append(r.points, *p.Clone())
310}
311
312// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
313// This is a more efficient version of calling AggregateFloat on each point.
314func (r *FloatSliceFuncStringReducer) AggregateFloatBulk(points []FloatPoint) {
315	r.points = append(r.points, points...)
316}
317
318// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
319// This method does not clear the points from the internal slice.
320func (r *FloatSliceFuncStringReducer) Emit() []StringPoint {
321	return r.fn(r.points)
322}
323
324// FloatReduceBooleanFunc is the function called by a FloatPoint reducer.
325type FloatReduceBooleanFunc func(prev *BooleanPoint, curr *FloatPoint) (t int64, v bool, aux []interface{})
326
327// FloatFuncBooleanReducer is a reducer that reduces
328// the passed in points to a single point using a reduce function.
329type FloatFuncBooleanReducer struct {
330	prev *BooleanPoint
331	fn   FloatReduceBooleanFunc
332}
333
334// NewFloatFuncBooleanReducer creates a new FloatFuncBooleanReducer.
335func NewFloatFuncBooleanReducer(fn FloatReduceBooleanFunc, prev *BooleanPoint) *FloatFuncBooleanReducer {
336	return &FloatFuncBooleanReducer{fn: fn, prev: prev}
337}
338
339// AggregateFloat takes a FloatPoint and invokes the reduce function with the
340// current and new point to modify the current point.
341func (r *FloatFuncBooleanReducer) AggregateFloat(p *FloatPoint) {
342	t, v, aux := r.fn(r.prev, p)
343	if r.prev == nil {
344		r.prev = &BooleanPoint{}
345	}
346	r.prev.Time = t
347	r.prev.Value = v
348	r.prev.Aux = aux
349	if p.Aggregated > 1 {
350		r.prev.Aggregated += p.Aggregated
351	} else {
352		r.prev.Aggregated++
353	}
354}
355
356// Emit emits the point that was generated when reducing the points fed in with AggregateFloat.
357func (r *FloatFuncBooleanReducer) Emit() []BooleanPoint {
358	return []BooleanPoint{*r.prev}
359}
360
361// FloatReduceBooleanSliceFunc is the function called by a FloatPoint reducer.
362type FloatReduceBooleanSliceFunc func(a []FloatPoint) []BooleanPoint
363
364// FloatSliceFuncBooleanReducer is a reducer that aggregates
365// the passed in points and then invokes the function to reduce the points when they are emitted.
366type FloatSliceFuncBooleanReducer struct {
367	points []FloatPoint
368	fn     FloatReduceBooleanSliceFunc
369}
370
371// NewFloatSliceFuncBooleanReducer creates a new FloatSliceFuncBooleanReducer.
372func NewFloatSliceFuncBooleanReducer(fn FloatReduceBooleanSliceFunc) *FloatSliceFuncBooleanReducer {
373	return &FloatSliceFuncBooleanReducer{fn: fn}
374}
375
376// AggregateFloat copies the FloatPoint into the internal slice to be passed
377// to the reduce function when Emit is called.
378func (r *FloatSliceFuncBooleanReducer) AggregateFloat(p *FloatPoint) {
379	r.points = append(r.points, *p.Clone())
380}
381
382// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
383// This is a more efficient version of calling AggregateFloat on each point.
384func (r *FloatSliceFuncBooleanReducer) AggregateFloatBulk(points []FloatPoint) {
385	r.points = append(r.points, points...)
386}
387
388// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
389// This method does not clear the points from the internal slice.
390func (r *FloatSliceFuncBooleanReducer) Emit() []BooleanPoint {
391	return r.fn(r.points)
392}
393
394// FloatDistinctReducer returns the distinct points in a series.
395type FloatDistinctReducer struct {
396	m map[float64]FloatPoint
397}
398
399// NewFloatDistinctReducer creates a new FloatDistinctReducer.
400func NewFloatDistinctReducer() *FloatDistinctReducer {
401	return &FloatDistinctReducer{m: make(map[float64]FloatPoint)}
402}
403
404// AggregateFloat aggregates a point into the reducer.
405func (r *FloatDistinctReducer) AggregateFloat(p *FloatPoint) {
406	if _, ok := r.m[p.Value]; !ok {
407		r.m[p.Value] = *p
408	}
409}
410
411// Emit emits the distinct points that have been aggregated into the reducer.
412func (r *FloatDistinctReducer) Emit() []FloatPoint {
413	points := make([]FloatPoint, 0, len(r.m))
414	for _, p := range r.m {
415		points = append(points, FloatPoint{Time: p.Time, Value: p.Value})
416	}
417	sort.Sort(floatPoints(points))
418	return points
419}
420
421// FloatElapsedReducer calculates the elapsed of the aggregated points.
422type FloatElapsedReducer struct {
423	unitConversion int64
424	prev           FloatPoint
425	curr           FloatPoint
426}
427
428// NewFloatElapsedReducer creates a new FloatElapsedReducer.
429func NewFloatElapsedReducer(interval Interval) *FloatElapsedReducer {
430	return &FloatElapsedReducer{
431		unitConversion: int64(interval.Duration),
432		prev:           FloatPoint{Nil: true},
433		curr:           FloatPoint{Nil: true},
434	}
435}
436
437// AggregateFloat aggregates a point into the reducer and updates the current window.
438func (r *FloatElapsedReducer) AggregateFloat(p *FloatPoint) {
439	r.prev = r.curr
440	r.curr = *p
441}
442
443// Emit emits the elapsed of the reducer at the current point.
444func (r *FloatElapsedReducer) Emit() []IntegerPoint {
445	if !r.prev.Nil {
446		elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
447		return []IntegerPoint{
448			{Time: r.curr.Time, Value: elapsed},
449		}
450	}
451	return nil
452}
453
454// FloatSampleReducer implements a reservoir sampling to calculate a random subset of points
455type FloatSampleReducer struct {
456	count int        // how many points we've iterated over
457	rng   *rand.Rand // random number generator for each reducer
458
459	points floatPoints // the reservoir
460}
461
462// NewFloatSampleReducer creates a new FloatSampleReducer
463func NewFloatSampleReducer(size int) *FloatSampleReducer {
464	return &FloatSampleReducer{
465		rng:    rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
466		points: make(floatPoints, size),
467	}
468}
469
470// AggregateFloat aggregates a point into the reducer.
471func (r *FloatSampleReducer) AggregateFloat(p *FloatPoint) {
472	r.count++
473	// Fill the reservoir with the first n points
474	if r.count-1 < len(r.points) {
475		p.CopyTo(&r.points[r.count-1])
476		return
477	}
478
479	// Generate a random integer between 1 and the count and
480	// if that number is less than the length of the slice
481	// replace the point at that index rnd with p.
482	rnd := r.rng.Intn(r.count)
483	if rnd < len(r.points) {
484		p.CopyTo(&r.points[rnd])
485	}
486}
487
488// Emit emits the reservoir sample as many points.
489func (r *FloatSampleReducer) Emit() []FloatPoint {
490	min := len(r.points)
491	if r.count < min {
492		min = r.count
493	}
494	pts := r.points[:min]
495	sort.Sort(pts)
496	return pts
497}
498
499// IntegerPointAggregator aggregates points to produce a single point.
500type IntegerPointAggregator interface {
501	AggregateInteger(p *IntegerPoint)
502}
503
504// IntegerBulkPointAggregator aggregates multiple points at a time.
505type IntegerBulkPointAggregator interface {
506	AggregateIntegerBulk(points []IntegerPoint)
507}
508
509// AggregateIntegerPoints feeds a slice of IntegerPoint into an
510// aggregator. If the aggregator is a IntegerBulkPointAggregator, it will
511// use the AggregateBulk method.
512func AggregateIntegerPoints(a IntegerPointAggregator, points []IntegerPoint) {
513	switch a := a.(type) {
514	case IntegerBulkPointAggregator:
515		a.AggregateIntegerBulk(points)
516	default:
517		for _, p := range points {
518			a.AggregateInteger(&p)
519		}
520	}
521}
522
523// IntegerPointEmitter produces a single point from an aggregate.
524type IntegerPointEmitter interface {
525	Emit() []IntegerPoint
526}
527
528// IntegerReduceFloatFunc is the function called by a IntegerPoint reducer.
529type IntegerReduceFloatFunc func(prev *FloatPoint, curr *IntegerPoint) (t int64, v float64, aux []interface{})
530
531// IntegerFuncFloatReducer is a reducer that reduces
532// the passed in points to a single point using a reduce function.
533type IntegerFuncFloatReducer struct {
534	prev *FloatPoint
535	fn   IntegerReduceFloatFunc
536}
537
538// NewIntegerFuncFloatReducer creates a new IntegerFuncFloatReducer.
539func NewIntegerFuncFloatReducer(fn IntegerReduceFloatFunc, prev *FloatPoint) *IntegerFuncFloatReducer {
540	return &IntegerFuncFloatReducer{fn: fn, prev: prev}
541}
542
543// AggregateInteger takes a IntegerPoint and invokes the reduce function with the
544// current and new point to modify the current point.
545func (r *IntegerFuncFloatReducer) AggregateInteger(p *IntegerPoint) {
546	t, v, aux := r.fn(r.prev, p)
547	if r.prev == nil {
548		r.prev = &FloatPoint{}
549	}
550	r.prev.Time = t
551	r.prev.Value = v
552	r.prev.Aux = aux
553	if p.Aggregated > 1 {
554		r.prev.Aggregated += p.Aggregated
555	} else {
556		r.prev.Aggregated++
557	}
558}
559
560// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
561func (r *IntegerFuncFloatReducer) Emit() []FloatPoint {
562	return []FloatPoint{*r.prev}
563}
564
565// IntegerReduceFloatSliceFunc is the function called by a IntegerPoint reducer.
566type IntegerReduceFloatSliceFunc func(a []IntegerPoint) []FloatPoint
567
568// IntegerSliceFuncFloatReducer is a reducer that aggregates
569// the passed in points and then invokes the function to reduce the points when they are emitted.
570type IntegerSliceFuncFloatReducer struct {
571	points []IntegerPoint
572	fn     IntegerReduceFloatSliceFunc
573}
574
575// NewIntegerSliceFuncFloatReducer creates a new IntegerSliceFuncFloatReducer.
576func NewIntegerSliceFuncFloatReducer(fn IntegerReduceFloatSliceFunc) *IntegerSliceFuncFloatReducer {
577	return &IntegerSliceFuncFloatReducer{fn: fn}
578}
579
580// AggregateInteger copies the IntegerPoint into the internal slice to be passed
581// to the reduce function when Emit is called.
582func (r *IntegerSliceFuncFloatReducer) AggregateInteger(p *IntegerPoint) {
583	r.points = append(r.points, *p.Clone())
584}
585
586// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
587// This is a more efficient version of calling AggregateInteger on each point.
588func (r *IntegerSliceFuncFloatReducer) AggregateIntegerBulk(points []IntegerPoint) {
589	r.points = append(r.points, points...)
590}
591
592// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
593// This method does not clear the points from the internal slice.
594func (r *IntegerSliceFuncFloatReducer) Emit() []FloatPoint {
595	return r.fn(r.points)
596}
597
598// IntegerReduceFunc is the function called by a IntegerPoint reducer.
599type IntegerReduceFunc func(prev *IntegerPoint, curr *IntegerPoint) (t int64, v int64, aux []interface{})
600
601// IntegerFuncReducer is a reducer that reduces
602// the passed in points to a single point using a reduce function.
603type IntegerFuncReducer struct {
604	prev *IntegerPoint
605	fn   IntegerReduceFunc
606}
607
608// NewIntegerFuncReducer creates a new IntegerFuncIntegerReducer.
609func NewIntegerFuncReducer(fn IntegerReduceFunc, prev *IntegerPoint) *IntegerFuncReducer {
610	return &IntegerFuncReducer{fn: fn, prev: prev}
611}
612
613// AggregateInteger takes a IntegerPoint and invokes the reduce function with the
614// current and new point to modify the current point.
615func (r *IntegerFuncReducer) AggregateInteger(p *IntegerPoint) {
616	t, v, aux := r.fn(r.prev, p)
617	if r.prev == nil {
618		r.prev = &IntegerPoint{}
619	}
620	r.prev.Time = t
621	r.prev.Value = v
622	r.prev.Aux = aux
623	if p.Aggregated > 1 {
624		r.prev.Aggregated += p.Aggregated
625	} else {
626		r.prev.Aggregated++
627	}
628}
629
630// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
631func (r *IntegerFuncReducer) Emit() []IntegerPoint {
632	return []IntegerPoint{*r.prev}
633}
634
635// IntegerReduceSliceFunc is the function called by a IntegerPoint reducer.
636type IntegerReduceSliceFunc func(a []IntegerPoint) []IntegerPoint
637
638// IntegerSliceFuncReducer is a reducer that aggregates
639// the passed in points and then invokes the function to reduce the points when they are emitted.
640type IntegerSliceFuncReducer struct {
641	points []IntegerPoint
642	fn     IntegerReduceSliceFunc
643}
644
645// NewIntegerSliceFuncReducer creates a new IntegerSliceFuncReducer.
646func NewIntegerSliceFuncReducer(fn IntegerReduceSliceFunc) *IntegerSliceFuncReducer {
647	return &IntegerSliceFuncReducer{fn: fn}
648}
649
650// AggregateInteger copies the IntegerPoint into the internal slice to be passed
651// to the reduce function when Emit is called.
652func (r *IntegerSliceFuncReducer) AggregateInteger(p *IntegerPoint) {
653	r.points = append(r.points, *p.Clone())
654}
655
656// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
657// This is a more efficient version of calling AggregateInteger on each point.
658func (r *IntegerSliceFuncReducer) AggregateIntegerBulk(points []IntegerPoint) {
659	r.points = append(r.points, points...)
660}
661
662// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
663// This method does not clear the points from the internal slice.
664func (r *IntegerSliceFuncReducer) Emit() []IntegerPoint {
665	return r.fn(r.points)
666}
667
668// IntegerReduceUnsignedFunc is the function called by a IntegerPoint reducer.
669type IntegerReduceUnsignedFunc func(prev *UnsignedPoint, curr *IntegerPoint) (t int64, v uint64, aux []interface{})
670
671// IntegerFuncUnsignedReducer is a reducer that reduces
672// the passed in points to a single point using a reduce function.
673type IntegerFuncUnsignedReducer struct {
674	prev *UnsignedPoint
675	fn   IntegerReduceUnsignedFunc
676}
677
678// NewIntegerFuncUnsignedReducer creates a new IntegerFuncUnsignedReducer.
679func NewIntegerFuncUnsignedReducer(fn IntegerReduceUnsignedFunc, prev *UnsignedPoint) *IntegerFuncUnsignedReducer {
680	return &IntegerFuncUnsignedReducer{fn: fn, prev: prev}
681}
682
683// AggregateInteger takes a IntegerPoint and invokes the reduce function with the
684// current and new point to modify the current point.
685func (r *IntegerFuncUnsignedReducer) AggregateInteger(p *IntegerPoint) {
686	t, v, aux := r.fn(r.prev, p)
687	if r.prev == nil {
688		r.prev = &UnsignedPoint{}
689	}
690	r.prev.Time = t
691	r.prev.Value = v
692	r.prev.Aux = aux
693	if p.Aggregated > 1 {
694		r.prev.Aggregated += p.Aggregated
695	} else {
696		r.prev.Aggregated++
697	}
698}
699
700// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
701func (r *IntegerFuncUnsignedReducer) Emit() []UnsignedPoint {
702	return []UnsignedPoint{*r.prev}
703}
704
705// IntegerReduceUnsignedSliceFunc is the function called by a IntegerPoint reducer.
706type IntegerReduceUnsignedSliceFunc func(a []IntegerPoint) []UnsignedPoint
707
708// IntegerSliceFuncUnsignedReducer is a reducer that aggregates
709// the passed in points and then invokes the function to reduce the points when they are emitted.
710type IntegerSliceFuncUnsignedReducer struct {
711	points []IntegerPoint
712	fn     IntegerReduceUnsignedSliceFunc
713}
714
715// NewIntegerSliceFuncUnsignedReducer creates a new IntegerSliceFuncUnsignedReducer.
716func NewIntegerSliceFuncUnsignedReducer(fn IntegerReduceUnsignedSliceFunc) *IntegerSliceFuncUnsignedReducer {
717	return &IntegerSliceFuncUnsignedReducer{fn: fn}
718}
719
720// AggregateInteger copies the IntegerPoint into the internal slice to be passed
721// to the reduce function when Emit is called.
722func (r *IntegerSliceFuncUnsignedReducer) AggregateInteger(p *IntegerPoint) {
723	r.points = append(r.points, *p.Clone())
724}
725
726// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
727// This is a more efficient version of calling AggregateInteger on each point.
728func (r *IntegerSliceFuncUnsignedReducer) AggregateIntegerBulk(points []IntegerPoint) {
729	r.points = append(r.points, points...)
730}
731
732// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
733// This method does not clear the points from the internal slice.
734func (r *IntegerSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
735	return r.fn(r.points)
736}
737
738// IntegerReduceStringFunc is the function called by a IntegerPoint reducer.
739type IntegerReduceStringFunc func(prev *StringPoint, curr *IntegerPoint) (t int64, v string, aux []interface{})
740
741// IntegerFuncStringReducer is a reducer that reduces
742// the passed in points to a single point using a reduce function.
743type IntegerFuncStringReducer struct {
744	prev *StringPoint
745	fn   IntegerReduceStringFunc
746}
747
748// NewIntegerFuncStringReducer creates a new IntegerFuncStringReducer.
749func NewIntegerFuncStringReducer(fn IntegerReduceStringFunc, prev *StringPoint) *IntegerFuncStringReducer {
750	return &IntegerFuncStringReducer{fn: fn, prev: prev}
751}
752
753// AggregateInteger takes a IntegerPoint and invokes the reduce function with the
754// current and new point to modify the current point.
755func (r *IntegerFuncStringReducer) AggregateInteger(p *IntegerPoint) {
756	t, v, aux := r.fn(r.prev, p)
757	if r.prev == nil {
758		r.prev = &StringPoint{}
759	}
760	r.prev.Time = t
761	r.prev.Value = v
762	r.prev.Aux = aux
763	if p.Aggregated > 1 {
764		r.prev.Aggregated += p.Aggregated
765	} else {
766		r.prev.Aggregated++
767	}
768}
769
770// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
771func (r *IntegerFuncStringReducer) Emit() []StringPoint {
772	return []StringPoint{*r.prev}
773}
774
775// IntegerReduceStringSliceFunc is the function called by a IntegerPoint reducer.
776type IntegerReduceStringSliceFunc func(a []IntegerPoint) []StringPoint
777
778// IntegerSliceFuncStringReducer is a reducer that aggregates
779// the passed in points and then invokes the function to reduce the points when they are emitted.
780type IntegerSliceFuncStringReducer struct {
781	points []IntegerPoint
782	fn     IntegerReduceStringSliceFunc
783}
784
785// NewIntegerSliceFuncStringReducer creates a new IntegerSliceFuncStringReducer.
786func NewIntegerSliceFuncStringReducer(fn IntegerReduceStringSliceFunc) *IntegerSliceFuncStringReducer {
787	return &IntegerSliceFuncStringReducer{fn: fn}
788}
789
790// AggregateInteger copies the IntegerPoint into the internal slice to be passed
791// to the reduce function when Emit is called.
792func (r *IntegerSliceFuncStringReducer) AggregateInteger(p *IntegerPoint) {
793	r.points = append(r.points, *p.Clone())
794}
795
796// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
797// This is a more efficient version of calling AggregateInteger on each point.
798func (r *IntegerSliceFuncStringReducer) AggregateIntegerBulk(points []IntegerPoint) {
799	r.points = append(r.points, points...)
800}
801
802// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
803// This method does not clear the points from the internal slice.
804func (r *IntegerSliceFuncStringReducer) Emit() []StringPoint {
805	return r.fn(r.points)
806}
807
808// IntegerReduceBooleanFunc is the function called by a IntegerPoint reducer.
809type IntegerReduceBooleanFunc func(prev *BooleanPoint, curr *IntegerPoint) (t int64, v bool, aux []interface{})
810
811// IntegerFuncBooleanReducer is a reducer that reduces
812// the passed in points to a single point using a reduce function.
813type IntegerFuncBooleanReducer struct {
814	prev *BooleanPoint
815	fn   IntegerReduceBooleanFunc
816}
817
818// NewIntegerFuncBooleanReducer creates a new IntegerFuncBooleanReducer.
819func NewIntegerFuncBooleanReducer(fn IntegerReduceBooleanFunc, prev *BooleanPoint) *IntegerFuncBooleanReducer {
820	return &IntegerFuncBooleanReducer{fn: fn, prev: prev}
821}
822
823// AggregateInteger takes a IntegerPoint and invokes the reduce function with the
824// current and new point to modify the current point.
825func (r *IntegerFuncBooleanReducer) AggregateInteger(p *IntegerPoint) {
826	t, v, aux := r.fn(r.prev, p)
827	if r.prev == nil {
828		r.prev = &BooleanPoint{}
829	}
830	r.prev.Time = t
831	r.prev.Value = v
832	r.prev.Aux = aux
833	if p.Aggregated > 1 {
834		r.prev.Aggregated += p.Aggregated
835	} else {
836		r.prev.Aggregated++
837	}
838}
839
840// Emit emits the point that was generated when reducing the points fed in with AggregateInteger.
841func (r *IntegerFuncBooleanReducer) Emit() []BooleanPoint {
842	return []BooleanPoint{*r.prev}
843}
844
845// IntegerReduceBooleanSliceFunc is the function called by a IntegerPoint reducer.
846type IntegerReduceBooleanSliceFunc func(a []IntegerPoint) []BooleanPoint
847
848// IntegerSliceFuncBooleanReducer is a reducer that aggregates
849// the passed in points and then invokes the function to reduce the points when they are emitted.
850type IntegerSliceFuncBooleanReducer struct {
851	points []IntegerPoint
852	fn     IntegerReduceBooleanSliceFunc
853}
854
855// NewIntegerSliceFuncBooleanReducer creates a new IntegerSliceFuncBooleanReducer.
856func NewIntegerSliceFuncBooleanReducer(fn IntegerReduceBooleanSliceFunc) *IntegerSliceFuncBooleanReducer {
857	return &IntegerSliceFuncBooleanReducer{fn: fn}
858}
859
860// AggregateInteger copies the IntegerPoint into the internal slice to be passed
861// to the reduce function when Emit is called.
862func (r *IntegerSliceFuncBooleanReducer) AggregateInteger(p *IntegerPoint) {
863	r.points = append(r.points, *p.Clone())
864}
865
866// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
867// This is a more efficient version of calling AggregateInteger on each point.
868func (r *IntegerSliceFuncBooleanReducer) AggregateIntegerBulk(points []IntegerPoint) {
869	r.points = append(r.points, points...)
870}
871
872// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
873// This method does not clear the points from the internal slice.
874func (r *IntegerSliceFuncBooleanReducer) Emit() []BooleanPoint {
875	return r.fn(r.points)
876}
877
878// IntegerDistinctReducer returns the distinct points in a series.
879type IntegerDistinctReducer struct {
880	m map[int64]IntegerPoint
881}
882
883// NewIntegerDistinctReducer creates a new IntegerDistinctReducer.
884func NewIntegerDistinctReducer() *IntegerDistinctReducer {
885	return &IntegerDistinctReducer{m: make(map[int64]IntegerPoint)}
886}
887
888// AggregateInteger aggregates a point into the reducer.
889func (r *IntegerDistinctReducer) AggregateInteger(p *IntegerPoint) {
890	if _, ok := r.m[p.Value]; !ok {
891		r.m[p.Value] = *p
892	}
893}
894
895// Emit emits the distinct points that have been aggregated into the reducer.
896func (r *IntegerDistinctReducer) Emit() []IntegerPoint {
897	points := make([]IntegerPoint, 0, len(r.m))
898	for _, p := range r.m {
899		points = append(points, IntegerPoint{Time: p.Time, Value: p.Value})
900	}
901	sort.Sort(integerPoints(points))
902	return points
903}
904
905// IntegerElapsedReducer calculates the elapsed of the aggregated points.
906type IntegerElapsedReducer struct {
907	unitConversion int64
908	prev           IntegerPoint
909	curr           IntegerPoint
910}
911
912// NewIntegerElapsedReducer creates a new IntegerElapsedReducer.
913func NewIntegerElapsedReducer(interval Interval) *IntegerElapsedReducer {
914	return &IntegerElapsedReducer{
915		unitConversion: int64(interval.Duration),
916		prev:           IntegerPoint{Nil: true},
917		curr:           IntegerPoint{Nil: true},
918	}
919}
920
921// AggregateInteger aggregates a point into the reducer and updates the current window.
922func (r *IntegerElapsedReducer) AggregateInteger(p *IntegerPoint) {
923	r.prev = r.curr
924	r.curr = *p
925}
926
927// Emit emits the elapsed of the reducer at the current point.
928func (r *IntegerElapsedReducer) Emit() []IntegerPoint {
929	if !r.prev.Nil {
930		elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
931		return []IntegerPoint{
932			{Time: r.curr.Time, Value: elapsed},
933		}
934	}
935	return nil
936}
937
938// IntegerSampleReducer implements a reservoir sampling to calculate a random subset of points
939type IntegerSampleReducer struct {
940	count int        // how many points we've iterated over
941	rng   *rand.Rand // random number generator for each reducer
942
943	points integerPoints // the reservoir
944}
945
946// NewIntegerSampleReducer creates a new IntegerSampleReducer
947func NewIntegerSampleReducer(size int) *IntegerSampleReducer {
948	return &IntegerSampleReducer{
949		rng:    rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
950		points: make(integerPoints, size),
951	}
952}
953
954// AggregateInteger aggregates a point into the reducer.
955func (r *IntegerSampleReducer) AggregateInteger(p *IntegerPoint) {
956	r.count++
957	// Fill the reservoir with the first n points
958	if r.count-1 < len(r.points) {
959		p.CopyTo(&r.points[r.count-1])
960		return
961	}
962
963	// Generate a random integer between 1 and the count and
964	// if that number is less than the length of the slice
965	// replace the point at that index rnd with p.
966	rnd := r.rng.Intn(r.count)
967	if rnd < len(r.points) {
968		p.CopyTo(&r.points[rnd])
969	}
970}
971
972// Emit emits the reservoir sample as many points.
973func (r *IntegerSampleReducer) Emit() []IntegerPoint {
974	min := len(r.points)
975	if r.count < min {
976		min = r.count
977	}
978	pts := r.points[:min]
979	sort.Sort(pts)
980	return pts
981}
982
983// UnsignedPointAggregator aggregates points to produce a single point.
984type UnsignedPointAggregator interface {
985	AggregateUnsigned(p *UnsignedPoint)
986}
987
988// UnsignedBulkPointAggregator aggregates multiple points at a time.
989type UnsignedBulkPointAggregator interface {
990	AggregateUnsignedBulk(points []UnsignedPoint)
991}
992
993// AggregateUnsignedPoints feeds a slice of UnsignedPoint into an
994// aggregator. If the aggregator is a UnsignedBulkPointAggregator, it will
995// use the AggregateBulk method.
996func AggregateUnsignedPoints(a UnsignedPointAggregator, points []UnsignedPoint) {
997	switch a := a.(type) {
998	case UnsignedBulkPointAggregator:
999		a.AggregateUnsignedBulk(points)
1000	default:
1001		for _, p := range points {
1002			a.AggregateUnsigned(&p)
1003		}
1004	}
1005}
1006
1007// UnsignedPointEmitter produces a single point from an aggregate.
1008type UnsignedPointEmitter interface {
1009	Emit() []UnsignedPoint
1010}
1011
1012// UnsignedReduceFloatFunc is the function called by a UnsignedPoint reducer.
1013type UnsignedReduceFloatFunc func(prev *FloatPoint, curr *UnsignedPoint) (t int64, v float64, aux []interface{})
1014
1015// UnsignedFuncFloatReducer is a reducer that reduces
1016// the passed in points to a single point using a reduce function.
1017type UnsignedFuncFloatReducer struct {
1018	prev *FloatPoint
1019	fn   UnsignedReduceFloatFunc
1020}
1021
1022// NewUnsignedFuncFloatReducer creates a new UnsignedFuncFloatReducer.
1023func NewUnsignedFuncFloatReducer(fn UnsignedReduceFloatFunc, prev *FloatPoint) *UnsignedFuncFloatReducer {
1024	return &UnsignedFuncFloatReducer{fn: fn, prev: prev}
1025}
1026
1027// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the
1028// current and new point to modify the current point.
1029func (r *UnsignedFuncFloatReducer) AggregateUnsigned(p *UnsignedPoint) {
1030	t, v, aux := r.fn(r.prev, p)
1031	if r.prev == nil {
1032		r.prev = &FloatPoint{}
1033	}
1034	r.prev.Time = t
1035	r.prev.Value = v
1036	r.prev.Aux = aux
1037	if p.Aggregated > 1 {
1038		r.prev.Aggregated += p.Aggregated
1039	} else {
1040		r.prev.Aggregated++
1041	}
1042}
1043
1044// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
1045func (r *UnsignedFuncFloatReducer) Emit() []FloatPoint {
1046	return []FloatPoint{*r.prev}
1047}
1048
1049// UnsignedReduceFloatSliceFunc is the function called by a UnsignedPoint reducer.
1050type UnsignedReduceFloatSliceFunc func(a []UnsignedPoint) []FloatPoint
1051
1052// UnsignedSliceFuncFloatReducer is a reducer that aggregates
1053// the passed in points and then invokes the function to reduce the points when they are emitted.
1054type UnsignedSliceFuncFloatReducer struct {
1055	points []UnsignedPoint
1056	fn     UnsignedReduceFloatSliceFunc
1057}
1058
1059// NewUnsignedSliceFuncFloatReducer creates a new UnsignedSliceFuncFloatReducer.
1060func NewUnsignedSliceFuncFloatReducer(fn UnsignedReduceFloatSliceFunc) *UnsignedSliceFuncFloatReducer {
1061	return &UnsignedSliceFuncFloatReducer{fn: fn}
1062}
1063
1064// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
1065// to the reduce function when Emit is called.
1066func (r *UnsignedSliceFuncFloatReducer) AggregateUnsigned(p *UnsignedPoint) {
1067	r.points = append(r.points, *p.Clone())
1068}
1069
1070// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
1071// This is a more efficient version of calling AggregateUnsigned on each point.
1072func (r *UnsignedSliceFuncFloatReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
1073	r.points = append(r.points, points...)
1074}
1075
1076// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1077// This method does not clear the points from the internal slice.
1078func (r *UnsignedSliceFuncFloatReducer) Emit() []FloatPoint {
1079	return r.fn(r.points)
1080}
1081
1082// UnsignedReduceIntegerFunc is the function called by a UnsignedPoint reducer.
1083type UnsignedReduceIntegerFunc func(prev *IntegerPoint, curr *UnsignedPoint) (t int64, v int64, aux []interface{})
1084
1085// UnsignedFuncIntegerReducer is a reducer that reduces
1086// the passed in points to a single point using a reduce function.
1087type UnsignedFuncIntegerReducer struct {
1088	prev *IntegerPoint
1089	fn   UnsignedReduceIntegerFunc
1090}
1091
1092// NewUnsignedFuncIntegerReducer creates a new UnsignedFuncIntegerReducer.
1093func NewUnsignedFuncIntegerReducer(fn UnsignedReduceIntegerFunc, prev *IntegerPoint) *UnsignedFuncIntegerReducer {
1094	return &UnsignedFuncIntegerReducer{fn: fn, prev: prev}
1095}
1096
1097// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the
1098// current and new point to modify the current point.
1099func (r *UnsignedFuncIntegerReducer) AggregateUnsigned(p *UnsignedPoint) {
1100	t, v, aux := r.fn(r.prev, p)
1101	if r.prev == nil {
1102		r.prev = &IntegerPoint{}
1103	}
1104	r.prev.Time = t
1105	r.prev.Value = v
1106	r.prev.Aux = aux
1107	if p.Aggregated > 1 {
1108		r.prev.Aggregated += p.Aggregated
1109	} else {
1110		r.prev.Aggregated++
1111	}
1112}
1113
1114// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
1115func (r *UnsignedFuncIntegerReducer) Emit() []IntegerPoint {
1116	return []IntegerPoint{*r.prev}
1117}
1118
1119// UnsignedReduceIntegerSliceFunc is the function called by a UnsignedPoint reducer.
1120type UnsignedReduceIntegerSliceFunc func(a []UnsignedPoint) []IntegerPoint
1121
1122// UnsignedSliceFuncIntegerReducer is a reducer that aggregates
1123// the passed in points and then invokes the function to reduce the points when they are emitted.
1124type UnsignedSliceFuncIntegerReducer struct {
1125	points []UnsignedPoint
1126	fn     UnsignedReduceIntegerSliceFunc
1127}
1128
1129// NewUnsignedSliceFuncIntegerReducer creates a new UnsignedSliceFuncIntegerReducer.
1130func NewUnsignedSliceFuncIntegerReducer(fn UnsignedReduceIntegerSliceFunc) *UnsignedSliceFuncIntegerReducer {
1131	return &UnsignedSliceFuncIntegerReducer{fn: fn}
1132}
1133
1134// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
1135// to the reduce function when Emit is called.
1136func (r *UnsignedSliceFuncIntegerReducer) AggregateUnsigned(p *UnsignedPoint) {
1137	r.points = append(r.points, *p.Clone())
1138}
1139
1140// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
1141// This is a more efficient version of calling AggregateUnsigned on each point.
1142func (r *UnsignedSliceFuncIntegerReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
1143	r.points = append(r.points, points...)
1144}
1145
1146// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1147// This method does not clear the points from the internal slice.
1148func (r *UnsignedSliceFuncIntegerReducer) Emit() []IntegerPoint {
1149	return r.fn(r.points)
1150}
1151
1152// UnsignedReduceFunc is the function called by a UnsignedPoint reducer.
1153type UnsignedReduceFunc func(prev *UnsignedPoint, curr *UnsignedPoint) (t int64, v uint64, aux []interface{})
1154
1155// UnsignedFuncReducer is a reducer that reduces
1156// the passed in points to a single point using a reduce function.
1157type UnsignedFuncReducer struct {
1158	prev *UnsignedPoint
1159	fn   UnsignedReduceFunc
1160}
1161
1162// NewUnsignedFuncReducer creates a new UnsignedFuncUnsignedReducer.
1163func NewUnsignedFuncReducer(fn UnsignedReduceFunc, prev *UnsignedPoint) *UnsignedFuncReducer {
1164	return &UnsignedFuncReducer{fn: fn, prev: prev}
1165}
1166
1167// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the
1168// current and new point to modify the current point.
1169func (r *UnsignedFuncReducer) AggregateUnsigned(p *UnsignedPoint) {
1170	t, v, aux := r.fn(r.prev, p)
1171	if r.prev == nil {
1172		r.prev = &UnsignedPoint{}
1173	}
1174	r.prev.Time = t
1175	r.prev.Value = v
1176	r.prev.Aux = aux
1177	if p.Aggregated > 1 {
1178		r.prev.Aggregated += p.Aggregated
1179	} else {
1180		r.prev.Aggregated++
1181	}
1182}
1183
1184// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
1185func (r *UnsignedFuncReducer) Emit() []UnsignedPoint {
1186	return []UnsignedPoint{*r.prev}
1187}
1188
1189// UnsignedReduceSliceFunc is the function called by a UnsignedPoint reducer.
1190type UnsignedReduceSliceFunc func(a []UnsignedPoint) []UnsignedPoint
1191
1192// UnsignedSliceFuncReducer is a reducer that aggregates
1193// the passed in points and then invokes the function to reduce the points when they are emitted.
1194type UnsignedSliceFuncReducer struct {
1195	points []UnsignedPoint
1196	fn     UnsignedReduceSliceFunc
1197}
1198
1199// NewUnsignedSliceFuncReducer creates a new UnsignedSliceFuncReducer.
1200func NewUnsignedSliceFuncReducer(fn UnsignedReduceSliceFunc) *UnsignedSliceFuncReducer {
1201	return &UnsignedSliceFuncReducer{fn: fn}
1202}
1203
1204// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
1205// to the reduce function when Emit is called.
1206func (r *UnsignedSliceFuncReducer) AggregateUnsigned(p *UnsignedPoint) {
1207	r.points = append(r.points, *p.Clone())
1208}
1209
1210// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
1211// This is a more efficient version of calling AggregateUnsigned on each point.
1212func (r *UnsignedSliceFuncReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
1213	r.points = append(r.points, points...)
1214}
1215
1216// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1217// This method does not clear the points from the internal slice.
1218func (r *UnsignedSliceFuncReducer) Emit() []UnsignedPoint {
1219	return r.fn(r.points)
1220}
1221
1222// UnsignedReduceStringFunc is the function called by a UnsignedPoint reducer.
1223type UnsignedReduceStringFunc func(prev *StringPoint, curr *UnsignedPoint) (t int64, v string, aux []interface{})
1224
1225// UnsignedFuncStringReducer is a reducer that reduces
1226// the passed in points to a single point using a reduce function.
1227type UnsignedFuncStringReducer struct {
1228	prev *StringPoint
1229	fn   UnsignedReduceStringFunc
1230}
1231
1232// NewUnsignedFuncStringReducer creates a new UnsignedFuncStringReducer.
1233func NewUnsignedFuncStringReducer(fn UnsignedReduceStringFunc, prev *StringPoint) *UnsignedFuncStringReducer {
1234	return &UnsignedFuncStringReducer{fn: fn, prev: prev}
1235}
1236
1237// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the
1238// current and new point to modify the current point.
1239func (r *UnsignedFuncStringReducer) AggregateUnsigned(p *UnsignedPoint) {
1240	t, v, aux := r.fn(r.prev, p)
1241	if r.prev == nil {
1242		r.prev = &StringPoint{}
1243	}
1244	r.prev.Time = t
1245	r.prev.Value = v
1246	r.prev.Aux = aux
1247	if p.Aggregated > 1 {
1248		r.prev.Aggregated += p.Aggregated
1249	} else {
1250		r.prev.Aggregated++
1251	}
1252}
1253
1254// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
1255func (r *UnsignedFuncStringReducer) Emit() []StringPoint {
1256	return []StringPoint{*r.prev}
1257}
1258
1259// UnsignedReduceStringSliceFunc is the function called by a UnsignedPoint reducer.
1260type UnsignedReduceStringSliceFunc func(a []UnsignedPoint) []StringPoint
1261
1262// UnsignedSliceFuncStringReducer is a reducer that aggregates
1263// the passed in points and then invokes the function to reduce the points when they are emitted.
1264type UnsignedSliceFuncStringReducer struct {
1265	points []UnsignedPoint
1266	fn     UnsignedReduceStringSliceFunc
1267}
1268
1269// NewUnsignedSliceFuncStringReducer creates a new UnsignedSliceFuncStringReducer.
1270func NewUnsignedSliceFuncStringReducer(fn UnsignedReduceStringSliceFunc) *UnsignedSliceFuncStringReducer {
1271	return &UnsignedSliceFuncStringReducer{fn: fn}
1272}
1273
1274// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
1275// to the reduce function when Emit is called.
1276func (r *UnsignedSliceFuncStringReducer) AggregateUnsigned(p *UnsignedPoint) {
1277	r.points = append(r.points, *p.Clone())
1278}
1279
1280// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
1281// This is a more efficient version of calling AggregateUnsigned on each point.
1282func (r *UnsignedSliceFuncStringReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
1283	r.points = append(r.points, points...)
1284}
1285
1286// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1287// This method does not clear the points from the internal slice.
1288func (r *UnsignedSliceFuncStringReducer) Emit() []StringPoint {
1289	return r.fn(r.points)
1290}
1291
1292// UnsignedReduceBooleanFunc is the function called by a UnsignedPoint reducer.
1293type UnsignedReduceBooleanFunc func(prev *BooleanPoint, curr *UnsignedPoint) (t int64, v bool, aux []interface{})
1294
1295// UnsignedFuncBooleanReducer is a reducer that reduces
1296// the passed in points to a single point using a reduce function.
1297type UnsignedFuncBooleanReducer struct {
1298	prev *BooleanPoint
1299	fn   UnsignedReduceBooleanFunc
1300}
1301
1302// NewUnsignedFuncBooleanReducer creates a new UnsignedFuncBooleanReducer.
1303func NewUnsignedFuncBooleanReducer(fn UnsignedReduceBooleanFunc, prev *BooleanPoint) *UnsignedFuncBooleanReducer {
1304	return &UnsignedFuncBooleanReducer{fn: fn, prev: prev}
1305}
1306
1307// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the
1308// current and new point to modify the current point.
1309func (r *UnsignedFuncBooleanReducer) AggregateUnsigned(p *UnsignedPoint) {
1310	t, v, aux := r.fn(r.prev, p)
1311	if r.prev == nil {
1312		r.prev = &BooleanPoint{}
1313	}
1314	r.prev.Time = t
1315	r.prev.Value = v
1316	r.prev.Aux = aux
1317	if p.Aggregated > 1 {
1318		r.prev.Aggregated += p.Aggregated
1319	} else {
1320		r.prev.Aggregated++
1321	}
1322}
1323
1324// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned.
1325func (r *UnsignedFuncBooleanReducer) Emit() []BooleanPoint {
1326	return []BooleanPoint{*r.prev}
1327}
1328
1329// UnsignedReduceBooleanSliceFunc is the function called by a UnsignedPoint reducer.
1330type UnsignedReduceBooleanSliceFunc func(a []UnsignedPoint) []BooleanPoint
1331
1332// UnsignedSliceFuncBooleanReducer is a reducer that aggregates
1333// the passed in points and then invokes the function to reduce the points when they are emitted.
1334type UnsignedSliceFuncBooleanReducer struct {
1335	points []UnsignedPoint
1336	fn     UnsignedReduceBooleanSliceFunc
1337}
1338
1339// NewUnsignedSliceFuncBooleanReducer creates a new UnsignedSliceFuncBooleanReducer.
1340func NewUnsignedSliceFuncBooleanReducer(fn UnsignedReduceBooleanSliceFunc) *UnsignedSliceFuncBooleanReducer {
1341	return &UnsignedSliceFuncBooleanReducer{fn: fn}
1342}
1343
1344// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed
1345// to the reduce function when Emit is called.
1346func (r *UnsignedSliceFuncBooleanReducer) AggregateUnsigned(p *UnsignedPoint) {
1347	r.points = append(r.points, *p.Clone())
1348}
1349
1350// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice.
1351// This is a more efficient version of calling AggregateUnsigned on each point.
1352func (r *UnsignedSliceFuncBooleanReducer) AggregateUnsignedBulk(points []UnsignedPoint) {
1353	r.points = append(r.points, points...)
1354}
1355
1356// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1357// This method does not clear the points from the internal slice.
1358func (r *UnsignedSliceFuncBooleanReducer) Emit() []BooleanPoint {
1359	return r.fn(r.points)
1360}
1361
1362// UnsignedDistinctReducer returns the distinct points in a series.
1363type UnsignedDistinctReducer struct {
1364	m map[uint64]UnsignedPoint
1365}
1366
1367// NewUnsignedDistinctReducer creates a new UnsignedDistinctReducer.
1368func NewUnsignedDistinctReducer() *UnsignedDistinctReducer {
1369	return &UnsignedDistinctReducer{m: make(map[uint64]UnsignedPoint)}
1370}
1371
1372// AggregateUnsigned aggregates a point into the reducer.
1373func (r *UnsignedDistinctReducer) AggregateUnsigned(p *UnsignedPoint) {
1374	if _, ok := r.m[p.Value]; !ok {
1375		r.m[p.Value] = *p
1376	}
1377}
1378
1379// Emit emits the distinct points that have been aggregated into the reducer.
1380func (r *UnsignedDistinctReducer) Emit() []UnsignedPoint {
1381	points := make([]UnsignedPoint, 0, len(r.m))
1382	for _, p := range r.m {
1383		points = append(points, UnsignedPoint{Time: p.Time, Value: p.Value})
1384	}
1385	sort.Sort(unsignedPoints(points))
1386	return points
1387}
1388
1389// UnsignedElapsedReducer calculates the elapsed of the aggregated points.
1390type UnsignedElapsedReducer struct {
1391	unitConversion int64
1392	prev           UnsignedPoint
1393	curr           UnsignedPoint
1394}
1395
1396// NewUnsignedElapsedReducer creates a new UnsignedElapsedReducer.
1397func NewUnsignedElapsedReducer(interval Interval) *UnsignedElapsedReducer {
1398	return &UnsignedElapsedReducer{
1399		unitConversion: int64(interval.Duration),
1400		prev:           UnsignedPoint{Nil: true},
1401		curr:           UnsignedPoint{Nil: true},
1402	}
1403}
1404
1405// AggregateUnsigned aggregates a point into the reducer and updates the current window.
1406func (r *UnsignedElapsedReducer) AggregateUnsigned(p *UnsignedPoint) {
1407	r.prev = r.curr
1408	r.curr = *p
1409}
1410
1411// Emit emits the elapsed of the reducer at the current point.
1412func (r *UnsignedElapsedReducer) Emit() []IntegerPoint {
1413	if !r.prev.Nil {
1414		elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
1415		return []IntegerPoint{
1416			{Time: r.curr.Time, Value: elapsed},
1417		}
1418	}
1419	return nil
1420}
1421
1422// UnsignedSampleReducer implements a reservoir sampling to calculate a random subset of points
1423type UnsignedSampleReducer struct {
1424	count int        // how many points we've iterated over
1425	rng   *rand.Rand // random number generator for each reducer
1426
1427	points unsignedPoints // the reservoir
1428}
1429
1430// NewUnsignedSampleReducer creates a new UnsignedSampleReducer
1431func NewUnsignedSampleReducer(size int) *UnsignedSampleReducer {
1432	return &UnsignedSampleReducer{
1433		rng:    rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
1434		points: make(unsignedPoints, size),
1435	}
1436}
1437
1438// AggregateUnsigned aggregates a point into the reducer.
1439func (r *UnsignedSampleReducer) AggregateUnsigned(p *UnsignedPoint) {
1440	r.count++
1441	// Fill the reservoir with the first n points
1442	if r.count-1 < len(r.points) {
1443		p.CopyTo(&r.points[r.count-1])
1444		return
1445	}
1446
1447	// Generate a random integer between 1 and the count and
1448	// if that number is less than the length of the slice
1449	// replace the point at that index rnd with p.
1450	rnd := r.rng.Intn(r.count)
1451	if rnd < len(r.points) {
1452		p.CopyTo(&r.points[rnd])
1453	}
1454}
1455
1456// Emit emits the reservoir sample as many points.
1457func (r *UnsignedSampleReducer) Emit() []UnsignedPoint {
1458	min := len(r.points)
1459	if r.count < min {
1460		min = r.count
1461	}
1462	pts := r.points[:min]
1463	sort.Sort(pts)
1464	return pts
1465}
1466
1467// StringPointAggregator aggregates points to produce a single point.
1468type StringPointAggregator interface {
1469	AggregateString(p *StringPoint)
1470}
1471
1472// StringBulkPointAggregator aggregates multiple points at a time.
1473type StringBulkPointAggregator interface {
1474	AggregateStringBulk(points []StringPoint)
1475}
1476
1477// AggregateStringPoints feeds a slice of StringPoint into an
1478// aggregator. If the aggregator is a StringBulkPointAggregator, it will
1479// use the AggregateBulk method.
1480func AggregateStringPoints(a StringPointAggregator, points []StringPoint) {
1481	switch a := a.(type) {
1482	case StringBulkPointAggregator:
1483		a.AggregateStringBulk(points)
1484	default:
1485		for _, p := range points {
1486			a.AggregateString(&p)
1487		}
1488	}
1489}
1490
1491// StringPointEmitter produces a single point from an aggregate.
1492type StringPointEmitter interface {
1493	Emit() []StringPoint
1494}
1495
1496// StringReduceFloatFunc is the function called by a StringPoint reducer.
1497type StringReduceFloatFunc func(prev *FloatPoint, curr *StringPoint) (t int64, v float64, aux []interface{})
1498
1499// StringFuncFloatReducer is a reducer that reduces
1500// the passed in points to a single point using a reduce function.
1501type StringFuncFloatReducer struct {
1502	prev *FloatPoint
1503	fn   StringReduceFloatFunc
1504}
1505
1506// NewStringFuncFloatReducer creates a new StringFuncFloatReducer.
1507func NewStringFuncFloatReducer(fn StringReduceFloatFunc, prev *FloatPoint) *StringFuncFloatReducer {
1508	return &StringFuncFloatReducer{fn: fn, prev: prev}
1509}
1510
1511// AggregateString takes a StringPoint and invokes the reduce function with the
1512// current and new point to modify the current point.
1513func (r *StringFuncFloatReducer) AggregateString(p *StringPoint) {
1514	t, v, aux := r.fn(r.prev, p)
1515	if r.prev == nil {
1516		r.prev = &FloatPoint{}
1517	}
1518	r.prev.Time = t
1519	r.prev.Value = v
1520	r.prev.Aux = aux
1521	if p.Aggregated > 1 {
1522		r.prev.Aggregated += p.Aggregated
1523	} else {
1524		r.prev.Aggregated++
1525	}
1526}
1527
1528// Emit emits the point that was generated when reducing the points fed in with AggregateString.
1529func (r *StringFuncFloatReducer) Emit() []FloatPoint {
1530	return []FloatPoint{*r.prev}
1531}
1532
1533// StringReduceFloatSliceFunc is the function called by a StringPoint reducer.
1534type StringReduceFloatSliceFunc func(a []StringPoint) []FloatPoint
1535
1536// StringSliceFuncFloatReducer is a reducer that aggregates
1537// the passed in points and then invokes the function to reduce the points when they are emitted.
1538type StringSliceFuncFloatReducer struct {
1539	points []StringPoint
1540	fn     StringReduceFloatSliceFunc
1541}
1542
1543// NewStringSliceFuncFloatReducer creates a new StringSliceFuncFloatReducer.
1544func NewStringSliceFuncFloatReducer(fn StringReduceFloatSliceFunc) *StringSliceFuncFloatReducer {
1545	return &StringSliceFuncFloatReducer{fn: fn}
1546}
1547
1548// AggregateString copies the StringPoint into the internal slice to be passed
1549// to the reduce function when Emit is called.
1550func (r *StringSliceFuncFloatReducer) AggregateString(p *StringPoint) {
1551	r.points = append(r.points, *p.Clone())
1552}
1553
1554// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
1555// This is a more efficient version of calling AggregateString on each point.
1556func (r *StringSliceFuncFloatReducer) AggregateStringBulk(points []StringPoint) {
1557	r.points = append(r.points, points...)
1558}
1559
1560// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1561// This method does not clear the points from the internal slice.
1562func (r *StringSliceFuncFloatReducer) Emit() []FloatPoint {
1563	return r.fn(r.points)
1564}
1565
1566// StringReduceIntegerFunc is the function called by a StringPoint reducer.
1567type StringReduceIntegerFunc func(prev *IntegerPoint, curr *StringPoint) (t int64, v int64, aux []interface{})
1568
1569// StringFuncIntegerReducer is a reducer that reduces
1570// the passed in points to a single point using a reduce function.
1571type StringFuncIntegerReducer struct {
1572	prev *IntegerPoint
1573	fn   StringReduceIntegerFunc
1574}
1575
1576// NewStringFuncIntegerReducer creates a new StringFuncIntegerReducer.
1577func NewStringFuncIntegerReducer(fn StringReduceIntegerFunc, prev *IntegerPoint) *StringFuncIntegerReducer {
1578	return &StringFuncIntegerReducer{fn: fn, prev: prev}
1579}
1580
1581// AggregateString takes a StringPoint and invokes the reduce function with the
1582// current and new point to modify the current point.
1583func (r *StringFuncIntegerReducer) AggregateString(p *StringPoint) {
1584	t, v, aux := r.fn(r.prev, p)
1585	if r.prev == nil {
1586		r.prev = &IntegerPoint{}
1587	}
1588	r.prev.Time = t
1589	r.prev.Value = v
1590	r.prev.Aux = aux
1591	if p.Aggregated > 1 {
1592		r.prev.Aggregated += p.Aggregated
1593	} else {
1594		r.prev.Aggregated++
1595	}
1596}
1597
1598// Emit emits the point that was generated when reducing the points fed in with AggregateString.
1599func (r *StringFuncIntegerReducer) Emit() []IntegerPoint {
1600	return []IntegerPoint{*r.prev}
1601}
1602
1603// StringReduceIntegerSliceFunc is the function called by a StringPoint reducer.
1604type StringReduceIntegerSliceFunc func(a []StringPoint) []IntegerPoint
1605
1606// StringSliceFuncIntegerReducer is a reducer that aggregates
1607// the passed in points and then invokes the function to reduce the points when they are emitted.
1608type StringSliceFuncIntegerReducer struct {
1609	points []StringPoint
1610	fn     StringReduceIntegerSliceFunc
1611}
1612
1613// NewStringSliceFuncIntegerReducer creates a new StringSliceFuncIntegerReducer.
1614func NewStringSliceFuncIntegerReducer(fn StringReduceIntegerSliceFunc) *StringSliceFuncIntegerReducer {
1615	return &StringSliceFuncIntegerReducer{fn: fn}
1616}
1617
1618// AggregateString copies the StringPoint into the internal slice to be passed
1619// to the reduce function when Emit is called.
1620func (r *StringSliceFuncIntegerReducer) AggregateString(p *StringPoint) {
1621	r.points = append(r.points, *p.Clone())
1622}
1623
1624// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
1625// This is a more efficient version of calling AggregateString on each point.
1626func (r *StringSliceFuncIntegerReducer) AggregateStringBulk(points []StringPoint) {
1627	r.points = append(r.points, points...)
1628}
1629
1630// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1631// This method does not clear the points from the internal slice.
1632func (r *StringSliceFuncIntegerReducer) Emit() []IntegerPoint {
1633	return r.fn(r.points)
1634}
1635
1636// StringReduceUnsignedFunc is the function called by a StringPoint reducer.
1637type StringReduceUnsignedFunc func(prev *UnsignedPoint, curr *StringPoint) (t int64, v uint64, aux []interface{})
1638
1639// StringFuncUnsignedReducer is a reducer that reduces
1640// the passed in points to a single point using a reduce function.
1641type StringFuncUnsignedReducer struct {
1642	prev *UnsignedPoint
1643	fn   StringReduceUnsignedFunc
1644}
1645
1646// NewStringFuncUnsignedReducer creates a new StringFuncUnsignedReducer.
1647func NewStringFuncUnsignedReducer(fn StringReduceUnsignedFunc, prev *UnsignedPoint) *StringFuncUnsignedReducer {
1648	return &StringFuncUnsignedReducer{fn: fn, prev: prev}
1649}
1650
1651// AggregateString takes a StringPoint and invokes the reduce function with the
1652// current and new point to modify the current point.
1653func (r *StringFuncUnsignedReducer) AggregateString(p *StringPoint) {
1654	t, v, aux := r.fn(r.prev, p)
1655	if r.prev == nil {
1656		r.prev = &UnsignedPoint{}
1657	}
1658	r.prev.Time = t
1659	r.prev.Value = v
1660	r.prev.Aux = aux
1661	if p.Aggregated > 1 {
1662		r.prev.Aggregated += p.Aggregated
1663	} else {
1664		r.prev.Aggregated++
1665	}
1666}
1667
1668// Emit emits the point that was generated when reducing the points fed in with AggregateString.
1669func (r *StringFuncUnsignedReducer) Emit() []UnsignedPoint {
1670	return []UnsignedPoint{*r.prev}
1671}
1672
1673// StringReduceUnsignedSliceFunc is the function called by a StringPoint reducer.
1674type StringReduceUnsignedSliceFunc func(a []StringPoint) []UnsignedPoint
1675
1676// StringSliceFuncUnsignedReducer is a reducer that aggregates
1677// the passed in points and then invokes the function to reduce the points when they are emitted.
1678type StringSliceFuncUnsignedReducer struct {
1679	points []StringPoint
1680	fn     StringReduceUnsignedSliceFunc
1681}
1682
1683// NewStringSliceFuncUnsignedReducer creates a new StringSliceFuncUnsignedReducer.
1684func NewStringSliceFuncUnsignedReducer(fn StringReduceUnsignedSliceFunc) *StringSliceFuncUnsignedReducer {
1685	return &StringSliceFuncUnsignedReducer{fn: fn}
1686}
1687
1688// AggregateString copies the StringPoint into the internal slice to be passed
1689// to the reduce function when Emit is called.
1690func (r *StringSliceFuncUnsignedReducer) AggregateString(p *StringPoint) {
1691	r.points = append(r.points, *p.Clone())
1692}
1693
1694// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
1695// This is a more efficient version of calling AggregateString on each point.
1696func (r *StringSliceFuncUnsignedReducer) AggregateStringBulk(points []StringPoint) {
1697	r.points = append(r.points, points...)
1698}
1699
1700// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1701// This method does not clear the points from the internal slice.
1702func (r *StringSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
1703	return r.fn(r.points)
1704}
1705
1706// StringReduceFunc is the function called by a StringPoint reducer.
1707type StringReduceFunc func(prev *StringPoint, curr *StringPoint) (t int64, v string, aux []interface{})
1708
1709// StringFuncReducer is a reducer that reduces
1710// the passed in points to a single point using a reduce function.
1711type StringFuncReducer struct {
1712	prev *StringPoint
1713	fn   StringReduceFunc
1714}
1715
1716// NewStringFuncReducer creates a new StringFuncStringReducer.
1717func NewStringFuncReducer(fn StringReduceFunc, prev *StringPoint) *StringFuncReducer {
1718	return &StringFuncReducer{fn: fn, prev: prev}
1719}
1720
1721// AggregateString takes a StringPoint and invokes the reduce function with the
1722// current and new point to modify the current point.
1723func (r *StringFuncReducer) AggregateString(p *StringPoint) {
1724	t, v, aux := r.fn(r.prev, p)
1725	if r.prev == nil {
1726		r.prev = &StringPoint{}
1727	}
1728	r.prev.Time = t
1729	r.prev.Value = v
1730	r.prev.Aux = aux
1731	if p.Aggregated > 1 {
1732		r.prev.Aggregated += p.Aggregated
1733	} else {
1734		r.prev.Aggregated++
1735	}
1736}
1737
1738// Emit emits the point that was generated when reducing the points fed in with AggregateString.
1739func (r *StringFuncReducer) Emit() []StringPoint {
1740	return []StringPoint{*r.prev}
1741}
1742
1743// StringReduceSliceFunc is the function called by a StringPoint reducer.
1744type StringReduceSliceFunc func(a []StringPoint) []StringPoint
1745
1746// StringSliceFuncReducer is a reducer that aggregates
1747// the passed in points and then invokes the function to reduce the points when they are emitted.
1748type StringSliceFuncReducer struct {
1749	points []StringPoint
1750	fn     StringReduceSliceFunc
1751}
1752
1753// NewStringSliceFuncReducer creates a new StringSliceFuncReducer.
1754func NewStringSliceFuncReducer(fn StringReduceSliceFunc) *StringSliceFuncReducer {
1755	return &StringSliceFuncReducer{fn: fn}
1756}
1757
1758// AggregateString copies the StringPoint into the internal slice to be passed
1759// to the reduce function when Emit is called.
1760func (r *StringSliceFuncReducer) AggregateString(p *StringPoint) {
1761	r.points = append(r.points, *p.Clone())
1762}
1763
1764// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
1765// This is a more efficient version of calling AggregateString on each point.
1766func (r *StringSliceFuncReducer) AggregateStringBulk(points []StringPoint) {
1767	r.points = append(r.points, points...)
1768}
1769
1770// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1771// This method does not clear the points from the internal slice.
1772func (r *StringSliceFuncReducer) Emit() []StringPoint {
1773	return r.fn(r.points)
1774}
1775
1776// StringReduceBooleanFunc is the function called by a StringPoint reducer.
1777type StringReduceBooleanFunc func(prev *BooleanPoint, curr *StringPoint) (t int64, v bool, aux []interface{})
1778
1779// StringFuncBooleanReducer is a reducer that reduces
1780// the passed in points to a single point using a reduce function.
1781type StringFuncBooleanReducer struct {
1782	prev *BooleanPoint
1783	fn   StringReduceBooleanFunc
1784}
1785
1786// NewStringFuncBooleanReducer creates a new StringFuncBooleanReducer.
1787func NewStringFuncBooleanReducer(fn StringReduceBooleanFunc, prev *BooleanPoint) *StringFuncBooleanReducer {
1788	return &StringFuncBooleanReducer{fn: fn, prev: prev}
1789}
1790
1791// AggregateString takes a StringPoint and invokes the reduce function with the
1792// current and new point to modify the current point.
1793func (r *StringFuncBooleanReducer) AggregateString(p *StringPoint) {
1794	t, v, aux := r.fn(r.prev, p)
1795	if r.prev == nil {
1796		r.prev = &BooleanPoint{}
1797	}
1798	r.prev.Time = t
1799	r.prev.Value = v
1800	r.prev.Aux = aux
1801	if p.Aggregated > 1 {
1802		r.prev.Aggregated += p.Aggregated
1803	} else {
1804		r.prev.Aggregated++
1805	}
1806}
1807
1808// Emit emits the point that was generated when reducing the points fed in with AggregateString.
1809func (r *StringFuncBooleanReducer) Emit() []BooleanPoint {
1810	return []BooleanPoint{*r.prev}
1811}
1812
1813// StringReduceBooleanSliceFunc is the function called by a StringPoint reducer.
1814type StringReduceBooleanSliceFunc func(a []StringPoint) []BooleanPoint
1815
1816// StringSliceFuncBooleanReducer is a reducer that aggregates
1817// the passed in points and then invokes the function to reduce the points when they are emitted.
1818type StringSliceFuncBooleanReducer struct {
1819	points []StringPoint
1820	fn     StringReduceBooleanSliceFunc
1821}
1822
1823// NewStringSliceFuncBooleanReducer creates a new StringSliceFuncBooleanReducer.
1824func NewStringSliceFuncBooleanReducer(fn StringReduceBooleanSliceFunc) *StringSliceFuncBooleanReducer {
1825	return &StringSliceFuncBooleanReducer{fn: fn}
1826}
1827
1828// AggregateString copies the StringPoint into the internal slice to be passed
1829// to the reduce function when Emit is called.
1830func (r *StringSliceFuncBooleanReducer) AggregateString(p *StringPoint) {
1831	r.points = append(r.points, *p.Clone())
1832}
1833
1834// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
1835// This is a more efficient version of calling AggregateString on each point.
1836func (r *StringSliceFuncBooleanReducer) AggregateStringBulk(points []StringPoint) {
1837	r.points = append(r.points, points...)
1838}
1839
1840// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
1841// This method does not clear the points from the internal slice.
1842func (r *StringSliceFuncBooleanReducer) Emit() []BooleanPoint {
1843	return r.fn(r.points)
1844}
1845
1846// StringDistinctReducer returns the distinct points in a series.
1847type StringDistinctReducer struct {
1848	m map[string]StringPoint
1849}
1850
1851// NewStringDistinctReducer creates a new StringDistinctReducer.
1852func NewStringDistinctReducer() *StringDistinctReducer {
1853	return &StringDistinctReducer{m: make(map[string]StringPoint)}
1854}
1855
1856// AggregateString aggregates a point into the reducer.
1857func (r *StringDistinctReducer) AggregateString(p *StringPoint) {
1858	if _, ok := r.m[p.Value]; !ok {
1859		r.m[p.Value] = *p
1860	}
1861}
1862
1863// Emit emits the distinct points that have been aggregated into the reducer.
1864func (r *StringDistinctReducer) Emit() []StringPoint {
1865	points := make([]StringPoint, 0, len(r.m))
1866	for _, p := range r.m {
1867		points = append(points, StringPoint{Time: p.Time, Value: p.Value})
1868	}
1869	sort.Sort(stringPoints(points))
1870	return points
1871}
1872
1873// StringElapsedReducer calculates the elapsed of the aggregated points.
1874type StringElapsedReducer struct {
1875	unitConversion int64
1876	prev           StringPoint
1877	curr           StringPoint
1878}
1879
1880// NewStringElapsedReducer creates a new StringElapsedReducer.
1881func NewStringElapsedReducer(interval Interval) *StringElapsedReducer {
1882	return &StringElapsedReducer{
1883		unitConversion: int64(interval.Duration),
1884		prev:           StringPoint{Nil: true},
1885		curr:           StringPoint{Nil: true},
1886	}
1887}
1888
1889// AggregateString aggregates a point into the reducer and updates the current window.
1890func (r *StringElapsedReducer) AggregateString(p *StringPoint) {
1891	r.prev = r.curr
1892	r.curr = *p
1893}
1894
1895// Emit emits the elapsed of the reducer at the current point.
1896func (r *StringElapsedReducer) Emit() []IntegerPoint {
1897	if !r.prev.Nil {
1898		elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
1899		return []IntegerPoint{
1900			{Time: r.curr.Time, Value: elapsed},
1901		}
1902	}
1903	return nil
1904}
1905
1906// StringSampleReducer implements a reservoir sampling to calculate a random subset of points
1907type StringSampleReducer struct {
1908	count int        // how many points we've iterated over
1909	rng   *rand.Rand // random number generator for each reducer
1910
1911	points stringPoints // the reservoir
1912}
1913
1914// NewStringSampleReducer creates a new StringSampleReducer
1915func NewStringSampleReducer(size int) *StringSampleReducer {
1916	return &StringSampleReducer{
1917		rng:    rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
1918		points: make(stringPoints, size),
1919	}
1920}
1921
1922// AggregateString aggregates a point into the reducer.
1923func (r *StringSampleReducer) AggregateString(p *StringPoint) {
1924	r.count++
1925	// Fill the reservoir with the first n points
1926	if r.count-1 < len(r.points) {
1927		p.CopyTo(&r.points[r.count-1])
1928		return
1929	}
1930
1931	// Generate a random integer between 1 and the count and
1932	// if that number is less than the length of the slice
1933	// replace the point at that index rnd with p.
1934	rnd := r.rng.Intn(r.count)
1935	if rnd < len(r.points) {
1936		p.CopyTo(&r.points[rnd])
1937	}
1938}
1939
1940// Emit emits the reservoir sample as many points.
1941func (r *StringSampleReducer) Emit() []StringPoint {
1942	min := len(r.points)
1943	if r.count < min {
1944		min = r.count
1945	}
1946	pts := r.points[:min]
1947	sort.Sort(pts)
1948	return pts
1949}
1950
1951// BooleanPointAggregator aggregates points to produce a single point.
1952type BooleanPointAggregator interface {
1953	AggregateBoolean(p *BooleanPoint)
1954}
1955
1956// BooleanBulkPointAggregator aggregates multiple points at a time.
1957type BooleanBulkPointAggregator interface {
1958	AggregateBooleanBulk(points []BooleanPoint)
1959}
1960
1961// AggregateBooleanPoints feeds a slice of BooleanPoint into an
1962// aggregator. If the aggregator is a BooleanBulkPointAggregator, it will
1963// use the AggregateBulk method.
1964func AggregateBooleanPoints(a BooleanPointAggregator, points []BooleanPoint) {
1965	switch a := a.(type) {
1966	case BooleanBulkPointAggregator:
1967		a.AggregateBooleanBulk(points)
1968	default:
1969		for _, p := range points {
1970			a.AggregateBoolean(&p)
1971		}
1972	}
1973}
1974
1975// BooleanPointEmitter produces a single point from an aggregate.
1976type BooleanPointEmitter interface {
1977	Emit() []BooleanPoint
1978}
1979
1980// BooleanReduceFloatFunc is the function called by a BooleanPoint reducer.
1981type BooleanReduceFloatFunc func(prev *FloatPoint, curr *BooleanPoint) (t int64, v float64, aux []interface{})
1982
1983// BooleanFuncFloatReducer is a reducer that reduces
1984// the passed in points to a single point using a reduce function.
1985type BooleanFuncFloatReducer struct {
1986	prev *FloatPoint
1987	fn   BooleanReduceFloatFunc
1988}
1989
1990// NewBooleanFuncFloatReducer creates a new BooleanFuncFloatReducer.
1991func NewBooleanFuncFloatReducer(fn BooleanReduceFloatFunc, prev *FloatPoint) *BooleanFuncFloatReducer {
1992	return &BooleanFuncFloatReducer{fn: fn, prev: prev}
1993}
1994
1995// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
1996// current and new point to modify the current point.
1997func (r *BooleanFuncFloatReducer) AggregateBoolean(p *BooleanPoint) {
1998	t, v, aux := r.fn(r.prev, p)
1999	if r.prev == nil {
2000		r.prev = &FloatPoint{}
2001	}
2002	r.prev.Time = t
2003	r.prev.Value = v
2004	r.prev.Aux = aux
2005	if p.Aggregated > 1 {
2006		r.prev.Aggregated += p.Aggregated
2007	} else {
2008		r.prev.Aggregated++
2009	}
2010}
2011
2012// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
2013func (r *BooleanFuncFloatReducer) Emit() []FloatPoint {
2014	return []FloatPoint{*r.prev}
2015}
2016
2017// BooleanReduceFloatSliceFunc is the function called by a BooleanPoint reducer.
2018type BooleanReduceFloatSliceFunc func(a []BooleanPoint) []FloatPoint
2019
2020// BooleanSliceFuncFloatReducer is a reducer that aggregates
2021// the passed in points and then invokes the function to reduce the points when they are emitted.
2022type BooleanSliceFuncFloatReducer struct {
2023	points []BooleanPoint
2024	fn     BooleanReduceFloatSliceFunc
2025}
2026
2027// NewBooleanSliceFuncFloatReducer creates a new BooleanSliceFuncFloatReducer.
2028func NewBooleanSliceFuncFloatReducer(fn BooleanReduceFloatSliceFunc) *BooleanSliceFuncFloatReducer {
2029	return &BooleanSliceFuncFloatReducer{fn: fn}
2030}
2031
2032// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
2033// to the reduce function when Emit is called.
2034func (r *BooleanSliceFuncFloatReducer) AggregateBoolean(p *BooleanPoint) {
2035	r.points = append(r.points, *p.Clone())
2036}
2037
2038// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
2039// This is a more efficient version of calling AggregateBoolean on each point.
2040func (r *BooleanSliceFuncFloatReducer) AggregateBooleanBulk(points []BooleanPoint) {
2041	r.points = append(r.points, points...)
2042}
2043
2044// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
2045// This method does not clear the points from the internal slice.
2046func (r *BooleanSliceFuncFloatReducer) Emit() []FloatPoint {
2047	return r.fn(r.points)
2048}
2049
2050// BooleanReduceIntegerFunc is the function called by a BooleanPoint reducer.
2051type BooleanReduceIntegerFunc func(prev *IntegerPoint, curr *BooleanPoint) (t int64, v int64, aux []interface{})
2052
2053// BooleanFuncIntegerReducer is a reducer that reduces
2054// the passed in points to a single point using a reduce function.
2055type BooleanFuncIntegerReducer struct {
2056	prev *IntegerPoint
2057	fn   BooleanReduceIntegerFunc
2058}
2059
2060// NewBooleanFuncIntegerReducer creates a new BooleanFuncIntegerReducer.
2061func NewBooleanFuncIntegerReducer(fn BooleanReduceIntegerFunc, prev *IntegerPoint) *BooleanFuncIntegerReducer {
2062	return &BooleanFuncIntegerReducer{fn: fn, prev: prev}
2063}
2064
2065// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
2066// current and new point to modify the current point.
2067func (r *BooleanFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) {
2068	t, v, aux := r.fn(r.prev, p)
2069	if r.prev == nil {
2070		r.prev = &IntegerPoint{}
2071	}
2072	r.prev.Time = t
2073	r.prev.Value = v
2074	r.prev.Aux = aux
2075	if p.Aggregated > 1 {
2076		r.prev.Aggregated += p.Aggregated
2077	} else {
2078		r.prev.Aggregated++
2079	}
2080}
2081
2082// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
2083func (r *BooleanFuncIntegerReducer) Emit() []IntegerPoint {
2084	return []IntegerPoint{*r.prev}
2085}
2086
2087// BooleanReduceIntegerSliceFunc is the function called by a BooleanPoint reducer.
2088type BooleanReduceIntegerSliceFunc func(a []BooleanPoint) []IntegerPoint
2089
2090// BooleanSliceFuncIntegerReducer is a reducer that aggregates
2091// the passed in points and then invokes the function to reduce the points when they are emitted.
2092type BooleanSliceFuncIntegerReducer struct {
2093	points []BooleanPoint
2094	fn     BooleanReduceIntegerSliceFunc
2095}
2096
2097// NewBooleanSliceFuncIntegerReducer creates a new BooleanSliceFuncIntegerReducer.
2098func NewBooleanSliceFuncIntegerReducer(fn BooleanReduceIntegerSliceFunc) *BooleanSliceFuncIntegerReducer {
2099	return &BooleanSliceFuncIntegerReducer{fn: fn}
2100}
2101
2102// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
2103// to the reduce function when Emit is called.
2104func (r *BooleanSliceFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) {
2105	r.points = append(r.points, *p.Clone())
2106}
2107
2108// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
2109// This is a more efficient version of calling AggregateBoolean on each point.
2110func (r *BooleanSliceFuncIntegerReducer) AggregateBooleanBulk(points []BooleanPoint) {
2111	r.points = append(r.points, points...)
2112}
2113
2114// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
2115// This method does not clear the points from the internal slice.
2116func (r *BooleanSliceFuncIntegerReducer) Emit() []IntegerPoint {
2117	return r.fn(r.points)
2118}
2119
2120// BooleanReduceUnsignedFunc is the function called by a BooleanPoint reducer.
2121type BooleanReduceUnsignedFunc func(prev *UnsignedPoint, curr *BooleanPoint) (t int64, v uint64, aux []interface{})
2122
2123// BooleanFuncUnsignedReducer is a reducer that reduces
2124// the passed in points to a single point using a reduce function.
2125type BooleanFuncUnsignedReducer struct {
2126	prev *UnsignedPoint
2127	fn   BooleanReduceUnsignedFunc
2128}
2129
2130// NewBooleanFuncUnsignedReducer creates a new BooleanFuncUnsignedReducer.
2131func NewBooleanFuncUnsignedReducer(fn BooleanReduceUnsignedFunc, prev *UnsignedPoint) *BooleanFuncUnsignedReducer {
2132	return &BooleanFuncUnsignedReducer{fn: fn, prev: prev}
2133}
2134
2135// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
2136// current and new point to modify the current point.
2137func (r *BooleanFuncUnsignedReducer) AggregateBoolean(p *BooleanPoint) {
2138	t, v, aux := r.fn(r.prev, p)
2139	if r.prev == nil {
2140		r.prev = &UnsignedPoint{}
2141	}
2142	r.prev.Time = t
2143	r.prev.Value = v
2144	r.prev.Aux = aux
2145	if p.Aggregated > 1 {
2146		r.prev.Aggregated += p.Aggregated
2147	} else {
2148		r.prev.Aggregated++
2149	}
2150}
2151
2152// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
2153func (r *BooleanFuncUnsignedReducer) Emit() []UnsignedPoint {
2154	return []UnsignedPoint{*r.prev}
2155}
2156
2157// BooleanReduceUnsignedSliceFunc is the function called by a BooleanPoint reducer.
2158type BooleanReduceUnsignedSliceFunc func(a []BooleanPoint) []UnsignedPoint
2159
2160// BooleanSliceFuncUnsignedReducer is a reducer that aggregates
2161// the passed in points and then invokes the function to reduce the points when they are emitted.
2162type BooleanSliceFuncUnsignedReducer struct {
2163	points []BooleanPoint
2164	fn     BooleanReduceUnsignedSliceFunc
2165}
2166
2167// NewBooleanSliceFuncUnsignedReducer creates a new BooleanSliceFuncUnsignedReducer.
2168func NewBooleanSliceFuncUnsignedReducer(fn BooleanReduceUnsignedSliceFunc) *BooleanSliceFuncUnsignedReducer {
2169	return &BooleanSliceFuncUnsignedReducer{fn: fn}
2170}
2171
2172// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
2173// to the reduce function when Emit is called.
2174func (r *BooleanSliceFuncUnsignedReducer) AggregateBoolean(p *BooleanPoint) {
2175	r.points = append(r.points, *p.Clone())
2176}
2177
2178// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
2179// This is a more efficient version of calling AggregateBoolean on each point.
2180func (r *BooleanSliceFuncUnsignedReducer) AggregateBooleanBulk(points []BooleanPoint) {
2181	r.points = append(r.points, points...)
2182}
2183
2184// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
2185// This method does not clear the points from the internal slice.
2186func (r *BooleanSliceFuncUnsignedReducer) Emit() []UnsignedPoint {
2187	return r.fn(r.points)
2188}
2189
2190// BooleanReduceStringFunc is the function called by a BooleanPoint reducer.
2191type BooleanReduceStringFunc func(prev *StringPoint, curr *BooleanPoint) (t int64, v string, aux []interface{})
2192
2193// BooleanFuncStringReducer is a reducer that reduces
2194// the passed in points to a single point using a reduce function.
2195type BooleanFuncStringReducer struct {
2196	prev *StringPoint
2197	fn   BooleanReduceStringFunc
2198}
2199
2200// NewBooleanFuncStringReducer creates a new BooleanFuncStringReducer.
2201func NewBooleanFuncStringReducer(fn BooleanReduceStringFunc, prev *StringPoint) *BooleanFuncStringReducer {
2202	return &BooleanFuncStringReducer{fn: fn, prev: prev}
2203}
2204
2205// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
2206// current and new point to modify the current point.
2207func (r *BooleanFuncStringReducer) AggregateBoolean(p *BooleanPoint) {
2208	t, v, aux := r.fn(r.prev, p)
2209	if r.prev == nil {
2210		r.prev = &StringPoint{}
2211	}
2212	r.prev.Time = t
2213	r.prev.Value = v
2214	r.prev.Aux = aux
2215	if p.Aggregated > 1 {
2216		r.prev.Aggregated += p.Aggregated
2217	} else {
2218		r.prev.Aggregated++
2219	}
2220}
2221
2222// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
2223func (r *BooleanFuncStringReducer) Emit() []StringPoint {
2224	return []StringPoint{*r.prev}
2225}
2226
2227// BooleanReduceStringSliceFunc is the function called by a BooleanPoint reducer.
2228type BooleanReduceStringSliceFunc func(a []BooleanPoint) []StringPoint
2229
2230// BooleanSliceFuncStringReducer is a reducer that aggregates
2231// the passed in points and then invokes the function to reduce the points when they are emitted.
2232type BooleanSliceFuncStringReducer struct {
2233	points []BooleanPoint
2234	fn     BooleanReduceStringSliceFunc
2235}
2236
2237// NewBooleanSliceFuncStringReducer creates a new BooleanSliceFuncStringReducer.
2238func NewBooleanSliceFuncStringReducer(fn BooleanReduceStringSliceFunc) *BooleanSliceFuncStringReducer {
2239	return &BooleanSliceFuncStringReducer{fn: fn}
2240}
2241
2242// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
2243// to the reduce function when Emit is called.
2244func (r *BooleanSliceFuncStringReducer) AggregateBoolean(p *BooleanPoint) {
2245	r.points = append(r.points, *p.Clone())
2246}
2247
2248// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
2249// This is a more efficient version of calling AggregateBoolean on each point.
2250func (r *BooleanSliceFuncStringReducer) AggregateBooleanBulk(points []BooleanPoint) {
2251	r.points = append(r.points, points...)
2252}
2253
2254// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
2255// This method does not clear the points from the internal slice.
2256func (r *BooleanSliceFuncStringReducer) Emit() []StringPoint {
2257	return r.fn(r.points)
2258}
2259
2260// BooleanReduceFunc is the function called by a BooleanPoint reducer.
2261type BooleanReduceFunc func(prev *BooleanPoint, curr *BooleanPoint) (t int64, v bool, aux []interface{})
2262
2263// BooleanFuncReducer is a reducer that reduces
2264// the passed in points to a single point using a reduce function.
2265type BooleanFuncReducer struct {
2266	prev *BooleanPoint
2267	fn   BooleanReduceFunc
2268}
2269
2270// NewBooleanFuncReducer creates a new BooleanFuncBooleanReducer.
2271func NewBooleanFuncReducer(fn BooleanReduceFunc, prev *BooleanPoint) *BooleanFuncReducer {
2272	return &BooleanFuncReducer{fn: fn, prev: prev}
2273}
2274
2275// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the
2276// current and new point to modify the current point.
2277func (r *BooleanFuncReducer) AggregateBoolean(p *BooleanPoint) {
2278	t, v, aux := r.fn(r.prev, p)
2279	if r.prev == nil {
2280		r.prev = &BooleanPoint{}
2281	}
2282	r.prev.Time = t
2283	r.prev.Value = v
2284	r.prev.Aux = aux
2285	if p.Aggregated > 1 {
2286		r.prev.Aggregated += p.Aggregated
2287	} else {
2288		r.prev.Aggregated++
2289	}
2290}
2291
2292// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean.
2293func (r *BooleanFuncReducer) Emit() []BooleanPoint {
2294	return []BooleanPoint{*r.prev}
2295}
2296
2297// BooleanReduceSliceFunc is the function called by a BooleanPoint reducer.
2298type BooleanReduceSliceFunc func(a []BooleanPoint) []BooleanPoint
2299
2300// BooleanSliceFuncReducer is a reducer that aggregates
2301// the passed in points and then invokes the function to reduce the points when they are emitted.
2302type BooleanSliceFuncReducer struct {
2303	points []BooleanPoint
2304	fn     BooleanReduceSliceFunc
2305}
2306
2307// NewBooleanSliceFuncReducer creates a new BooleanSliceFuncReducer.
2308func NewBooleanSliceFuncReducer(fn BooleanReduceSliceFunc) *BooleanSliceFuncReducer {
2309	return &BooleanSliceFuncReducer{fn: fn}
2310}
2311
2312// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
2313// to the reduce function when Emit is called.
2314func (r *BooleanSliceFuncReducer) AggregateBoolean(p *BooleanPoint) {
2315	r.points = append(r.points, *p.Clone())
2316}
2317
2318// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
2319// This is a more efficient version of calling AggregateBoolean on each point.
2320func (r *BooleanSliceFuncReducer) AggregateBooleanBulk(points []BooleanPoint) {
2321	r.points = append(r.points, points...)
2322}
2323
2324// Emit invokes the reduce function on the aggregated points to generate the aggregated points.
2325// This method does not clear the points from the internal slice.
2326func (r *BooleanSliceFuncReducer) Emit() []BooleanPoint {
2327	return r.fn(r.points)
2328}
2329
2330// BooleanDistinctReducer returns the distinct points in a series.
2331type BooleanDistinctReducer struct {
2332	m map[bool]BooleanPoint
2333}
2334
2335// NewBooleanDistinctReducer creates a new BooleanDistinctReducer.
2336func NewBooleanDistinctReducer() *BooleanDistinctReducer {
2337	return &BooleanDistinctReducer{m: make(map[bool]BooleanPoint)}
2338}
2339
2340// AggregateBoolean aggregates a point into the reducer.
2341func (r *BooleanDistinctReducer) AggregateBoolean(p *BooleanPoint) {
2342	if _, ok := r.m[p.Value]; !ok {
2343		r.m[p.Value] = *p
2344	}
2345}
2346
2347// Emit emits the distinct points that have been aggregated into the reducer.
2348func (r *BooleanDistinctReducer) Emit() []BooleanPoint {
2349	points := make([]BooleanPoint, 0, len(r.m))
2350	for _, p := range r.m {
2351		points = append(points, BooleanPoint{Time: p.Time, Value: p.Value})
2352	}
2353	sort.Sort(booleanPoints(points))
2354	return points
2355}
2356
2357// BooleanElapsedReducer calculates the elapsed of the aggregated points.
2358type BooleanElapsedReducer struct {
2359	unitConversion int64
2360	prev           BooleanPoint
2361	curr           BooleanPoint
2362}
2363
2364// NewBooleanElapsedReducer creates a new BooleanElapsedReducer.
2365func NewBooleanElapsedReducer(interval Interval) *BooleanElapsedReducer {
2366	return &BooleanElapsedReducer{
2367		unitConversion: int64(interval.Duration),
2368		prev:           BooleanPoint{Nil: true},
2369		curr:           BooleanPoint{Nil: true},
2370	}
2371}
2372
2373// AggregateBoolean aggregates a point into the reducer and updates the current window.
2374func (r *BooleanElapsedReducer) AggregateBoolean(p *BooleanPoint) {
2375	r.prev = r.curr
2376	r.curr = *p
2377}
2378
2379// Emit emits the elapsed of the reducer at the current point.
2380func (r *BooleanElapsedReducer) Emit() []IntegerPoint {
2381	if !r.prev.Nil {
2382		elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion
2383		return []IntegerPoint{
2384			{Time: r.curr.Time, Value: elapsed},
2385		}
2386	}
2387	return nil
2388}
2389
2390// BooleanSampleReducer implements a reservoir sampling to calculate a random subset of points
2391type BooleanSampleReducer struct {
2392	count int        // how many points we've iterated over
2393	rng   *rand.Rand // random number generator for each reducer
2394
2395	points booleanPoints // the reservoir
2396}
2397
2398// NewBooleanSampleReducer creates a new BooleanSampleReducer
2399func NewBooleanSampleReducer(size int) *BooleanSampleReducer {
2400	return &BooleanSampleReducer{
2401		rng:    rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
2402		points: make(booleanPoints, size),
2403	}
2404}
2405
2406// AggregateBoolean aggregates a point into the reducer.
2407func (r *BooleanSampleReducer) AggregateBoolean(p *BooleanPoint) {
2408	r.count++
2409	// Fill the reservoir with the first n points
2410	if r.count-1 < len(r.points) {
2411		p.CopyTo(&r.points[r.count-1])
2412		return
2413	}
2414
2415	// Generate a random integer between 1 and the count and
2416	// if that number is less than the length of the slice
2417	// replace the point at that index rnd with p.
2418	rnd := r.rng.Intn(r.count)
2419	if rnd < len(r.points) {
2420		p.CopyTo(&r.points[rnd])
2421	}
2422}
2423
2424// Emit emits the reservoir sample as many points.
2425func (r *BooleanSampleReducer) Emit() []BooleanPoint {
2426	min := len(r.points)
2427	if r.count < min {
2428		min = r.count
2429	}
2430	pts := r.points[:min]
2431	sort.Sort(pts)
2432	return pts
2433}
2434