1package query 2 3import ( 4 "fmt" 5 "math" 6 "sort" 7 "time" 8 9 "github.com/influxdata/influxdb/query/internal/gota" 10 "github.com/influxdata/influxql" 11) 12 13/* 14This file contains iterator implementations for each function call available 15in InfluxQL. Call iterators are separated into two groups: 16 171. Map/reduce-style iterators - these are passed to IteratorCreator so that 18 processing can be at the low-level storage and aggregates are returned. 19 202. Raw aggregate iterators - these require the full set of data for a window. 21 These are handled by the select() function and raw points are streamed in 22 from the low-level storage. 23 24There are helpers to aid in building aggregate iterators. For simple map/reduce 25iterators, you can use the reduceIterator types and pass a reduce function. This 26reduce function is passed a previous and current value and the new timestamp, 27value, and auxiliary fields are returned from it. 28 29For raw aggregate iterators, you can use the reduceSliceIterators which pass 30in a slice of all points to the function and return a point. For more complex 31iterator types, you may need to create your own iterators by hand. 32 33Once your iterator is complete, you'll need to add it to the NewCallIterator() 34function if it is to be available to IteratorCreators and add it to the select() 35function to allow it to be included during planning. 36*/ 37 38// NewCallIterator returns a new iterator for a Call. 39func NewCallIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 40 name := opt.Expr.(*influxql.Call).Name 41 switch name { 42 case "count": 43 return newCountIterator(input, opt) 44 case "min": 45 return newMinIterator(input, opt) 46 case "max": 47 return newMaxIterator(input, opt) 48 case "sum": 49 return newSumIterator(input, opt) 50 case "first": 51 return newFirstIterator(input, opt) 52 case "last": 53 return newLastIterator(input, opt) 54 case "mean": 55 return newMeanIterator(input, opt) 56 default: 57 return nil, fmt.Errorf("unsupported function call: %s", name) 58 } 59} 60 61// newCountIterator returns an iterator for operating on a count() call. 62func newCountIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 63 // FIXME: Wrap iterator in int-type iterator and always output int value. 64 65 switch input := input.(type) { 66 case FloatIterator: 67 createFn := func() (FloatPointAggregator, IntegerPointEmitter) { 68 fn := NewFloatFuncIntegerReducer(FloatCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime}) 69 return fn, fn 70 } 71 return newFloatReduceIntegerIterator(input, opt, createFn), nil 72 case IntegerIterator: 73 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 74 fn := NewIntegerFuncReducer(IntegerCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime}) 75 return fn, fn 76 } 77 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 78 case UnsignedIterator: 79 createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) { 80 fn := NewUnsignedFuncIntegerReducer(UnsignedCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime}) 81 return fn, fn 82 } 83 return newUnsignedReduceIntegerIterator(input, opt, createFn), nil 84 case StringIterator: 85 createFn := func() (StringPointAggregator, IntegerPointEmitter) { 86 fn := NewStringFuncIntegerReducer(StringCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime}) 87 return fn, fn 88 } 89 return newStringReduceIntegerIterator(input, opt, createFn), nil 90 case BooleanIterator: 91 createFn := func() (BooleanPointAggregator, IntegerPointEmitter) { 92 fn := NewBooleanFuncIntegerReducer(BooleanCountReduce, &IntegerPoint{Value: 0, Time: ZeroTime}) 93 return fn, fn 94 } 95 return newBooleanReduceIntegerIterator(input, opt, createFn), nil 96 default: 97 return nil, fmt.Errorf("unsupported count iterator type: %T", input) 98 } 99} 100 101// FloatCountReduce returns the count of points. 102func FloatCountReduce(prev *IntegerPoint, curr *FloatPoint) (int64, int64, []interface{}) { 103 if prev == nil { 104 return ZeroTime, 1, nil 105 } 106 return ZeroTime, prev.Value + 1, nil 107} 108 109// IntegerCountReduce returns the count of points. 110func IntegerCountReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { 111 if prev == nil { 112 return ZeroTime, 1, nil 113 } 114 return ZeroTime, prev.Value + 1, nil 115} 116 117// UnsignedCountReduce returns the count of points. 118func UnsignedCountReduce(prev *IntegerPoint, curr *UnsignedPoint) (int64, int64, []interface{}) { 119 if prev == nil { 120 return ZeroTime, 1, nil 121 } 122 return ZeroTime, prev.Value + 1, nil 123} 124 125// StringCountReduce returns the count of points. 126func StringCountReduce(prev *IntegerPoint, curr *StringPoint) (int64, int64, []interface{}) { 127 if prev == nil { 128 return ZeroTime, 1, nil 129 } 130 return ZeroTime, prev.Value + 1, nil 131} 132 133// BooleanCountReduce returns the count of points. 134func BooleanCountReduce(prev *IntegerPoint, curr *BooleanPoint) (int64, int64, []interface{}) { 135 if prev == nil { 136 return ZeroTime, 1, nil 137 } 138 return ZeroTime, prev.Value + 1, nil 139} 140 141// newMinIterator returns an iterator for operating on a min() call. 142func newMinIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 143 switch input := input.(type) { 144 case FloatIterator: 145 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 146 fn := NewFloatFuncReducer(FloatMinReduce, nil) 147 return fn, fn 148 } 149 return newFloatReduceFloatIterator(input, opt, createFn), nil 150 case IntegerIterator: 151 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 152 fn := NewIntegerFuncReducer(IntegerMinReduce, nil) 153 return fn, fn 154 } 155 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 156 case UnsignedIterator: 157 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 158 fn := NewUnsignedFuncReducer(UnsignedMinReduce, nil) 159 return fn, fn 160 } 161 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 162 case BooleanIterator: 163 createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { 164 fn := NewBooleanFuncReducer(BooleanMinReduce, nil) 165 return fn, fn 166 } 167 return newBooleanReduceBooleanIterator(input, opt, createFn), nil 168 default: 169 return nil, fmt.Errorf("unsupported min iterator type: %T", input) 170 } 171} 172 173// FloatMinReduce returns the minimum value between prev & curr. 174func FloatMinReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) { 175 if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) { 176 return curr.Time, curr.Value, cloneAux(curr.Aux) 177 } 178 return prev.Time, prev.Value, prev.Aux 179} 180 181// IntegerMinReduce returns the minimum value between prev & curr. 182func IntegerMinReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { 183 if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) { 184 return curr.Time, curr.Value, cloneAux(curr.Aux) 185 } 186 return prev.Time, prev.Value, prev.Aux 187} 188 189// UnsignedMinReduce returns the minimum value between prev & curr. 190func UnsignedMinReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { 191 if prev == nil || curr.Value < prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) { 192 return curr.Time, curr.Value, cloneAux(curr.Aux) 193 } 194 return prev.Time, prev.Value, prev.Aux 195} 196 197// BooleanMinReduce returns the minimum value between prev & curr. 198func BooleanMinReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) { 199 if prev == nil || (curr.Value != prev.Value && !curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) { 200 return curr.Time, curr.Value, cloneAux(curr.Aux) 201 } 202 return prev.Time, prev.Value, prev.Aux 203} 204 205// newMaxIterator returns an iterator for operating on a max() call. 206func newMaxIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 207 switch input := input.(type) { 208 case FloatIterator: 209 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 210 fn := NewFloatFuncReducer(FloatMaxReduce, nil) 211 return fn, fn 212 } 213 return newFloatReduceFloatIterator(input, opt, createFn), nil 214 case IntegerIterator: 215 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 216 fn := NewIntegerFuncReducer(IntegerMaxReduce, nil) 217 return fn, fn 218 } 219 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 220 case UnsignedIterator: 221 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 222 fn := NewUnsignedFuncReducer(UnsignedMaxReduce, nil) 223 return fn, fn 224 } 225 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 226 case BooleanIterator: 227 createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { 228 fn := NewBooleanFuncReducer(BooleanMaxReduce, nil) 229 return fn, fn 230 } 231 return newBooleanReduceBooleanIterator(input, opt, createFn), nil 232 default: 233 return nil, fmt.Errorf("unsupported max iterator type: %T", input) 234 } 235} 236 237// FloatMaxReduce returns the maximum value between prev & curr. 238func FloatMaxReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) { 239 if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) { 240 return curr.Time, curr.Value, cloneAux(curr.Aux) 241 } 242 return prev.Time, prev.Value, prev.Aux 243} 244 245// IntegerMaxReduce returns the maximum value between prev & curr. 246func IntegerMaxReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { 247 if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) { 248 return curr.Time, curr.Value, cloneAux(curr.Aux) 249 } 250 return prev.Time, prev.Value, prev.Aux 251} 252 253// UnsignedMaxReduce returns the maximum value between prev & curr. 254func UnsignedMaxReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { 255 if prev == nil || curr.Value > prev.Value || (curr.Value == prev.Value && curr.Time < prev.Time) { 256 return curr.Time, curr.Value, cloneAux(curr.Aux) 257 } 258 return prev.Time, prev.Value, prev.Aux 259} 260 261// BooleanMaxReduce returns the minimum value between prev & curr. 262func BooleanMaxReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) { 263 if prev == nil || (curr.Value != prev.Value && curr.Value) || (curr.Value == prev.Value && curr.Time < prev.Time) { 264 return curr.Time, curr.Value, cloneAux(curr.Aux) 265 } 266 return prev.Time, prev.Value, prev.Aux 267} 268 269// newSumIterator returns an iterator for operating on a sum() call. 270func newSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 271 switch input := input.(type) { 272 case FloatIterator: 273 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 274 fn := NewFloatFuncReducer(FloatSumReduce, &FloatPoint{Value: 0, Time: ZeroTime}) 275 return fn, fn 276 } 277 return newFloatReduceFloatIterator(input, opt, createFn), nil 278 case IntegerIterator: 279 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 280 fn := NewIntegerFuncReducer(IntegerSumReduce, &IntegerPoint{Value: 0, Time: ZeroTime}) 281 return fn, fn 282 } 283 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 284 case UnsignedIterator: 285 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 286 fn := NewUnsignedFuncReducer(UnsignedSumReduce, &UnsignedPoint{Value: 0, Time: ZeroTime}) 287 return fn, fn 288 } 289 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 290 default: 291 return nil, fmt.Errorf("unsupported sum iterator type: %T", input) 292 } 293} 294 295// FloatSumReduce returns the sum prev value & curr value. 296func FloatSumReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) { 297 if prev == nil { 298 return ZeroTime, curr.Value, nil 299 } 300 return prev.Time, prev.Value + curr.Value, nil 301} 302 303// IntegerSumReduce returns the sum prev value & curr value. 304func IntegerSumReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { 305 if prev == nil { 306 return ZeroTime, curr.Value, nil 307 } 308 return prev.Time, prev.Value + curr.Value, nil 309} 310 311// UnsignedSumReduce returns the sum prev value & curr value. 312func UnsignedSumReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { 313 if prev == nil { 314 return ZeroTime, curr.Value, nil 315 } 316 return prev.Time, prev.Value + curr.Value, nil 317} 318 319// newFirstIterator returns an iterator for operating on a first() call. 320func newFirstIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 321 switch input := input.(type) { 322 case FloatIterator: 323 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 324 fn := NewFloatFuncReducer(FloatFirstReduce, nil) 325 return fn, fn 326 } 327 return newFloatReduceFloatIterator(input, opt, createFn), nil 328 case IntegerIterator: 329 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 330 fn := NewIntegerFuncReducer(IntegerFirstReduce, nil) 331 return fn, fn 332 } 333 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 334 case UnsignedIterator: 335 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 336 fn := NewUnsignedFuncReducer(UnsignedFirstReduce, nil) 337 return fn, fn 338 } 339 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 340 case StringIterator: 341 createFn := func() (StringPointAggregator, StringPointEmitter) { 342 fn := NewStringFuncReducer(StringFirstReduce, nil) 343 return fn, fn 344 } 345 return newStringReduceStringIterator(input, opt, createFn), nil 346 case BooleanIterator: 347 createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { 348 fn := NewBooleanFuncReducer(BooleanFirstReduce, nil) 349 return fn, fn 350 } 351 return newBooleanReduceBooleanIterator(input, opt, createFn), nil 352 default: 353 return nil, fmt.Errorf("unsupported first iterator type: %T", input) 354 } 355} 356 357// FloatFirstReduce returns the first point sorted by time. 358func FloatFirstReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) { 359 if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { 360 return curr.Time, curr.Value, cloneAux(curr.Aux) 361 } 362 return prev.Time, prev.Value, prev.Aux 363} 364 365// IntegerFirstReduce returns the first point sorted by time. 366func IntegerFirstReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { 367 if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { 368 return curr.Time, curr.Value, cloneAux(curr.Aux) 369 } 370 return prev.Time, prev.Value, prev.Aux 371} 372 373// UnsignedFirstReduce returns the first point sorted by time. 374func UnsignedFirstReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { 375 if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { 376 return curr.Time, curr.Value, cloneAux(curr.Aux) 377 } 378 return prev.Time, prev.Value, prev.Aux 379} 380 381// StringFirstReduce returns the first point sorted by time. 382func StringFirstReduce(prev, curr *StringPoint) (int64, string, []interface{}) { 383 if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { 384 return curr.Time, curr.Value, cloneAux(curr.Aux) 385 } 386 return prev.Time, prev.Value, prev.Aux 387} 388 389// BooleanFirstReduce returns the first point sorted by time. 390func BooleanFirstReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) { 391 if prev == nil || curr.Time < prev.Time || (curr.Time == prev.Time && !curr.Value && prev.Value) { 392 return curr.Time, curr.Value, cloneAux(curr.Aux) 393 } 394 return prev.Time, prev.Value, prev.Aux 395} 396 397// newLastIterator returns an iterator for operating on a last() call. 398func newLastIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 399 switch input := input.(type) { 400 case FloatIterator: 401 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 402 fn := NewFloatFuncReducer(FloatLastReduce, nil) 403 return fn, fn 404 } 405 return newFloatReduceFloatIterator(input, opt, createFn), nil 406 case IntegerIterator: 407 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 408 fn := NewIntegerFuncReducer(IntegerLastReduce, nil) 409 return fn, fn 410 } 411 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 412 case UnsignedIterator: 413 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 414 fn := NewUnsignedFuncReducer(UnsignedLastReduce, nil) 415 return fn, fn 416 } 417 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 418 case StringIterator: 419 createFn := func() (StringPointAggregator, StringPointEmitter) { 420 fn := NewStringFuncReducer(StringLastReduce, nil) 421 return fn, fn 422 } 423 return newStringReduceStringIterator(input, opt, createFn), nil 424 case BooleanIterator: 425 createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { 426 fn := NewBooleanFuncReducer(BooleanLastReduce, nil) 427 return fn, fn 428 } 429 return newBooleanReduceBooleanIterator(input, opt, createFn), nil 430 default: 431 return nil, fmt.Errorf("unsupported last iterator type: %T", input) 432 } 433} 434 435// FloatLastReduce returns the last point sorted by time. 436func FloatLastReduce(prev, curr *FloatPoint) (int64, float64, []interface{}) { 437 if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { 438 return curr.Time, curr.Value, cloneAux(curr.Aux) 439 } 440 return prev.Time, prev.Value, prev.Aux 441} 442 443// IntegerLastReduce returns the last point sorted by time. 444func IntegerLastReduce(prev, curr *IntegerPoint) (int64, int64, []interface{}) { 445 if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { 446 return curr.Time, curr.Value, cloneAux(curr.Aux) 447 } 448 return prev.Time, prev.Value, prev.Aux 449} 450 451// UnsignedLastReduce returns the last point sorted by time. 452func UnsignedLastReduce(prev, curr *UnsignedPoint) (int64, uint64, []interface{}) { 453 if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { 454 return curr.Time, curr.Value, cloneAux(curr.Aux) 455 } 456 return prev.Time, prev.Value, prev.Aux 457} 458 459// StringLastReduce returns the first point sorted by time. 460func StringLastReduce(prev, curr *StringPoint) (int64, string, []interface{}) { 461 if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value > prev.Value) { 462 return curr.Time, curr.Value, cloneAux(curr.Aux) 463 } 464 return prev.Time, prev.Value, prev.Aux 465} 466 467// BooleanLastReduce returns the first point sorted by time. 468func BooleanLastReduce(prev, curr *BooleanPoint) (int64, bool, []interface{}) { 469 if prev == nil || curr.Time > prev.Time || (curr.Time == prev.Time && curr.Value && !prev.Value) { 470 return curr.Time, curr.Value, cloneAux(curr.Aux) 471 } 472 return prev.Time, prev.Value, prev.Aux 473} 474 475// NewDistinctIterator returns an iterator for operating on a distinct() call. 476func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 477 switch input := input.(type) { 478 case FloatIterator: 479 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 480 fn := NewFloatDistinctReducer() 481 return fn, fn 482 } 483 return newFloatReduceFloatIterator(input, opt, createFn), nil 484 case IntegerIterator: 485 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 486 fn := NewIntegerDistinctReducer() 487 return fn, fn 488 } 489 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 490 case UnsignedIterator: 491 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 492 fn := NewUnsignedDistinctReducer() 493 return fn, fn 494 } 495 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 496 case StringIterator: 497 createFn := func() (StringPointAggregator, StringPointEmitter) { 498 fn := NewStringDistinctReducer() 499 return fn, fn 500 } 501 return newStringReduceStringIterator(input, opt, createFn), nil 502 case BooleanIterator: 503 createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { 504 fn := NewBooleanDistinctReducer() 505 return fn, fn 506 } 507 return newBooleanReduceBooleanIterator(input, opt, createFn), nil 508 default: 509 return nil, fmt.Errorf("unsupported distinct iterator type: %T", input) 510 } 511} 512 513// newMeanIterator returns an iterator for operating on a mean() call. 514func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 515 switch input := input.(type) { 516 case FloatIterator: 517 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 518 fn := NewFloatMeanReducer() 519 return fn, fn 520 } 521 return newFloatReduceFloatIterator(input, opt, createFn), nil 522 case IntegerIterator: 523 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 524 fn := NewIntegerMeanReducer() 525 return fn, fn 526 } 527 return newIntegerReduceFloatIterator(input, opt, createFn), nil 528 case UnsignedIterator: 529 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 530 fn := NewUnsignedMeanReducer() 531 return fn, fn 532 } 533 return newUnsignedReduceFloatIterator(input, opt, createFn), nil 534 default: 535 return nil, fmt.Errorf("unsupported mean iterator type: %T", input) 536 } 537} 538 539// NewMedianIterator returns an iterator for operating on a median() call. 540func NewMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 541 return newMedianIterator(input, opt) 542} 543 544// newMedianIterator returns an iterator for operating on a median() call. 545func newMedianIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 546 switch input := input.(type) { 547 case FloatIterator: 548 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 549 fn := NewFloatSliceFuncReducer(FloatMedianReduceSlice) 550 return fn, fn 551 } 552 return newFloatReduceFloatIterator(input, opt, createFn), nil 553 case IntegerIterator: 554 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 555 fn := NewIntegerSliceFuncFloatReducer(IntegerMedianReduceSlice) 556 return fn, fn 557 } 558 return newIntegerReduceFloatIterator(input, opt, createFn), nil 559 case UnsignedIterator: 560 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 561 fn := NewUnsignedSliceFuncFloatReducer(UnsignedMedianReduceSlice) 562 return fn, fn 563 } 564 return newUnsignedReduceFloatIterator(input, opt, createFn), nil 565 default: 566 return nil, fmt.Errorf("unsupported median iterator type: %T", input) 567 } 568} 569 570// FloatMedianReduceSlice returns the median value within a window. 571func FloatMedianReduceSlice(a []FloatPoint) []FloatPoint { 572 if len(a) == 1 { 573 return a 574 } 575 576 // OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1. 577 578 // Return the middle value from the points. 579 // If there are an even number of points then return the mean of the two middle points. 580 sort.Sort(floatPointsByValue(a)) 581 if len(a)%2 == 0 { 582 lo, hi := a[len(a)/2-1], a[(len(a)/2)] 583 return []FloatPoint{{Time: ZeroTime, Value: lo.Value + (hi.Value-lo.Value)/2}} 584 } 585 return []FloatPoint{{Time: ZeroTime, Value: a[len(a)/2].Value}} 586} 587 588// IntegerMedianReduceSlice returns the median value within a window. 589func IntegerMedianReduceSlice(a []IntegerPoint) []FloatPoint { 590 if len(a) == 1 { 591 return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}} 592 } 593 594 // OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1. 595 596 // Return the middle value from the points. 597 // If there are an even number of points then return the mean of the two middle points. 598 sort.Sort(integerPointsByValue(a)) 599 if len(a)%2 == 0 { 600 lo, hi := a[len(a)/2-1], a[(len(a)/2)] 601 return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}} 602 } 603 return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}} 604} 605 606// UnsignedMedianReduceSlice returns the median value within a window. 607func UnsignedMedianReduceSlice(a []UnsignedPoint) []FloatPoint { 608 if len(a) == 1 { 609 return []FloatPoint{{Time: ZeroTime, Value: float64(a[0].Value)}} 610 } 611 612 // OPTIMIZE(benbjohnson): Use getSortedRange() from v0.9.5.1. 613 614 // Return the middle value from the points. 615 // If there are an even number of points then return the mean of the two middle points. 616 sort.Sort(unsignedPointsByValue(a)) 617 if len(a)%2 == 0 { 618 lo, hi := a[len(a)/2-1], a[(len(a)/2)] 619 return []FloatPoint{{Time: ZeroTime, Value: float64(lo.Value) + float64(hi.Value-lo.Value)/2}} 620 } 621 return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}} 622} 623 624// newModeIterator returns an iterator for operating on a mode() call. 625func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 626 switch input := input.(type) { 627 case FloatIterator: 628 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 629 fn := NewFloatSliceFuncReducer(FloatModeReduceSlice) 630 return fn, fn 631 } 632 return newFloatReduceFloatIterator(input, opt, createFn), nil 633 case IntegerIterator: 634 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 635 fn := NewIntegerSliceFuncReducer(IntegerModeReduceSlice) 636 return fn, fn 637 } 638 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 639 case UnsignedIterator: 640 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 641 fn := NewUnsignedSliceFuncReducer(UnsignedModeReduceSlice) 642 return fn, fn 643 } 644 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 645 case StringIterator: 646 createFn := func() (StringPointAggregator, StringPointEmitter) { 647 fn := NewStringSliceFuncReducer(StringModeReduceSlice) 648 return fn, fn 649 } 650 return newStringReduceStringIterator(input, opt, createFn), nil 651 case BooleanIterator: 652 createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { 653 fn := NewBooleanSliceFuncReducer(BooleanModeReduceSlice) 654 return fn, fn 655 } 656 return newBooleanReduceBooleanIterator(input, opt, createFn), nil 657 default: 658 return nil, fmt.Errorf("unsupported median iterator type: %T", input) 659 } 660} 661 662// FloatModeReduceSlice returns the mode value within a window. 663func FloatModeReduceSlice(a []FloatPoint) []FloatPoint { 664 if len(a) == 1 { 665 return a 666 } 667 668 sort.Sort(floatPointsByValue(a)) 669 670 mostFreq := 0 671 currFreq := 0 672 currMode := a[0].Value 673 mostMode := a[0].Value 674 mostTime := a[0].Time 675 currTime := a[0].Time 676 677 for _, p := range a { 678 if p.Value != currMode { 679 currFreq = 1 680 currMode = p.Value 681 currTime = p.Time 682 continue 683 } 684 currFreq++ 685 if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) { 686 continue 687 } 688 mostFreq = currFreq 689 mostMode = p.Value 690 mostTime = p.Time 691 } 692 693 return []FloatPoint{{Time: ZeroTime, Value: mostMode}} 694} 695 696// IntegerModeReduceSlice returns the mode value within a window. 697func IntegerModeReduceSlice(a []IntegerPoint) []IntegerPoint { 698 if len(a) == 1 { 699 return a 700 } 701 sort.Sort(integerPointsByValue(a)) 702 703 mostFreq := 0 704 currFreq := 0 705 currMode := a[0].Value 706 mostMode := a[0].Value 707 mostTime := a[0].Time 708 currTime := a[0].Time 709 710 for _, p := range a { 711 if p.Value != currMode { 712 currFreq = 1 713 currMode = p.Value 714 currTime = p.Time 715 continue 716 } 717 currFreq++ 718 if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) { 719 continue 720 } 721 mostFreq = currFreq 722 mostMode = p.Value 723 mostTime = p.Time 724 } 725 726 return []IntegerPoint{{Time: ZeroTime, Value: mostMode}} 727} 728 729// UnsignedModeReduceSlice returns the mode value within a window. 730func UnsignedModeReduceSlice(a []UnsignedPoint) []UnsignedPoint { 731 if len(a) == 1 { 732 return a 733 } 734 sort.Sort(unsignedPointsByValue(a)) 735 736 mostFreq := 0 737 currFreq := 0 738 currMode := a[0].Value 739 mostMode := a[0].Value 740 mostTime := a[0].Time 741 currTime := a[0].Time 742 743 for _, p := range a { 744 if p.Value != currMode { 745 currFreq = 1 746 currMode = p.Value 747 currTime = p.Time 748 continue 749 } 750 currFreq++ 751 if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) { 752 continue 753 } 754 mostFreq = currFreq 755 mostMode = p.Value 756 mostTime = p.Time 757 } 758 759 return []UnsignedPoint{{Time: ZeroTime, Value: mostMode}} 760} 761 762// StringModeReduceSlice returns the mode value within a window. 763func StringModeReduceSlice(a []StringPoint) []StringPoint { 764 if len(a) == 1 { 765 return a 766 } 767 768 sort.Sort(stringPointsByValue(a)) 769 770 mostFreq := 0 771 currFreq := 0 772 currMode := a[0].Value 773 mostMode := a[0].Value 774 mostTime := a[0].Time 775 currTime := a[0].Time 776 777 for _, p := range a { 778 if p.Value != currMode { 779 currFreq = 1 780 currMode = p.Value 781 currTime = p.Time 782 continue 783 } 784 currFreq++ 785 if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) { 786 continue 787 } 788 mostFreq = currFreq 789 mostMode = p.Value 790 mostTime = p.Time 791 } 792 793 return []StringPoint{{Time: ZeroTime, Value: mostMode}} 794} 795 796// BooleanModeReduceSlice returns the mode value within a window. 797func BooleanModeReduceSlice(a []BooleanPoint) []BooleanPoint { 798 if len(a) == 1 { 799 return a 800 } 801 802 trueFreq := 0 803 falsFreq := 0 804 mostMode := false 805 806 for _, p := range a { 807 if p.Value { 808 trueFreq++ 809 } else { 810 falsFreq++ 811 } 812 } 813 // In case either of true or false are mode then retuned mode value wont be 814 // of metric with oldest timestamp 815 if trueFreq >= falsFreq { 816 mostMode = true 817 } 818 819 return []BooleanPoint{{Time: ZeroTime, Value: mostMode}} 820} 821 822// newStddevIterator returns an iterator for operating on a stddev() call. 823func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 824 switch input := input.(type) { 825 case FloatIterator: 826 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 827 fn := NewFloatSliceFuncReducer(FloatStddevReduceSlice) 828 return fn, fn 829 } 830 return newFloatReduceFloatIterator(input, opt, createFn), nil 831 case IntegerIterator: 832 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 833 fn := NewIntegerSliceFuncFloatReducer(IntegerStddevReduceSlice) 834 return fn, fn 835 } 836 return newIntegerReduceFloatIterator(input, opt, createFn), nil 837 case UnsignedIterator: 838 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 839 fn := NewUnsignedSliceFuncFloatReducer(UnsignedStddevReduceSlice) 840 return fn, fn 841 } 842 return newUnsignedReduceFloatIterator(input, opt, createFn), nil 843 default: 844 return nil, fmt.Errorf("unsupported stddev iterator type: %T", input) 845 } 846} 847 848// FloatStddevReduceSlice returns the stddev value within a window. 849func FloatStddevReduceSlice(a []FloatPoint) []FloatPoint { 850 // If there is only one point then return NaN. 851 if len(a) < 2 { 852 return []FloatPoint{{Time: ZeroTime, Value: math.NaN()}} 853 } 854 855 // Calculate the mean. 856 var mean float64 857 var count int 858 for _, p := range a { 859 if math.IsNaN(p.Value) { 860 continue 861 } 862 count++ 863 mean += (p.Value - mean) / float64(count) 864 } 865 866 // Calculate the variance. 867 var variance float64 868 for _, p := range a { 869 if math.IsNaN(p.Value) { 870 continue 871 } 872 variance += math.Pow(p.Value-mean, 2) 873 } 874 return []FloatPoint{{ 875 Time: ZeroTime, 876 Value: math.Sqrt(variance / float64(count-1)), 877 }} 878} 879 880// IntegerStddevReduceSlice returns the stddev value within a window. 881func IntegerStddevReduceSlice(a []IntegerPoint) []FloatPoint { 882 // If there is only one point then return NaN. 883 if len(a) < 2 { 884 return []FloatPoint{{Time: ZeroTime, Value: math.NaN()}} 885 } 886 887 // Calculate the mean. 888 var mean float64 889 var count int 890 for _, p := range a { 891 count++ 892 mean += (float64(p.Value) - mean) / float64(count) 893 } 894 895 // Calculate the variance. 896 var variance float64 897 for _, p := range a { 898 variance += math.Pow(float64(p.Value)-mean, 2) 899 } 900 return []FloatPoint{{ 901 Time: ZeroTime, 902 Value: math.Sqrt(variance / float64(count-1)), 903 }} 904} 905 906// UnsignedStddevReduceSlice returns the stddev value within a window. 907func UnsignedStddevReduceSlice(a []UnsignedPoint) []FloatPoint { 908 // If there is only one point then return NaN. 909 if len(a) < 2 { 910 return []FloatPoint{{Time: ZeroTime, Value: math.NaN()}} 911 } 912 913 // Calculate the mean. 914 var mean float64 915 var count int 916 for _, p := range a { 917 count++ 918 mean += (float64(p.Value) - mean) / float64(count) 919 } 920 921 // Calculate the variance. 922 var variance float64 923 for _, p := range a { 924 variance += math.Pow(float64(p.Value)-mean, 2) 925 } 926 return []FloatPoint{{ 927 Time: ZeroTime, 928 Value: math.Sqrt(variance / float64(count-1)), 929 }} 930} 931 932// newSpreadIterator returns an iterator for operating on a spread() call. 933func newSpreadIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 934 switch input := input.(type) { 935 case FloatIterator: 936 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 937 fn := NewFloatSpreadReducer() 938 return fn, fn 939 } 940 return newFloatReduceFloatIterator(input, opt, createFn), nil 941 case IntegerIterator: 942 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 943 fn := NewIntegerSpreadReducer() 944 return fn, fn 945 } 946 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 947 case UnsignedIterator: 948 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 949 fn := NewUnsignedSpreadReducer() 950 return fn, fn 951 } 952 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 953 default: 954 return nil, fmt.Errorf("unsupported spread iterator type: %T", input) 955 } 956} 957 958func newTopIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) { 959 switch input := input.(type) { 960 case FloatIterator: 961 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 962 fn := NewFloatTopReducer(n) 963 return fn, fn 964 } 965 itr := newFloatReduceFloatIterator(input, opt, createFn) 966 itr.keepTags = keepTags 967 return itr, nil 968 case IntegerIterator: 969 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 970 fn := NewIntegerTopReducer(n) 971 return fn, fn 972 } 973 itr := newIntegerReduceIntegerIterator(input, opt, createFn) 974 itr.keepTags = keepTags 975 return itr, nil 976 case UnsignedIterator: 977 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 978 fn := NewUnsignedTopReducer(n) 979 return fn, fn 980 } 981 itr := newUnsignedReduceUnsignedIterator(input, opt, createFn) 982 itr.keepTags = keepTags 983 return itr, nil 984 default: 985 return nil, fmt.Errorf("unsupported top iterator type: %T", input) 986 } 987} 988 989func newBottomIterator(input Iterator, opt IteratorOptions, n int, keepTags bool) (Iterator, error) { 990 switch input := input.(type) { 991 case FloatIterator: 992 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 993 fn := NewFloatBottomReducer(n) 994 return fn, fn 995 } 996 itr := newFloatReduceFloatIterator(input, opt, createFn) 997 itr.keepTags = keepTags 998 return itr, nil 999 case IntegerIterator: 1000 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 1001 fn := NewIntegerBottomReducer(n) 1002 return fn, fn 1003 } 1004 itr := newIntegerReduceIntegerIterator(input, opt, createFn) 1005 itr.keepTags = keepTags 1006 return itr, nil 1007 case UnsignedIterator: 1008 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 1009 fn := NewUnsignedBottomReducer(n) 1010 return fn, fn 1011 } 1012 itr := newUnsignedReduceUnsignedIterator(input, opt, createFn) 1013 itr.keepTags = keepTags 1014 return itr, nil 1015 default: 1016 return nil, fmt.Errorf("unsupported bottom iterator type: %T", input) 1017 } 1018} 1019 1020// newPercentileIterator returns an iterator for operating on a percentile() call. 1021func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) { 1022 switch input := input.(type) { 1023 case FloatIterator: 1024 floatPercentileReduceSlice := NewFloatPercentileReduceSliceFunc(percentile) 1025 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1026 fn := NewFloatSliceFuncReducer(floatPercentileReduceSlice) 1027 return fn, fn 1028 } 1029 return newFloatReduceFloatIterator(input, opt, createFn), nil 1030 case IntegerIterator: 1031 integerPercentileReduceSlice := NewIntegerPercentileReduceSliceFunc(percentile) 1032 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 1033 fn := NewIntegerSliceFuncReducer(integerPercentileReduceSlice) 1034 return fn, fn 1035 } 1036 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 1037 case UnsignedIterator: 1038 unsignedPercentileReduceSlice := NewUnsignedPercentileReduceSliceFunc(percentile) 1039 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 1040 fn := NewUnsignedSliceFuncReducer(unsignedPercentileReduceSlice) 1041 return fn, fn 1042 } 1043 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 1044 default: 1045 return nil, fmt.Errorf("unsupported percentile iterator type: %T", input) 1046 } 1047} 1048 1049// NewFloatPercentileReduceSliceFunc returns the percentile value within a window. 1050func NewFloatPercentileReduceSliceFunc(percentile float64) FloatReduceSliceFunc { 1051 return func(a []FloatPoint) []FloatPoint { 1052 length := len(a) 1053 i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1 1054 1055 if i < 0 || i >= length { 1056 return nil 1057 } 1058 1059 sort.Sort(floatPointsByValue(a)) 1060 return []FloatPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}} 1061 } 1062} 1063 1064// NewIntegerPercentileReduceSliceFunc returns the percentile value within a window. 1065func NewIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceFunc { 1066 return func(a []IntegerPoint) []IntegerPoint { 1067 length := len(a) 1068 i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1 1069 1070 if i < 0 || i >= length { 1071 return nil 1072 } 1073 1074 sort.Sort(integerPointsByValue(a)) 1075 return []IntegerPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}} 1076 } 1077} 1078 1079// NewUnsignedPercentileReduceSliceFunc returns the percentile value within a window. 1080func NewUnsignedPercentileReduceSliceFunc(percentile float64) UnsignedReduceSliceFunc { 1081 return func(a []UnsignedPoint) []UnsignedPoint { 1082 length := len(a) 1083 i := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1 1084 1085 if i < 0 || i >= length { 1086 return nil 1087 } 1088 1089 sort.Sort(unsignedPointsByValue(a)) 1090 return []UnsignedPoint{{Time: a[i].Time, Value: a[i].Value, Aux: cloneAux(a[i].Aux)}} 1091 } 1092} 1093 1094// newDerivativeIterator returns an iterator for operating on a derivative() call. 1095func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) { 1096 switch input := input.(type) { 1097 case FloatIterator: 1098 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1099 fn := NewFloatDerivativeReducer(interval, isNonNegative, opt.Ascending) 1100 return fn, fn 1101 } 1102 return newFloatStreamFloatIterator(input, createFn, opt), nil 1103 case IntegerIterator: 1104 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1105 fn := NewIntegerDerivativeReducer(interval, isNonNegative, opt.Ascending) 1106 return fn, fn 1107 } 1108 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1109 case UnsignedIterator: 1110 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1111 fn := NewUnsignedDerivativeReducer(interval, isNonNegative, opt.Ascending) 1112 return fn, fn 1113 } 1114 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1115 default: 1116 return nil, fmt.Errorf("unsupported derivative iterator type: %T", input) 1117 } 1118} 1119 1120// newDifferenceIterator returns an iterator for operating on a difference() call. 1121func newDifferenceIterator(input Iterator, opt IteratorOptions, isNonNegative bool) (Iterator, error) { 1122 switch input := input.(type) { 1123 case FloatIterator: 1124 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1125 fn := NewFloatDifferenceReducer(isNonNegative) 1126 return fn, fn 1127 } 1128 return newFloatStreamFloatIterator(input, createFn, opt), nil 1129 case IntegerIterator: 1130 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 1131 fn := NewIntegerDifferenceReducer(isNonNegative) 1132 return fn, fn 1133 } 1134 return newIntegerStreamIntegerIterator(input, createFn, opt), nil 1135 case UnsignedIterator: 1136 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 1137 fn := NewUnsignedDifferenceReducer(isNonNegative) 1138 return fn, fn 1139 } 1140 return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil 1141 default: 1142 return nil, fmt.Errorf("unsupported difference iterator type: %T", input) 1143 } 1144} 1145 1146// newElapsedIterator returns an iterator for operating on a elapsed() call. 1147func newElapsedIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) { 1148 switch input := input.(type) { 1149 case FloatIterator: 1150 createFn := func() (FloatPointAggregator, IntegerPointEmitter) { 1151 fn := NewFloatElapsedReducer(interval) 1152 return fn, fn 1153 } 1154 return newFloatStreamIntegerIterator(input, createFn, opt), nil 1155 case IntegerIterator: 1156 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 1157 fn := NewIntegerElapsedReducer(interval) 1158 return fn, fn 1159 } 1160 return newIntegerStreamIntegerIterator(input, createFn, opt), nil 1161 case UnsignedIterator: 1162 createFn := func() (UnsignedPointAggregator, IntegerPointEmitter) { 1163 fn := NewUnsignedElapsedReducer(interval) 1164 return fn, fn 1165 } 1166 return newUnsignedStreamIntegerIterator(input, createFn, opt), nil 1167 case BooleanIterator: 1168 createFn := func() (BooleanPointAggregator, IntegerPointEmitter) { 1169 fn := NewBooleanElapsedReducer(interval) 1170 return fn, fn 1171 } 1172 return newBooleanStreamIntegerIterator(input, createFn, opt), nil 1173 case StringIterator: 1174 createFn := func() (StringPointAggregator, IntegerPointEmitter) { 1175 fn := NewStringElapsedReducer(interval) 1176 return fn, fn 1177 } 1178 return newStringStreamIntegerIterator(input, createFn, opt), nil 1179 default: 1180 return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input) 1181 } 1182} 1183 1184// newMovingAverageIterator returns an iterator for operating on a moving_average() call. 1185func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Iterator, error) { 1186 switch input := input.(type) { 1187 case FloatIterator: 1188 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1189 fn := NewFloatMovingAverageReducer(n) 1190 return fn, fn 1191 } 1192 return newFloatStreamFloatIterator(input, createFn, opt), nil 1193 case IntegerIterator: 1194 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1195 fn := NewIntegerMovingAverageReducer(n) 1196 return fn, fn 1197 } 1198 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1199 case UnsignedIterator: 1200 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1201 fn := NewUnsignedMovingAverageReducer(n) 1202 return fn, fn 1203 } 1204 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1205 default: 1206 return nil, fmt.Errorf("unsupported moving average iterator type: %T", input) 1207 } 1208} 1209 1210// newExponentialMovingAverageIterator returns an iterator for operating on an exponential_moving_average() call. 1211func newExponentialMovingAverageIterator(input Iterator, n, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) { 1212 switch input := input.(type) { 1213 case FloatIterator: 1214 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1215 fn := NewExponentialMovingAverageReducer(n, nHold, warmupType) 1216 return fn, fn 1217 } 1218 return newFloatStreamFloatIterator(input, createFn, opt), nil 1219 case IntegerIterator: 1220 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1221 fn := NewExponentialMovingAverageReducer(n, nHold, warmupType) 1222 return fn, fn 1223 } 1224 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1225 case UnsignedIterator: 1226 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1227 fn := NewExponentialMovingAverageReducer(n, nHold, warmupType) 1228 return fn, fn 1229 } 1230 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1231 default: 1232 return nil, fmt.Errorf("unsupported exponential moving average iterator type: %T", input) 1233 } 1234} 1235 1236// newDoubleExponentialMovingAverageIterator returns an iterator for operating on a double_exponential_moving_average() call. 1237func newDoubleExponentialMovingAverageIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) { 1238 switch input := input.(type) { 1239 case FloatIterator: 1240 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1241 fn := NewDoubleExponentialMovingAverageReducer(n, nHold, warmupType) 1242 return fn, fn 1243 } 1244 return newFloatStreamFloatIterator(input, createFn, opt), nil 1245 case IntegerIterator: 1246 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1247 fn := NewDoubleExponentialMovingAverageReducer(n, nHold, warmupType) 1248 return fn, fn 1249 } 1250 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1251 case UnsignedIterator: 1252 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1253 fn := NewDoubleExponentialMovingAverageReducer(n, nHold, warmupType) 1254 return fn, fn 1255 } 1256 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1257 default: 1258 return nil, fmt.Errorf("unsupported double exponential moving average iterator type: %T", input) 1259 } 1260} 1261 1262// newTripleExponentialMovingAverageIterator returns an iterator for operating on a triple_exponential_moving_average() call. 1263func newTripleExponentialMovingAverageIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) { 1264 switch input := input.(type) { 1265 case FloatIterator: 1266 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1267 fn := NewTripleExponentialMovingAverageReducer(n, nHold, warmupType) 1268 return fn, fn 1269 } 1270 return newFloatStreamFloatIterator(input, createFn, opt), nil 1271 case IntegerIterator: 1272 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1273 fn := NewTripleExponentialMovingAverageReducer(n, nHold, warmupType) 1274 return fn, fn 1275 } 1276 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1277 case UnsignedIterator: 1278 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1279 fn := NewTripleExponentialMovingAverageReducer(n, nHold, warmupType) 1280 return fn, fn 1281 } 1282 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1283 default: 1284 return nil, fmt.Errorf("unsupported triple exponential moving average iterator type: %T", input) 1285 } 1286} 1287 1288// newRelativeStrengthIndexIterator returns an iterator for operating on a triple_exponential_moving_average() call. 1289func newRelativeStrengthIndexIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) { 1290 switch input := input.(type) { 1291 case FloatIterator: 1292 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1293 fn := NewRelativeStrengthIndexReducer(n, nHold, warmupType) 1294 return fn, fn 1295 } 1296 return newFloatStreamFloatIterator(input, createFn, opt), nil 1297 case IntegerIterator: 1298 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1299 fn := NewRelativeStrengthIndexReducer(n, nHold, warmupType) 1300 return fn, fn 1301 } 1302 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1303 case UnsignedIterator: 1304 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1305 fn := NewRelativeStrengthIndexReducer(n, nHold, warmupType) 1306 return fn, fn 1307 } 1308 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1309 default: 1310 return nil, fmt.Errorf("unsupported relative strength index iterator type: %T", input) 1311 } 1312} 1313 1314// newTripleExponentialDerivativeIterator returns an iterator for operating on a triple_exponential_moving_average() call. 1315func newTripleExponentialDerivativeIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) { 1316 switch input := input.(type) { 1317 case FloatIterator: 1318 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1319 fn := NewTripleExponentialDerivativeReducer(n, nHold, warmupType) 1320 return fn, fn 1321 } 1322 return newFloatStreamFloatIterator(input, createFn, opt), nil 1323 case IntegerIterator: 1324 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1325 fn := NewTripleExponentialDerivativeReducer(n, nHold, warmupType) 1326 return fn, fn 1327 } 1328 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1329 case UnsignedIterator: 1330 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1331 fn := NewTripleExponentialDerivativeReducer(n, nHold, warmupType) 1332 return fn, fn 1333 } 1334 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1335 default: 1336 return nil, fmt.Errorf("unsupported triple exponential derivative iterator type: %T", input) 1337 } 1338} 1339 1340// newKaufmansEfficiencyRatioIterator returns an iterator for operating on a kaufmans_efficiency_ratio() call. 1341func newKaufmansEfficiencyRatioIterator(input Iterator, n int, nHold int, opt IteratorOptions) (Iterator, error) { 1342 switch input := input.(type) { 1343 case FloatIterator: 1344 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1345 fn := NewKaufmansEfficiencyRatioReducer(n, nHold) 1346 return fn, fn 1347 } 1348 return newFloatStreamFloatIterator(input, createFn, opt), nil 1349 case IntegerIterator: 1350 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1351 fn := NewKaufmansEfficiencyRatioReducer(n, nHold) 1352 return fn, fn 1353 } 1354 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1355 case UnsignedIterator: 1356 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1357 fn := NewKaufmansEfficiencyRatioReducer(n, nHold) 1358 return fn, fn 1359 } 1360 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1361 default: 1362 return nil, fmt.Errorf("unsupported kaufmans efficiency ratio iterator type: %T", input) 1363 } 1364} 1365 1366// newKaufmansAdaptiveMovingAverageIterator returns an iterator for operating on a kaufmans_adaptive_moving_average() call. 1367func newKaufmansAdaptiveMovingAverageIterator(input Iterator, n int, nHold int, opt IteratorOptions) (Iterator, error) { 1368 switch input := input.(type) { 1369 case FloatIterator: 1370 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1371 fn := NewKaufmansAdaptiveMovingAverageReducer(n, nHold) 1372 return fn, fn 1373 } 1374 return newFloatStreamFloatIterator(input, createFn, opt), nil 1375 case IntegerIterator: 1376 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1377 fn := NewKaufmansAdaptiveMovingAverageReducer(n, nHold) 1378 return fn, fn 1379 } 1380 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1381 case UnsignedIterator: 1382 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1383 fn := NewKaufmansAdaptiveMovingAverageReducer(n, nHold) 1384 return fn, fn 1385 } 1386 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1387 default: 1388 return nil, fmt.Errorf("unsupported kaufmans adaptive moving average iterator type: %T", input) 1389 } 1390} 1391 1392// newChandeMomentumOscillatorIterator returns an iterator for operating on a triple_exponential_moving_average() call. 1393func newChandeMomentumOscillatorIterator(input Iterator, n int, nHold int, warmupType gota.WarmupType, opt IteratorOptions) (Iterator, error) { 1394 switch input := input.(type) { 1395 case FloatIterator: 1396 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1397 fn := NewChandeMomentumOscillatorReducer(n, nHold, warmupType) 1398 return fn, fn 1399 } 1400 return newFloatStreamFloatIterator(input, createFn, opt), nil 1401 case IntegerIterator: 1402 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1403 fn := NewChandeMomentumOscillatorReducer(n, nHold, warmupType) 1404 return fn, fn 1405 } 1406 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1407 case UnsignedIterator: 1408 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1409 fn := NewChandeMomentumOscillatorReducer(n, nHold, warmupType) 1410 return fn, fn 1411 } 1412 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1413 default: 1414 return nil, fmt.Errorf("unsupported chande momentum oscillator iterator type: %T", input) 1415 } 1416} 1417 1418// newCumulativeSumIterator returns an iterator for operating on a cumulative_sum() call. 1419func newCumulativeSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) { 1420 switch input := input.(type) { 1421 case FloatIterator: 1422 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1423 fn := NewFloatCumulativeSumReducer() 1424 return fn, fn 1425 } 1426 return newFloatStreamFloatIterator(input, createFn, opt), nil 1427 case IntegerIterator: 1428 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 1429 fn := NewIntegerCumulativeSumReducer() 1430 return fn, fn 1431 } 1432 return newIntegerStreamIntegerIterator(input, createFn, opt), nil 1433 case UnsignedIterator: 1434 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 1435 fn := NewUnsignedCumulativeSumReducer() 1436 return fn, fn 1437 } 1438 return newUnsignedStreamUnsignedIterator(input, createFn, opt), nil 1439 default: 1440 return nil, fmt.Errorf("unsupported cumulative sum iterator type: %T", input) 1441 } 1442} 1443 1444// newHoltWintersIterator returns an iterator for operating on a holt_winters() call. 1445func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, includeFitData bool, interval time.Duration) (Iterator, error) { 1446 switch input := input.(type) { 1447 case FloatIterator: 1448 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1449 fn := NewFloatHoltWintersReducer(h, m, includeFitData, interval) 1450 return fn, fn 1451 } 1452 return newFloatReduceFloatIterator(input, opt, createFn), nil 1453 case IntegerIterator: 1454 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1455 fn := NewFloatHoltWintersReducer(h, m, includeFitData, interval) 1456 return fn, fn 1457 } 1458 return newIntegerReduceFloatIterator(input, opt, createFn), nil 1459 default: 1460 return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input) 1461 } 1462} 1463 1464// NewSampleIterator returns an iterator for operating on a sample() call (exported for use in test). 1465func NewSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) { 1466 return newSampleIterator(input, opt, size) 1467} 1468 1469// newSampleIterator returns an iterator for operating on a sample() call. 1470func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) { 1471 switch input := input.(type) { 1472 case FloatIterator: 1473 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1474 fn := NewFloatSampleReducer(size) 1475 return fn, fn 1476 } 1477 return newFloatReduceFloatIterator(input, opt, createFn), nil 1478 case IntegerIterator: 1479 createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { 1480 fn := NewIntegerSampleReducer(size) 1481 return fn, fn 1482 } 1483 return newIntegerReduceIntegerIterator(input, opt, createFn), nil 1484 case UnsignedIterator: 1485 createFn := func() (UnsignedPointAggregator, UnsignedPointEmitter) { 1486 fn := NewUnsignedSampleReducer(size) 1487 return fn, fn 1488 } 1489 return newUnsignedReduceUnsignedIterator(input, opt, createFn), nil 1490 case StringIterator: 1491 createFn := func() (StringPointAggregator, StringPointEmitter) { 1492 fn := NewStringSampleReducer(size) 1493 return fn, fn 1494 } 1495 return newStringReduceStringIterator(input, opt, createFn), nil 1496 case BooleanIterator: 1497 createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { 1498 fn := NewBooleanSampleReducer(size) 1499 return fn, fn 1500 } 1501 return newBooleanReduceBooleanIterator(input, opt, createFn), nil 1502 default: 1503 return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input) 1504 } 1505} 1506 1507// newIntegralIterator returns an iterator for operating on a integral() call. 1508func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) { 1509 switch input := input.(type) { 1510 case FloatIterator: 1511 createFn := func() (FloatPointAggregator, FloatPointEmitter) { 1512 fn := NewFloatIntegralReducer(interval, opt) 1513 return fn, fn 1514 } 1515 return newFloatStreamFloatIterator(input, createFn, opt), nil 1516 case IntegerIterator: 1517 createFn := func() (IntegerPointAggregator, FloatPointEmitter) { 1518 fn := NewIntegerIntegralReducer(interval, opt) 1519 return fn, fn 1520 } 1521 return newIntegerStreamFloatIterator(input, createFn, opt), nil 1522 case UnsignedIterator: 1523 createFn := func() (UnsignedPointAggregator, FloatPointEmitter) { 1524 fn := NewUnsignedIntegralReducer(interval, opt) 1525 return fn, fn 1526 } 1527 return newUnsignedStreamFloatIterator(input, createFn, opt), nil 1528 default: 1529 return nil, fmt.Errorf("unsupported integral iterator type: %T", input) 1530 } 1531} 1532