1package query
2
3import (
4	"fmt"
5	"math"
6	"sort"
7	"time"
8
9	"github.com/influxdata/influxdb/query/internal/gota"
10	"github.com/influxdata/influxql"
11)
12
13/*
14This file contains iterator implementations for each function call available
15in InfluxQL. Call iterators are separated into two groups:
16
1. Map/reduce-style iterators - these are passed to IteratorCreator so that
   processing can be done at the low-level storage and aggregates are returned.
19
202. Raw aggregate iterators - these require the full set of data for a window.
21   These are handled by the select() function and raw points are streamed in
22   from the low-level storage.
23
24There are helpers to aid in building aggregate iterators. For simple map/reduce
25iterators, you can use the reduceIterator types and pass a reduce function. This
26reduce function is passed a previous and current value and the new timestamp,
27value, and auxiliary fields are returned from it.
28
29For raw aggregate iterators, you can use the reduceSliceIterators which pass
30in a slice of all points to the function and return a point. For more complex
31iterator types, you may need to create your own iterators by hand.
32
33Once your iterator is complete, you'll need to add it to the NewCallIterator()
34function if it is to be available to IteratorCreators and add it to the select()
35function to allow it to be included during planning.
36*/
37
38// NewCallIterator returns a new iterator for a Call.
39func NewCallIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
40	name := opt.Expr.(*influxql.Call).Name
41	switch name {
42	case "count":
43		return newCountIterator(input, opt)
44	case "min":
45		return newMinIterator(input, opt)
46	case "max":
47		return newMaxIterator(input, opt)
48	case "sum":
49		return newSumIterator(input, opt)
50	case "first":
51		return newFirstIterator(input, opt)
52	case "last":
53		return newLastIterator(input, opt)
54	case "mean":
55		return newMeanIterator(input, opt)
56	default:
57		return nil, fmt.Errorf("unsupported function call: %s", name)
58	}
59}
60
61// newCountIterator returns an iterator for operating on a count() call.
62func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
63	// FIXME: Wrap iterator in int-type iterator and always output int value.
64
65	switch input := input.(type) {
66	case FloatIterator:
67		createFn := func() (FloatPointAggregator, IntegerPointEmitter) {
68			fn := NewFloatFuncIntegerReducer(FloatCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
69			return fn, fn
70		}
71		return newFloatReduceIntegerIterator(input, opt, createFn), nil
72	case IntegerIterator:
73		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
74			fn := NewIntegerFuncReducer(IntegerCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
75			return fn, fn
76		}
77		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
78	case UnsignedIterator:
79		createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) {
80			fn := NewUnsignedFuncIntegerReducer(UnsignedCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
81			return fn, fn
82		}
83		return newUnsignedReduceIntegerIterator(input, opt, createFn), nil
84	case StringIterator:
85		createFn := func() (StringPointAggregator, IntegerPointEmitter) {
86			fn := NewStringFuncIntegerReducer(StringCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
87			return fn, fn
88		}
89		return newStringReduceIntegerIterator(input, opt, createFn), nil
90	case BooleanIterator:
91		createFn := func() (BooleanPointAggregator, IntegerPointEmitter) {
92			fn := NewBooleanFuncIntegerReducer(BooleanCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
93			return fn, fn
94		}
95		return newBooleanReduceIntegerIterator(input, opt, createFn), nil
96	default:
97		return nil, fmt.Errorf("unsupported count iterator type: %T", input)
98	}
99}
100
101// FloatCountReduce returns the count of points.
102func FloatCountReduce(prev *IntegerPoint, curr *FloatPoint) (int64, int64, []interface{}) {
103	if prev == nil {
104		return ZeroTime, 1, nil
105	}
106	return ZeroTime, prev.Value + 1, nil
107}
108
109// IntegerCountReduce returns the count of points.
110func IntegerCountReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
111	if prev == nil {
112		return ZeroTime, 1, nil
113	}
114	return ZeroTime, prev.Value + 1, nil
115}
116
117// UnsignedCountReduce returns the count of points.
118func UnsignedCountReduce(prev *IntegerPoint, curr *UnsignedPoint) (int64, int64, []interface{}) {
119	if prev == nil {
120		return ZeroTime, 1, nil
121	}
122	return ZeroTime, prev.Value + 1, nil
123}
124
125// StringCountReduce returns the count of points.
126func StringCountReduce(prev *IntegerPoint, curr *StringPoint) (int64, int64, []interface{}) {
127	if prev == nil {
128		return ZeroTime, 1, nil
129	}
130	return ZeroTime, prev.Value + 1, nil
131}
132
133// BooleanCountReduce returns the count of points.
134func BooleanCountReduce(prev *IntegerPoint, curr *BooleanPoint) (int64, int64, []interface{}) {
135	if prev == nil {
136		return ZeroTime, 1, nil
137	}
138	return ZeroTime, prev.Value + 1, nil
139}
140
141// newMinIterator returns an iterator for operating on a min() call.
142func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
143	switch input := input.(type) {
144	case FloatIterator:
145		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
146			fn := NewFloatFuncReducer(FloatMinReduce, nil)
147			return fn, fn
148		}
149		return newFloatReduceFloatIterator(input, opt, createFn), nil
150	case IntegerIterator:
151		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
152			fn := NewIntegerFuncReducer(IntegerMinReduce, nil)
153			return fn, fn
154		}
155		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
156	case UnsignedIterator:
157		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
158			fn := NewUnsignedFuncReducer(UnsignedMinReduce, nil)
159			return fn, fn
160		}
161		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
162	case BooleanIterator:
163		createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
164			fn := NewBooleanFuncReducer(BooleanMinReduce, nil)
165			return fn, fn
166		}
167		return newBooleanReduceBooleanIterator(input, opt, createFn), nil
168	default:
169		return nil, fmt.Errorf("unsupported min iterator type: %T", input)
170	}
171}
172
173// FloatMinReduce returns the minimum value between prev & curr.
174func FloatMinReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
175	if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
176		return curr.Time, curr.Value, cloneAux(curr.Aux)
177	}
178	return prev.Time, prev.Value, prev.Aux
179}
180
181// IntegerMinReduce returns the minimum value between prev & curr.
182func IntegerMinReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
183	if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
184		return curr.Time, curr.Value, cloneAux(curr.Aux)
185	}
186	return prev.Time, prev.Value, prev.Aux
187}
188
189// UnsignedMinReduce returns the minimum value between prev & curr.
190func UnsignedMinReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
191	if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
192		return curr.Time, curr.Value, cloneAux(curr.Aux)
193	}
194	return prev.Time, prev.Value, prev.Aux
195}
196
197// BooleanMinReduce returns the minimum value between prev & curr.
198func BooleanMinReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
199	if prev == nil || (curr.Value != prev.Value && !curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) {
200		return curr.Time, curr.Value, cloneAux(curr.Aux)
201	}
202	return prev.Time, prev.Value, prev.Aux
203}
204
205// newMaxIterator returns an iterator for operating on a max() call.
206func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
207	switch input := input.(type) {
208	case FloatIterator:
209		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
210			fn := NewFloatFuncReducer(FloatMaxReduce, nil)
211			return fn, fn
212		}
213		return newFloatReduceFloatIterator(input, opt, createFn), nil
214	case IntegerIterator:
215		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
216			fn := NewIntegerFuncReducer(IntegerMaxReduce, nil)
217			return fn, fn
218		}
219		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
220	case UnsignedIterator:
221		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
222			fn := NewUnsignedFuncReducer(UnsignedMaxReduce, nil)
223			return fn, fn
224		}
225		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
226	case BooleanIterator:
227		createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
228			fn := NewBooleanFuncReducer(BooleanMaxReduce, nil)
229			return fn, fn
230		}
231		return newBooleanReduceBooleanIterator(input, opt, createFn), nil
232	default:
233		return nil, fmt.Errorf("unsupported max iterator type: %T", input)
234	}
235}
236
237// FloatMaxReduce returns the maximum value between prev & curr.
238func FloatMaxReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
239	if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
240		return curr.Time, curr.Value, cloneAux(curr.Aux)
241	}
242	return prev.Time, prev.Value, prev.Aux
243}
244
245// IntegerMaxReduce returns the maximum value between prev & curr.
246func IntegerMaxReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
247	if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
248		return curr.Time, curr.Value, cloneAux(curr.Aux)
249	}
250	return prev.Time, prev.Value, prev.Aux
251}
252
253// UnsignedMaxReduce returns the maximum value between prev & curr.
254func UnsignedMaxReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
255	if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) {
256		return curr.Time, curr.Value, cloneAux(curr.Aux)
257	}
258	return prev.Time, prev.Value, prev.Aux
259}
260
261// BooleanMaxReduce returns the minimum value between prev & curr.
262func BooleanMaxReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
263	if prev == nil || (curr.Value != prev.Value && curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) {
264		return curr.Time, curr.Value, cloneAux(curr.Aux)
265	}
266	return prev.Time, prev.Value, prev.Aux
267}
268
269// newSumIterator returns an iterator for operating on a sum() call.
270func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
271	switch input := input.(type) {
272	case FloatIterator:
273		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
274			fn := NewFloatFuncReducer(FloatSumReduce, &FloatPoint{Value: 0, Time: ZeroTime})
275			return fn, fn
276		}
277		return newFloatReduceFloatIterator(input, opt, createFn), nil
278	case IntegerIterator:
279		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
280			fn := NewIntegerFuncReducer(IntegerSumReduce, &IntegerPoint{Value: 0, Time: ZeroTime})
281			return fn, fn
282		}
283		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
284	case UnsignedIterator:
285		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
286			fn := NewUnsignedFuncReducer(UnsignedSumReduce, &UnsignedPoint{Value: 0, Time: ZeroTime})
287			return fn, fn
288		}
289		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
290	default:
291		return nil, fmt.Errorf("unsupported sum iterator type: %T", input)
292	}
293}
294
295// FloatSumReduce returns the sum prev value & curr value.
296func FloatSumReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
297	if prev == nil {
298		return ZeroTime, curr.Value, nil
299	}
300	return prev.Time, prev.Value + curr.Value, nil
301}
302
303// IntegerSumReduce returns the sum prev value & curr value.
304func IntegerSumReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
305	if prev == nil {
306		return ZeroTime, curr.Value, nil
307	}
308	return prev.Time, prev.Value + curr.Value, nil
309}
310
311// UnsignedSumReduce returns the sum prev value & curr value.
312func UnsignedSumReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
313	if prev == nil {
314		return ZeroTime, curr.Value, nil
315	}
316	return prev.Time, prev.Value + curr.Value, nil
317}
318
319// newFirstIterator returns an iterator for operating on a first() call.
320func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
321	switch input := input.(type) {
322	case FloatIterator:
323		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
324			fn := NewFloatFuncReducer(FloatFirstReduce, nil)
325			return fn, fn
326		}
327		return newFloatReduceFloatIterator(input, opt, createFn), nil
328	case IntegerIterator:
329		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
330			fn := NewIntegerFuncReducer(IntegerFirstReduce, nil)
331			return fn, fn
332		}
333		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
334	case UnsignedIterator:
335		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
336			fn := NewUnsignedFuncReducer(UnsignedFirstReduce, nil)
337			return fn, fn
338		}
339		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
340	case StringIterator:
341		createFn := func() (StringPointAggregator, StringPointEmitter) {
342			fn := NewStringFuncReducer(StringFirstReduce, nil)
343			return fn, fn
344		}
345		return newStringReduceStringIterator(input, opt, createFn), nil
346	case BooleanIterator:
347		createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
348			fn := NewBooleanFuncReducer(BooleanFirstReduce, nil)
349			return fn, fn
350		}
351		return newBooleanReduceBooleanIterator(input, opt, createFn), nil
352	default:
353		return nil, fmt.Errorf("unsupported first iterator type: %T", input)
354	}
355}
356
357// FloatFirstReduce returns the first point sorted by time.
358func FloatFirstReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
359	if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
360		return curr.Time, curr.Value, cloneAux(curr.Aux)
361	}
362	return prev.Time, prev.Value, prev.Aux
363}
364
365// IntegerFirstReduce returns the first point sorted by time.
366func IntegerFirstReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
367	if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
368		return curr.Time, curr.Value, cloneAux(curr.Aux)
369	}
370	return prev.Time, prev.Value, prev.Aux
371}
372
373// UnsignedFirstReduce returns the first point sorted by time.
374func UnsignedFirstReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
375	if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
376		return curr.Time, curr.Value, cloneAux(curr.Aux)
377	}
378	return prev.Time, prev.Value, prev.Aux
379}
380
381// StringFirstReduce returns the first point sorted by time.
382func StringFirstReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
383	if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
384		return curr.Time, curr.Value, cloneAux(curr.Aux)
385	}
386	return prev.Time, prev.Value, prev.Aux
387}
388
389// BooleanFirstReduce returns the first point sorted by time.
390func BooleanFirstReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
391	if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && !curr.Value && prev.Value) {
392		return curr.Time, curr.Value, cloneAux(curr.Aux)
393	}
394	return prev.Time, prev.Value, prev.Aux
395}
396
397// newLastIterator returns an iterator for operating on a last() call.
398func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
399	switch input := input.(type) {
400	case FloatIterator:
401		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
402			fn := NewFloatFuncReducer(FloatLastReduce, nil)
403			return fn, fn
404		}
405		return newFloatReduceFloatIterator(input, opt, createFn), nil
406	case IntegerIterator:
407		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
408			fn := NewIntegerFuncReducer(IntegerLastReduce, nil)
409			return fn, fn
410		}
411		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
412	case UnsignedIterator:
413		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
414			fn := NewUnsignedFuncReducer(UnsignedLastReduce, nil)
415			return fn, fn
416		}
417		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
418	case StringIterator:
419		createFn := func() (StringPointAggregator, StringPointEmitter) {
420			fn := NewStringFuncReducer(StringLastReduce, nil)
421			return fn, fn
422		}
423		return newStringReduceStringIterator(input, opt, createFn), nil
424	case BooleanIterator:
425		createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
426			fn := NewBooleanFuncReducer(BooleanLastReduce, nil)
427			return fn, fn
428		}
429		return newBooleanReduceBooleanIterator(input, opt, createFn), nil
430	default:
431		return nil, fmt.Errorf("unsupported last iterator type: %T", input)
432	}
433}
434
435// FloatLastReduce returns the last point sorted by time.
436func FloatLastReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) {
437	if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
438		return curr.Time, curr.Value, cloneAux(curr.Aux)
439	}
440	return prev.Time, prev.Value, prev.Aux
441}
442
443// IntegerLastReduce returns the last point sorted by time.
444func IntegerLastReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) {
445	if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
446		return curr.Time, curr.Value, cloneAux(curr.Aux)
447	}
448	return prev.Time, prev.Value, prev.Aux
449}
450
451// UnsignedLastReduce returns the last point sorted by time.
452func UnsignedLastReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) {
453	if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
454		return curr.Time, curr.Value, cloneAux(curr.Aux)
455	}
456	return prev.Time, prev.Value, prev.Aux
457}
458
459// StringLastReduce returns the first point sorted by time.
460func StringLastReduce(prev, curr *StringPoint) (int64, string, []interface{}) {
461	if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) {
462		return curr.Time, curr.Value, cloneAux(curr.Aux)
463	}
464	return prev.Time, prev.Value, prev.Aux
465}
466
467// BooleanLastReduce returns the first point sorted by time.
468func BooleanLastReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) {
469	if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value && !prev.Value) {
470		return curr.Time, curr.Value, cloneAux(curr.Aux)
471	}
472	return prev.Time, prev.Value, prev.Aux
473}
474
475// NewDistinctIterator returns an iterator for operating on a distinct() call.
476func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
477	switch input := input.(type) {
478	case FloatIterator:
479		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
480			fn := NewFloatDistinctReducer()
481			return fn, fn
482		}
483		return newFloatReduceFloatIterator(input, opt, createFn), nil
484	case IntegerIterator:
485		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
486			fn := NewIntegerDistinctReducer()
487			return fn, fn
488		}
489		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
490	case UnsignedIterator:
491		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
492			fn := NewUnsignedDistinctReducer()
493			return fn, fn
494		}
495		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
496	case StringIterator:
497		createFn := func() (StringPointAggregator, StringPointEmitter) {
498			fn := NewStringDistinctReducer()
499			return fn, fn
500		}
501		return newStringReduceStringIterator(input, opt, createFn), nil
502	case BooleanIterator:
503		createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
504			fn := NewBooleanDistinctReducer()
505			return fn, fn
506		}
507		return newBooleanReduceBooleanIterator(input, opt, createFn), nil
508	default:
509		return nil, fmt.Errorf("unsupported distinct iterator type: %T", input)
510	}
511}
512
513// newMeanIterator returns an iterator for operating on a mean() call.
514func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
515	switch input := input.(type) {
516	case FloatIterator:
517		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
518			fn := NewFloatMeanReducer()
519			return fn, fn
520		}
521		return newFloatReduceFloatIterator(input, opt, createFn), nil
522	case IntegerIterator:
523		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
524			fn := NewIntegerMeanReducer()
525			return fn, fn
526		}
527		return newIntegerReduceFloatIterator(input, opt, createFn), nil
528	case UnsignedIterator:
529		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
530			fn := NewUnsignedMeanReducer()
531			return fn, fn
532		}
533		return newUnsignedReduceFloatIterator(input, opt, createFn), nil
534	default:
535		return nil, fmt.Errorf("unsupported mean iterator type: %T", input)
536	}
537}
538
// NewMedianIterator returns an iterator for operating on a median() call.
//
// It is the exported entry point and simply delegates to newMedianIterator,
// returning that function's iterator and error unchanged.
func NewMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
	return newMedianIterator(input, opt)
}
543
544// newMedianIterator returns an iterator for operating on a median() call.
545func newMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
546	switch input := input.(type) {
547	case FloatIterator:
548		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
549			fn := NewFloatSliceFuncReducer(FloatMedianReduceSlice)
550			return fn, fn
551		}
552		return newFloatReduceFloatIterator(input, opt, createFn), nil
553	case IntegerIterator:
554		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
555			fn := NewIntegerSliceFuncFloatReducer(IntegerMedianReduceSlice)
556			return fn, fn
557		}
558		return newIntegerReduceFloatIterator(input, opt, createFn), nil
559	case UnsignedIterator:
560		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
561			fn := NewUnsignedSliceFuncFloatReducer(UnsignedMedianReduceSlice)
562			return fn, fn
563		}
564		return newUnsignedReduceFloatIterator(input, opt, createFn), nil
565	default:
566		return nil, fmt.Errorf("unsupported median iterator type: %T", input)
567	}
568}
569
570// FloatMedianReduceSlice returns the median value within a window.
571func FloatMedianReduceSlice(a []FloatPoint) []FloatPoint {
572	if len(a) == 1 {
573		return a
574	}
575
576	// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
577
578	// Return the middle value from the points.
579	// If there are an even number of points then return the mean of the two middle points.
580	sort.Sort(floatPointsByValue(a))
581	if len(a)%2 == 0 {
582		lo, hi := a[len(a)/2-1], a[(len(a)/2)]
583		return []FloatPoint{{Time: ZeroTime, Value: lo.Value + (hi.Value-lo.Value)/2}}
584	}
585	return []FloatPoint{{Time: ZeroTime, Value: a[len(a)/2].Value}}
586}
587
588// IntegerMedianReduceSlice returns the median value within a window.
589func IntegerMedianReduceSlice(a []IntegerPoint) []FloatPoint {
590	if len(a) == 1 {
591		return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}}
592	}
593
594	// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
595
596	// Return the middle value from the points.
597	// If there are an even number of points then return the mean of the two middle points.
598	sort.Sort(integerPointsByValue(a))
599	if len(a)%2 == 0 {
600		lo, hi := a[len(a)/2-1], a[(len(a)/2)]
601		return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}}
602	}
603	return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
604}
605
606// UnsignedMedianReduceSlice returns the median value within a window.
607func UnsignedMedianReduceSlice(a []UnsignedPoint) []FloatPoint {
608	if len(a) == 1 {
609		return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}}
610	}
611
612	// OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1.
613
614	// Return the middle value from the points.
615	// If there are an even number of points then return the mean of the two middle points.
616	sort.Sort(unsignedPointsByValue(a))
617	if len(a)%2 == 0 {
618		lo, hi := a[len(a)/2-1], a[(len(a)/2)]
619		return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}}
620	}
621	return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
622}
623
624// newModeIterator returns an iterator for operating on a mode() call.
625func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
626	switch input := input.(type) {
627	case FloatIterator:
628		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
629			fn := NewFloatSliceFuncReducer(FloatModeReduceSlice)
630			return fn, fn
631		}
632		return newFloatReduceFloatIterator(input, opt, createFn), nil
633	case IntegerIterator:
634		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
635			fn := NewIntegerSliceFuncReducer(IntegerModeReduceSlice)
636			return fn, fn
637		}
638		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
639	case UnsignedIterator:
640		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
641			fn := NewUnsignedSliceFuncReducer(UnsignedModeReduceSlice)
642			return fn, fn
643		}
644		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
645	case StringIterator:
646		createFn := func() (StringPointAggregator, StringPointEmitter) {
647			fn := NewStringSliceFuncReducer(StringModeReduceSlice)
648			return fn, fn
649		}
650		return newStringReduceStringIterator(input, opt, createFn), nil
651	case BooleanIterator:
652		createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
653			fn := NewBooleanSliceFuncReducer(BooleanModeReduceSlice)
654			return fn, fn
655		}
656		return newBooleanReduceBooleanIterator(input, opt, createFn), nil
657	default:
658		return nil, fmt.Errorf("unsupported median iterator type: %T", input)
659	}
660}
661
662// FloatModeReduceSlice returns the mode value within a window.
663func FloatModeReduceSlice(a []FloatPoint) []FloatPoint {
664	if len(a) == 1 {
665		return a
666	}
667
668	sort.Sort(floatPointsByValue(a))
669
670	mostFreq := 0
671	currFreq := 0
672	currMode := a[0].Value
673	mostMode := a[0].Value
674	mostTime := a[0].Time
675	currTime := a[0].Time
676
677	for _, p := range a {
678		if p.Value != currMode {
679			currFreq = 1
680			currMode = p.Value
681			currTime = p.Time
682			continue
683		}
684		currFreq++
685		if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
686			continue
687		}
688		mostFreq = currFreq
689		mostMode = p.Value
690		mostTime = p.Time
691	}
692
693	return []FloatPoint{{Time: ZeroTime, Value: mostMode}}
694}
695
696// IntegerModeReduceSlice returns the mode value within a window.
697func IntegerModeReduceSlice(a []IntegerPoint) []IntegerPoint {
698	if len(a) == 1 {
699		return a
700	}
701	sort.Sort(integerPointsByValue(a))
702
703	mostFreq := 0
704	currFreq := 0
705	currMode := a[0].Value
706	mostMode := a[0].Value
707	mostTime := a[0].Time
708	currTime := a[0].Time
709
710	for _, p := range a {
711		if p.Value != currMode {
712			currFreq = 1
713			currMode = p.Value
714			currTime = p.Time
715			continue
716		}
717		currFreq++
718		if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
719			continue
720		}
721		mostFreq = currFreq
722		mostMode = p.Value
723		mostTime = p.Time
724	}
725
726	return []IntegerPoint{{Time: ZeroTime, Value: mostMode}}
727}
728
729// UnsignedModeReduceSlice returns the mode value within a window.
730func UnsignedModeReduceSlice(a []UnsignedPoint) []UnsignedPoint {
731	if len(a) == 1 {
732		return a
733	}
734	sort.Sort(unsignedPointsByValue(a))
735
736	mostFreq := 0
737	currFreq := 0
738	currMode := a[0].Value
739	mostMode := a[0].Value
740	mostTime := a[0].Time
741	currTime := a[0].Time
742
743	for _, p := range a {
744		if p.Value != currMode {
745			currFreq = 1
746			currMode = p.Value
747			currTime = p.Time
748			continue
749		}
750		currFreq++
751		if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
752			continue
753		}
754		mostFreq = currFreq
755		mostMode = p.Value
756		mostTime = p.Time
757	}
758
759	return []UnsignedPoint{{Time: ZeroTime, Value: mostMode}}
760}
761
762// StringModeReduceSlice returns the mode value within a window.
763func StringModeReduceSlice(a []StringPoint) []StringPoint {
764	if len(a) == 1 {
765		return a
766	}
767
768	sort.Sort(stringPointsByValue(a))
769
770	mostFreq := 0
771	currFreq := 0
772	currMode := a[0].Value
773	mostMode := a[0].Value
774	mostTime := a[0].Time
775	currTime := a[0].Time
776
777	for _, p := range a {
778		if p.Value != currMode {
779			currFreq = 1
780			currMode = p.Value
781			currTime = p.Time
782			continue
783		}
784		currFreq++
785		if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
786			continue
787		}
788		mostFreq = currFreq
789		mostMode = p.Value
790		mostTime = p.Time
791	}
792
793	return []StringPoint{{Time: ZeroTime, Value: mostMode}}
794}
795
796// BooleanModeReduceSlice returns the mode value within a window.
797func BooleanModeReduceSlice(a []BooleanPoint) []BooleanPoint {
798	if len(a) == 1 {
799		return a
800	}
801
802	trueFreq := 0
803	falsFreq := 0
804	mostMode := false
805
806	for _, p := range a {
807		if p.Value {
808			trueFreq++
809		} else {
810			falsFreq++
811		}
812	}
813	// In case either of true or false are mode then retuned mode value wont be
814	// of metric with oldest timestamp
815	if trueFreq >= falsFreq {
816		mostMode = true
817	}
818
819	return []BooleanPoint{{Time: ZeroTime, Value: mostMode}}
820}
821
822// newStddevIterator returns an iterator for operating on a stddev() call.
823func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
824	switch input := input.(type) {
825	case FloatIterator:
826		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
827			fn := NewFloatSliceFuncReducer(FloatStddevReduceSlice)
828			return fn, fn
829		}
830		return newFloatReduceFloatIterator(input, opt, createFn), nil
831	case IntegerIterator:
832		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
833			fn := NewIntegerSliceFuncFloatReducer(IntegerStddevReduceSlice)
834			return fn, fn
835		}
836		return newIntegerReduceFloatIterator(input, opt, createFn), nil
837	case UnsignedIterator:
838		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
839			fn := NewUnsignedSliceFuncFloatReducer(UnsignedStddevReduceSlice)
840			return fn, fn
841		}
842		return newUnsignedReduceFloatIterator(input, opt, createFn), nil
843	default:
844		return nil, fmt.Errorf("unsupported stddev iterator type: %T", input)
845	}
846}
847
848// FloatStddevReduceSlice returns the stddev value within a window.
849func FloatStddevReduceSlice(a []FloatPoint) []FloatPoint {
850	// If there is only one point then return NaN.
851	if len(a) < 2 {
852		return []FloatPoint{{Time: ZeroTime, Value: math.NaN()}}
853	}
854
855	// Calculate the mean.
856	var mean float64
857	var count int
858	for _, p := range a {
859		if math.IsNaN(p.Value) {
860			continue
861		}
862		count++
863		mean += (p.Value - mean) / float64(count)
864	}
865
866	// Calculate the variance.
867	var variance float64
868	for _, p := range a {
869		if math.IsNaN(p.Value) {
870			continue
871		}
872		variance += math.Pow(p.Value-mean, 2)
873	}
874	return []FloatPoint{{
875		Time:  ZeroTime,
876		Value: math.Sqrt(variance / float64(count-1)),
877	}}
878}
879
880// IntegerStddevReduceSlice returns the stddev value within a window.
881func IntegerStddevReduceSlice(a []IntegerPoint) []FloatPoint {
882	// If there is only one point then return NaN.
883	if len(a) < 2 {
884		return []FloatPoint{{Time: ZeroTime, Value: math.NaN()}}
885	}
886
887	// Calculate the mean.
888	var mean float64
889	var count int
890	for _, p := range a {
891		count++
892		mean += (float64(p.Value) - mean) / float64(count)
893	}
894
895	// Calculate the variance.
896	var variance float64
897	for _, p := range a {
898		variance += math.Pow(float64(p.Value)-mean, 2)
899	}
900	return []FloatPoint{{
901		Time:  ZeroTime,
902		Value: math.Sqrt(variance / float64(count-1)),
903	}}
904}
905
906// UnsignedStddevReduceSlice returns the stddev value within a window.
907func UnsignedStddevReduceSlice(a []UnsignedPoint) []FloatPoint {
908	// If there is only one point then return NaN.
909	if len(a) < 2 {
910		return []FloatPoint{{Time: ZeroTime, Value: math.NaN()}}
911	}
912
913	// Calculate the mean.
914	var mean float64
915	var count int
916	for _, p := range a {
917		count++
918		mean += (float64(p.Value) - mean) / float64(count)
919	}
920
921	// Calculate the variance.
922	var variance float64
923	for _, p := range a {
924		variance += math.Pow(float64(p.Value)-mean, 2)
925	}
926	return []FloatPoint{{
927		Time:  ZeroTime,
928		Value: math.Sqrt(variance / float64(count-1)),
929	}}
930}
931
932// newSpreadIterator returns an iterator for operating on a spread() call.
933func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
934	switch input := input.(type) {
935	case FloatIterator:
936		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
937			fn := NewFloatSpreadReducer()
938			return fn, fn
939		}
940		return newFloatReduceFloatIterator(input, opt, createFn), nil
941	case IntegerIterator:
942		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
943			fn := NewIntegerSpreadReducer()
944			return fn, fn
945		}
946		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
947	case UnsignedIterator:
948		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
949			fn := NewUnsignedSpreadReducer()
950			return fn, fn
951		}
952		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
953	default:
954		return nil, fmt.Errorf("unsupported spread iterator type: %T", input)
955	}
956}
957
958func newTopIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) {
959	switch input := input.(type) {
960	case FloatIterator:
961		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
962			fn := NewFloatTopReducer(n)
963			return fn, fn
964		}
965		itr := newFloatReduceFloatIterator(input, opt, createFn)
966		itr.keepTags = keepTags
967		return itr, nil
968	case IntegerIterator:
969		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
970			fn := NewIntegerTopReducer(n)
971			return fn, fn
972		}
973		itr := newIntegerReduceIntegerIterator(input, opt, createFn)
974		itr.keepTags = keepTags
975		return itr, nil
976	case UnsignedIterator:
977		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
978			fn := NewUnsignedTopReducer(n)
979			return fn, fn
980		}
981		itr := newUnsignedReduceUnsignedIterator(input, opt, createFn)
982		itr.keepTags = keepTags
983		return itr, nil
984	default:
985		return nil, fmt.Errorf("unsupported top iterator type: %T", input)
986	}
987}
988
989func newBottomIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) {
990	switch input := input.(type) {
991	case FloatIterator:
992		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
993			fn := NewFloatBottomReducer(n)
994			return fn, fn
995		}
996		itr := newFloatReduceFloatIterator(input, opt, createFn)
997		itr.keepTags = keepTags
998		return itr, nil
999	case IntegerIterator:
1000		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
1001			fn := NewIntegerBottomReducer(n)
1002			return fn, fn
1003		}
1004		itr := newIntegerReduceIntegerIterator(input, opt, createFn)
1005		itr.keepTags = keepTags
1006		return itr, nil
1007	case UnsignedIterator:
1008		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
1009			fn := NewUnsignedBottomReducer(n)
1010			return fn, fn
1011		}
1012		itr := newUnsignedReduceUnsignedIterator(input, opt, createFn)
1013		itr.keepTags = keepTags
1014		return itr, nil
1015	default:
1016		return nil, fmt.Errorf("unsupported bottom iterator type: %T", input)
1017	}
1018}
1019
1020// newPercentileIterator returns an iterator for operating on a percentile() call.
1021func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) {
1022	switch input := input.(type) {
1023	case FloatIterator:
1024		floatPercentileReduceSlice := NewFloatPercentileReduceSliceFunc(percentile)
1025		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1026			fn := NewFloatSliceFuncReducer(floatPercentileReduceSlice)
1027			return fn, fn
1028		}
1029		return newFloatReduceFloatIterator(input, opt, createFn), nil
1030	case IntegerIterator:
1031		integerPercentileReduceSlice := NewIntegerPercentileReduceSliceFunc(percentile)
1032		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
1033			fn := NewIntegerSliceFuncReducer(integerPercentileReduceSlice)
1034			return fn, fn
1035		}
1036		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
1037	case UnsignedIterator:
1038		unsignedPercentileReduceSlice := NewUnsignedPercentileReduceSliceFunc(percentile)
1039		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
1040			fn := NewUnsignedSliceFuncReducer(unsignedPercentileReduceSlice)
1041			return fn, fn
1042		}
1043		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
1044	default:
1045		return nil, fmt.Errorf("unsupported percentile iterator type: %T", input)
1046	}
1047}
1048
1049// NewFloatPercentileReduceSliceFunc returns the percentile value within a window.
1050func NewFloatPercentileReduceSliceFunc(percentile float64) FloatReduceSliceFunc {
1051	return func(a []FloatPoint) []FloatPoint {
1052		length := len(a)
1053		i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
1054
1055		if i < 0 || i >= length {
1056			return nil
1057		}
1058
1059		sort.Sort(floatPointsByValue(a))
1060		return []FloatPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}}
1061	}
1062}
1063
1064// NewIntegerPercentileReduceSliceFunc returns the percentile value within a window.
1065func NewIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceFunc {
1066	return func(a []IntegerPoint) []IntegerPoint {
1067		length := len(a)
1068		i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
1069
1070		if i < 0 || i >= length {
1071			return nil
1072		}
1073
1074		sort.Sort(integerPointsByValue(a))
1075		return []IntegerPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}}
1076	}
1077}
1078
1079// NewUnsignedPercentileReduceSliceFunc returns the percentile value within a window.
1080func NewUnsignedPercentileReduceSliceFunc(percentile float64) UnsignedReduceSliceFunc {
1081	return func(a []UnsignedPoint) []UnsignedPoint {
1082		length := len(a)
1083		i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1
1084
1085		if i < 0 || i >= length {
1086			return nil
1087		}
1088
1089		sort.Sort(unsignedPointsByValue(a))
1090		return []UnsignedPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}}
1091	}
1092}
1093
1094// newDerivativeIterator returns an iterator for operating on a derivative() call.
1095func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) {
1096	switch input := input.(type) {
1097	case FloatIterator:
1098		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1099			fn := NewFloatDerivativeReducer(interval, isNonNegative, opt.Ascending)
1100			return fn, fn
1101		}
1102		return newFloatStreamFloatIterator(input, createFn, opt), nil
1103	case IntegerIterator:
1104		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1105			fn := NewIntegerDerivativeReducer(interval, isNonNegative, opt.Ascending)
1106			return fn, fn
1107		}
1108		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1109	case UnsignedIterator:
1110		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1111			fn := NewUnsignedDerivativeReducer(interval, isNonNegative, opt.Ascending)
1112			return fn, fn
1113		}
1114		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1115	default:
1116		return nil, fmt.Errorf("unsupported derivative iterator type: %T", input)
1117	}
1118}
1119
1120// newDifferenceIterator returns an iterator for operating on a difference() call.
1121func newDifferenceIterator(input Iterator, opt IteratorOptions, isNonNegative bool) (Iterator, error) {
1122	switch input := input.(type) {
1123	case FloatIterator:
1124		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1125			fn := NewFloatDifferenceReducer(isNonNegative)
1126			return fn, fn
1127		}
1128		return newFloatStreamFloatIterator(input, createFn, opt), nil
1129	case IntegerIterator:
1130		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
1131			fn := NewIntegerDifferenceReducer(isNonNegative)
1132			return fn, fn
1133		}
1134		return newIntegerStreamIntegerIterator(input, createFn, opt), nil
1135	case UnsignedIterator:
1136		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
1137			fn := NewUnsignedDifferenceReducer(isNonNegative)
1138			return fn, fn
1139		}
1140		return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil
1141	default:
1142		return nil, fmt.Errorf("unsupported difference iterator type: %T", input)
1143	}
1144}
1145
1146// newElapsedIterator returns an iterator for operating on a elapsed() call.
1147func newElapsedIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) {
1148	switch input := input.(type) {
1149	case FloatIterator:
1150		createFn := func() (FloatPointAggregator, IntegerPointEmitter) {
1151			fn := NewFloatElapsedReducer(interval)
1152			return fn, fn
1153		}
1154		return newFloatStreamIntegerIterator(input, createFn, opt), nil
1155	case IntegerIterator:
1156		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
1157			fn := NewIntegerElapsedReducer(interval)
1158			return fn, fn
1159		}
1160		return newIntegerStreamIntegerIterator(input, createFn, opt), nil
1161	case UnsignedIterator:
1162		createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) {
1163			fn := NewUnsignedElapsedReducer(interval)
1164			return fn, fn
1165		}
1166		return newUnsignedStreamIntegerIterator(input, createFn, opt), nil
1167	case BooleanIterator:
1168		createFn := func() (BooleanPointAggregator, IntegerPointEmitter) {
1169			fn := NewBooleanElapsedReducer(interval)
1170			return fn, fn
1171		}
1172		return newBooleanStreamIntegerIterator(input, createFn, opt), nil
1173	case StringIterator:
1174		createFn := func() (StringPointAggregator, IntegerPointEmitter) {
1175			fn := NewStringElapsedReducer(interval)
1176			return fn, fn
1177		}
1178		return newStringStreamIntegerIterator(input, createFn, opt), nil
1179	default:
1180		return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
1181	}
1182}
1183
1184// newMovingAverageIterator returns an iterator for operating on a moving_average() call.
1185func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Iterator, error) {
1186	switch input := input.(type) {
1187	case FloatIterator:
1188		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1189			fn := NewFloatMovingAverageReducer(n)
1190			return fn, fn
1191		}
1192		return newFloatStreamFloatIterator(input, createFn, opt), nil
1193	case IntegerIterator:
1194		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1195			fn := NewIntegerMovingAverageReducer(n)
1196			return fn, fn
1197		}
1198		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1199	case UnsignedIterator:
1200		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1201			fn := NewUnsignedMovingAverageReducer(n)
1202			return fn, fn
1203		}
1204		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1205	default:
1206		return nil, fmt.Errorf("unsupported moving average iterator type: %T", input)
1207	}
1208}
1209
1210// newExponentialMovingAverageIterator returns an iterator for operating on an exponential_moving_average() call.
1211func newExponentialMovingAverageIterator(input Iterator, n, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) {
1212	switch input := input.(type) {
1213	case FloatIterator:
1214		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1215			fn := NewExponentialMovingAverageReducer(n, nHold, warmupType)
1216			return fn, fn
1217		}
1218		return newFloatStreamFloatIterator(input, createFn, opt), nil
1219	case IntegerIterator:
1220		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1221			fn := NewExponentialMovingAverageReducer(n, nHold, warmupType)
1222			return fn, fn
1223		}
1224		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1225	case UnsignedIterator:
1226		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1227			fn := NewExponentialMovingAverageReducer(n, nHold, warmupType)
1228			return fn, fn
1229		}
1230		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1231	default:
1232		return nil, fmt.Errorf("unsupported exponential moving average iterator type: %T", input)
1233	}
1234}
1235
1236// newDoubleExponentialMovingAverageIterator returns an iterator for operating on a double_exponential_moving_average() call.
1237func newDoubleExponentialMovingAverageIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) {
1238	switch input := input.(type) {
1239	case FloatIterator:
1240		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1241			fn := NewDoubleExponentialMovingAverageReducer(n, nHold, warmupType)
1242			return fn, fn
1243		}
1244		return newFloatStreamFloatIterator(input, createFn, opt), nil
1245	case IntegerIterator:
1246		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1247			fn := NewDoubleExponentialMovingAverageReducer(n, nHold, warmupType)
1248			return fn, fn
1249		}
1250		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1251	case UnsignedIterator:
1252		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1253			fn := NewDoubleExponentialMovingAverageReducer(n, nHold, warmupType)
1254			return fn, fn
1255		}
1256		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1257	default:
1258		return nil, fmt.Errorf("unsupported double exponential moving average iterator type: %T", input)
1259	}
1260}
1261
1262// newTripleExponentialMovingAverageIterator returns an iterator for operating on a triple_exponential_moving_average() call.
1263func newTripleExponentialMovingAverageIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) {
1264	switch input := input.(type) {
1265	case FloatIterator:
1266		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1267			fn := NewTripleExponentialMovingAverageReducer(n, nHold, warmupType)
1268			return fn, fn
1269		}
1270		return newFloatStreamFloatIterator(input, createFn, opt), nil
1271	case IntegerIterator:
1272		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1273			fn := NewTripleExponentialMovingAverageReducer(n, nHold, warmupType)
1274			return fn, fn
1275		}
1276		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1277	case UnsignedIterator:
1278		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1279			fn := NewTripleExponentialMovingAverageReducer(n, nHold, warmupType)
1280			return fn, fn
1281		}
1282		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1283	default:
1284		return nil, fmt.Errorf("unsupported triple exponential moving average iterator type: %T", input)
1285	}
1286}
1287
1288// newRelativeStrengthIndexIterator returns an iterator for operating on a triple_exponential_moving_average() call.
1289func newRelativeStrengthIndexIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) {
1290	switch input := input.(type) {
1291	case FloatIterator:
1292		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1293			fn := NewRelativeStrengthIndexReducer(n, nHold, warmupType)
1294			return fn, fn
1295		}
1296		return newFloatStreamFloatIterator(input, createFn, opt), nil
1297	case IntegerIterator:
1298		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1299			fn := NewRelativeStrengthIndexReducer(n, nHold, warmupType)
1300			return fn, fn
1301		}
1302		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1303	case UnsignedIterator:
1304		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1305			fn := NewRelativeStrengthIndexReducer(n, nHold, warmupType)
1306			return fn, fn
1307		}
1308		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1309	default:
1310		return nil, fmt.Errorf("unsupported relative strength index iterator type: %T", input)
1311	}
1312}
1313
1314// newTripleExponentialDerivativeIterator returns an iterator for operating on a triple_exponential_moving_average() call.
1315func newTripleExponentialDerivativeIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) {
1316	switch input := input.(type) {
1317	case FloatIterator:
1318		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1319			fn := NewTripleExponentialDerivativeReducer(n, nHold, warmupType)
1320			return fn, fn
1321		}
1322		return newFloatStreamFloatIterator(input, createFn, opt), nil
1323	case IntegerIterator:
1324		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1325			fn := NewTripleExponentialDerivativeReducer(n, nHold, warmupType)
1326			return fn, fn
1327		}
1328		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1329	case UnsignedIterator:
1330		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1331			fn := NewTripleExponentialDerivativeReducer(n, nHold, warmupType)
1332			return fn, fn
1333		}
1334		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1335	default:
1336		return nil, fmt.Errorf("unsupported triple exponential derivative iterator type: %T", input)
1337	}
1338}
1339
1340// newKaufmansEfficiencyRatioIterator returns an iterator for operating on a kaufmans_efficiency_ratio() call.
1341func newKaufmansEfficiencyRatioIterator(input Iterator, n int, nHold int, opt IteratorOptions) (Iterator, error) {
1342	switch input := input.(type) {
1343	case FloatIterator:
1344		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1345			fn := NewKaufmansEfficiencyRatioReducer(n, nHold)
1346			return fn, fn
1347		}
1348		return newFloatStreamFloatIterator(input, createFn, opt), nil
1349	case IntegerIterator:
1350		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1351			fn := NewKaufmansEfficiencyRatioReducer(n, nHold)
1352			return fn, fn
1353		}
1354		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1355	case UnsignedIterator:
1356		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1357			fn := NewKaufmansEfficiencyRatioReducer(n, nHold)
1358			return fn, fn
1359		}
1360		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1361	default:
1362		return nil, fmt.Errorf("unsupported kaufmans efficiency ratio iterator type: %T", input)
1363	}
1364}
1365
1366// newKaufmansAdaptiveMovingAverageIterator returns an iterator for operating on a kaufmans_adaptive_moving_average() call.
1367func newKaufmansAdaptiveMovingAverageIterator(input Iterator, n int, nHold int, opt IteratorOptions) (Iterator, error) {
1368	switch input := input.(type) {
1369	case FloatIterator:
1370		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1371			fn := NewKaufmansAdaptiveMovingAverageReducer(n, nHold)
1372			return fn, fn
1373		}
1374		return newFloatStreamFloatIterator(input, createFn, opt), nil
1375	case IntegerIterator:
1376		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1377			fn := NewKaufmansAdaptiveMovingAverageReducer(n, nHold)
1378			return fn, fn
1379		}
1380		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1381	case UnsignedIterator:
1382		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1383			fn := NewKaufmansAdaptiveMovingAverageReducer(n, nHold)
1384			return fn, fn
1385		}
1386		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1387	default:
1388		return nil, fmt.Errorf("unsupported kaufmans adaptive moving average iterator type: %T", input)
1389	}
1390}
1391
1392// newChandeMomentumOscillatorIterator returns an iterator for operating on a triple_exponential_moving_average() call.
1393func newChandeMomentumOscillatorIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) {
1394	switch input := input.(type) {
1395	case FloatIterator:
1396		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1397			fn := NewChandeMomentumOscillatorReducer(n, nHold, warmupType)
1398			return fn, fn
1399		}
1400		return newFloatStreamFloatIterator(input, createFn, opt), nil
1401	case IntegerIterator:
1402		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1403			fn := NewChandeMomentumOscillatorReducer(n, nHold, warmupType)
1404			return fn, fn
1405		}
1406		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1407	case UnsignedIterator:
1408		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1409			fn := NewChandeMomentumOscillatorReducer(n, nHold, warmupType)
1410			return fn, fn
1411		}
1412		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1413	default:
1414		return nil, fmt.Errorf("unsupported chande momentum oscillator iterator type: %T", input)
1415	}
1416}
1417
1418// newCumulativeSumIterator returns an iterator for operating on a cumulative_sum() call.
1419func newCumulativeSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
1420	switch input := input.(type) {
1421	case FloatIterator:
1422		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1423			fn := NewFloatCumulativeSumReducer()
1424			return fn, fn
1425		}
1426		return newFloatStreamFloatIterator(input, createFn, opt), nil
1427	case IntegerIterator:
1428		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
1429			fn := NewIntegerCumulativeSumReducer()
1430			return fn, fn
1431		}
1432		return newIntegerStreamIntegerIterator(input, createFn, opt), nil
1433	case UnsignedIterator:
1434		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
1435			fn := NewUnsignedCumulativeSumReducer()
1436			return fn, fn
1437		}
1438		return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil
1439	default:
1440		return nil, fmt.Errorf("unsupported cumulative sum iterator type: %T", input)
1441	}
1442}
1443
1444// newHoltWintersIterator returns an iterator for operating on a holt_winters() call.
1445func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, includeFitData bool, interval time.Duration) (Iterator, error) {
1446	switch input := input.(type) {
1447	case FloatIterator:
1448		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1449			fn := NewFloatHoltWintersReducer(h, m, includeFitData, interval)
1450			return fn, fn
1451		}
1452		return newFloatReduceFloatIterator(input, opt, createFn), nil
1453	case IntegerIterator:
1454		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1455			fn := NewFloatHoltWintersReducer(h, m, includeFitData, interval)
1456			return fn, fn
1457		}
1458		return newIntegerReduceFloatIterator(input, opt, createFn), nil
1459	default:
1460		return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
1461	}
1462}
1463
1464// NewSampleIterator returns an iterator for operating on a sample() call (exported for use in test).
1465func NewSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) {
1466	return newSampleIterator(input, opt, size)
1467}
1468
1469// newSampleIterator returns an iterator for operating on a sample() call.
1470func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) {
1471	switch input := input.(type) {
1472	case FloatIterator:
1473		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1474			fn := NewFloatSampleReducer(size)
1475			return fn, fn
1476		}
1477		return newFloatReduceFloatIterator(input, opt, createFn), nil
1478	case IntegerIterator:
1479		createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
1480			fn := NewIntegerSampleReducer(size)
1481			return fn, fn
1482		}
1483		return newIntegerReduceIntegerIterator(input, opt, createFn), nil
1484	case UnsignedIterator:
1485		createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) {
1486			fn := NewUnsignedSampleReducer(size)
1487			return fn, fn
1488		}
1489		return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil
1490	case StringIterator:
1491		createFn := func() (StringPointAggregator, StringPointEmitter) {
1492			fn := NewStringSampleReducer(size)
1493			return fn, fn
1494		}
1495		return newStringReduceStringIterator(input, opt, createFn), nil
1496	case BooleanIterator:
1497		createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
1498			fn := NewBooleanSampleReducer(size)
1499			return fn, fn
1500		}
1501		return newBooleanReduceBooleanIterator(input, opt, createFn), nil
1502	default:
1503		return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
1504	}
1505}
1506
1507// newIntegralIterator returns an iterator for operating on a integral() call.
1508func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) {
1509	switch input := input.(type) {
1510	case FloatIterator:
1511		createFn := func() (FloatPointAggregator, FloatPointEmitter) {
1512			fn := NewFloatIntegralReducer(interval, opt)
1513			return fn, fn
1514		}
1515		return newFloatStreamFloatIterator(input, createFn, opt), nil
1516	case IntegerIterator:
1517		createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
1518			fn := NewIntegerIntegralReducer(interval, opt)
1519			return fn, fn
1520		}
1521		return newIntegerStreamFloatIterator(input, createFn, opt), nil
1522	case UnsignedIterator:
1523		createFn := func() (UnsignedPointAggregator, FloatPointEmitter) {
1524			fn := NewUnsignedIntegralReducer(interval, opt)
1525			return fn, fn
1526		}
1527		return newUnsignedStreamFloatIterator(input, createFn, opt), nil
1528	default:
1529		return nil, fmt.Errorf("unsupported integral iterator type: %T", input)
1530	}
1531}
1532