1// Generated by tmpl 2// https://github.com/benbjohnson/tmpl 3// 4// DO NOT EDIT! 5// Source: functions.gen.go.tmpl 6 7package query 8 9import ( 10 "math/rand" 11 "sort" 12 "time" 13) 14 15// FloatPointAggregator aggregates points to produce a single point. 16type FloatPointAggregator interface { 17 AggregateFloat(p *FloatPoint) 18} 19 20// FloatBulkPointAggregator aggregates multiple points at a time. 21type FloatBulkPointAggregator interface { 22 AggregateFloatBulk(points []FloatPoint) 23} 24 25// AggregateFloatPoints feeds a slice of FloatPoint into an 26// aggregator. If the aggregator is a FloatBulkPointAggregator, it will 27// use the AggregateBulk method. 28func AggregateFloatPoints(a FloatPointAggregator, points []FloatPoint) { 29 switch a := a.(type) { 30 case FloatBulkPointAggregator: 31 a.AggregateFloatBulk(points) 32 default: 33 for _, p := range points { 34 a.AggregateFloat(&p) 35 } 36 } 37} 38 39// FloatPointEmitter produces a single point from an aggregate. 40type FloatPointEmitter interface { 41 Emit() []FloatPoint 42} 43 44// FloatReduceFunc is the function called by a FloatPoint reducer. 45type FloatReduceFunc func(prev *FloatPoint, curr *FloatPoint) (t int64, v float64, aux []interface{}) 46 47// FloatFuncReducer is a reducer that reduces 48// the passed in points to a single point using a reduce function. 49type FloatFuncReducer struct { 50 prev *FloatPoint 51 fn FloatReduceFunc 52} 53 54// NewFloatFuncReducer creates a new FloatFuncFloatReducer. 55func NewFloatFuncReducer(fn FloatReduceFunc, prev *FloatPoint) *FloatFuncReducer { 56 return &FloatFuncReducer{fn: fn, prev: prev} 57} 58 59// AggregateFloat takes a FloatPoint and invokes the reduce function with the 60// current and new point to modify the current point. 61func (r *FloatFuncReducer) AggregateFloat(p *FloatPoint) { 62 t, v, aux := r.fn(r.prev, p) 63 if r.prev == nil { 64 r.prev = &FloatPoint{} 65 } 66 r.prev.Time = t 67 r.prev.Value = v 68 r.prev.Aux = aux 69 if p.Aggregated > 1 { 70 r.prev.Aggregated += p.Aggregated 71 } else { 72 r.prev.Aggregated++ 73 } 74} 75 76// Emit emits the point that was generated when reducing the points fed in with AggregateFloat. 77func (r *FloatFuncReducer) Emit() []FloatPoint { 78 return []FloatPoint{*r.prev} 79} 80 81// FloatReduceSliceFunc is the function called by a FloatPoint reducer. 82type FloatReduceSliceFunc func(a []FloatPoint) []FloatPoint 83 84// FloatSliceFuncReducer is a reducer that aggregates 85// the passed in points and then invokes the function to reduce the points when they are emitted. 86type FloatSliceFuncReducer struct { 87 points []FloatPoint 88 fn FloatReduceSliceFunc 89} 90 91// NewFloatSliceFuncReducer creates a new FloatSliceFuncReducer. 92func NewFloatSliceFuncReducer(fn FloatReduceSliceFunc) *FloatSliceFuncReducer { 93 return &FloatSliceFuncReducer{fn: fn} 94} 95 96// AggregateFloat copies the FloatPoint into the internal slice to be passed 97// to the reduce function when Emit is called. 98func (r *FloatSliceFuncReducer) AggregateFloat(p *FloatPoint) { 99 r.points = append(r.points, *p.Clone()) 100} 101 102// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. 103// This is a more efficient version of calling AggregateFloat on each point. 104func (r *FloatSliceFuncReducer) AggregateFloatBulk(points []FloatPoint) { 105 r.points = append(r.points, points...) 106} 107 108// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 109// This method does not clear the points from the internal slice. 110func (r *FloatSliceFuncReducer) Emit() []FloatPoint { 111 return r.fn(r.points) 112} 113 114// FloatReduceIntegerFunc is the function called by a FloatPoint reducer. 115type FloatReduceIntegerFunc func(prev *IntegerPoint, curr *FloatPoint) (t int64, v int64, aux []interface{}) 116 117// FloatFuncIntegerReducer is a reducer that reduces 118// the passed in points to a single point using a reduce function. 119type FloatFuncIntegerReducer struct { 120 prev *IntegerPoint 121 fn FloatReduceIntegerFunc 122} 123 124// NewFloatFuncIntegerReducer creates a new FloatFuncIntegerReducer. 125func NewFloatFuncIntegerReducer(fn FloatReduceIntegerFunc, prev *IntegerPoint) *FloatFuncIntegerReducer { 126 return &FloatFuncIntegerReducer{fn: fn, prev: prev} 127} 128 129// AggregateFloat takes a FloatPoint and invokes the reduce function with the 130// current and new point to modify the current point. 131func (r *FloatFuncIntegerReducer) AggregateFloat(p *FloatPoint) { 132 t, v, aux := r.fn(r.prev, p) 133 if r.prev == nil { 134 r.prev = &IntegerPoint{} 135 } 136 r.prev.Time = t 137 r.prev.Value = v 138 r.prev.Aux = aux 139 if p.Aggregated > 1 { 140 r.prev.Aggregated += p.Aggregated 141 } else { 142 r.prev.Aggregated++ 143 } 144} 145 146// Emit emits the point that was generated when reducing the points fed in with AggregateFloat. 147func (r *FloatFuncIntegerReducer) Emit() []IntegerPoint { 148 return []IntegerPoint{*r.prev} 149} 150 151// FloatReduceIntegerSliceFunc is the function called by a FloatPoint reducer. 152type FloatReduceIntegerSliceFunc func(a []FloatPoint) []IntegerPoint 153 154// FloatSliceFuncIntegerReducer is a reducer that aggregates 155// the passed in points and then invokes the function to reduce the points when they are emitted. 156type FloatSliceFuncIntegerReducer struct { 157 points []FloatPoint 158 fn FloatReduceIntegerSliceFunc 159} 160 161// NewFloatSliceFuncIntegerReducer creates a new FloatSliceFuncIntegerReducer. 162func NewFloatSliceFuncIntegerReducer(fn FloatReduceIntegerSliceFunc) *FloatSliceFuncIntegerReducer { 163 return &FloatSliceFuncIntegerReducer{fn: fn} 164} 165 166// AggregateFloat copies the FloatPoint into the internal slice to be passed 167// to the reduce function when Emit is called. 168func (r *FloatSliceFuncIntegerReducer) AggregateFloat(p *FloatPoint) { 169 r.points = append(r.points, *p.Clone()) 170} 171 172// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. 173// This is a more efficient version of calling AggregateFloat on each point. 174func (r *FloatSliceFuncIntegerReducer) AggregateFloatBulk(points []FloatPoint) { 175 r.points = append(r.points, points...) 176} 177 178// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 179// This method does not clear the points from the internal slice. 180func (r *FloatSliceFuncIntegerReducer) Emit() []IntegerPoint { 181 return r.fn(r.points) 182} 183 184// FloatReduceUnsignedFunc is the function called by a FloatPoint reducer. 185type FloatReduceUnsignedFunc func(prev *UnsignedPoint, curr *FloatPoint) (t int64, v uint64, aux []interface{}) 186 187// FloatFuncUnsignedReducer is a reducer that reduces 188// the passed in points to a single point using a reduce function. 189type FloatFuncUnsignedReducer struct { 190 prev *UnsignedPoint 191 fn FloatReduceUnsignedFunc 192} 193 194// NewFloatFuncUnsignedReducer creates a new FloatFuncUnsignedReducer. 195func NewFloatFuncUnsignedReducer(fn FloatReduceUnsignedFunc, prev *UnsignedPoint) *FloatFuncUnsignedReducer { 196 return &FloatFuncUnsignedReducer{fn: fn, prev: prev} 197} 198 199// AggregateFloat takes a FloatPoint and invokes the reduce function with the 200// current and new point to modify the current point. 201func (r *FloatFuncUnsignedReducer) AggregateFloat(p *FloatPoint) { 202 t, v, aux := r.fn(r.prev, p) 203 if r.prev == nil { 204 r.prev = &UnsignedPoint{} 205 } 206 r.prev.Time = t 207 r.prev.Value = v 208 r.prev.Aux = aux 209 if p.Aggregated > 1 { 210 r.prev.Aggregated += p.Aggregated 211 } else { 212 r.prev.Aggregated++ 213 } 214} 215 216// Emit emits the point that was generated when reducing the points fed in with AggregateFloat. 217func (r *FloatFuncUnsignedReducer) Emit() []UnsignedPoint { 218 return []UnsignedPoint{*r.prev} 219} 220 221// FloatReduceUnsignedSliceFunc is the function called by a FloatPoint reducer. 222type FloatReduceUnsignedSliceFunc func(a []FloatPoint) []UnsignedPoint 223 224// FloatSliceFuncUnsignedReducer is a reducer that aggregates 225// the passed in points and then invokes the function to reduce the points when they are emitted. 226type FloatSliceFuncUnsignedReducer struct { 227 points []FloatPoint 228 fn FloatReduceUnsignedSliceFunc 229} 230 231// NewFloatSliceFuncUnsignedReducer creates a new FloatSliceFuncUnsignedReducer. 232func NewFloatSliceFuncUnsignedReducer(fn FloatReduceUnsignedSliceFunc) *FloatSliceFuncUnsignedReducer { 233 return &FloatSliceFuncUnsignedReducer{fn: fn} 234} 235 236// AggregateFloat copies the FloatPoint into the internal slice to be passed 237// to the reduce function when Emit is called. 238func (r *FloatSliceFuncUnsignedReducer) AggregateFloat(p *FloatPoint) { 239 r.points = append(r.points, *p.Clone()) 240} 241 242// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. 243// This is a more efficient version of calling AggregateFloat on each point. 244func (r *FloatSliceFuncUnsignedReducer) AggregateFloatBulk(points []FloatPoint) { 245 r.points = append(r.points, points...) 246} 247 248// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 249// This method does not clear the points from the internal slice. 250func (r *FloatSliceFuncUnsignedReducer) Emit() []UnsignedPoint { 251 return r.fn(r.points) 252} 253 254// FloatReduceStringFunc is the function called by a FloatPoint reducer. 255type FloatReduceStringFunc func(prev *StringPoint, curr *FloatPoint) (t int64, v string, aux []interface{}) 256 257// FloatFuncStringReducer is a reducer that reduces 258// the passed in points to a single point using a reduce function. 259type FloatFuncStringReducer struct { 260 prev *StringPoint 261 fn FloatReduceStringFunc 262} 263 264// NewFloatFuncStringReducer creates a new FloatFuncStringReducer. 265func NewFloatFuncStringReducer(fn FloatReduceStringFunc, prev *StringPoint) *FloatFuncStringReducer { 266 return &FloatFuncStringReducer{fn: fn, prev: prev} 267} 268 269// AggregateFloat takes a FloatPoint and invokes the reduce function with the 270// current and new point to modify the current point. 271func (r *FloatFuncStringReducer) AggregateFloat(p *FloatPoint) { 272 t, v, aux := r.fn(r.prev, p) 273 if r.prev == nil { 274 r.prev = &StringPoint{} 275 } 276 r.prev.Time = t 277 r.prev.Value = v 278 r.prev.Aux = aux 279 if p.Aggregated > 1 { 280 r.prev.Aggregated += p.Aggregated 281 } else { 282 r.prev.Aggregated++ 283 } 284} 285 286// Emit emits the point that was generated when reducing the points fed in with AggregateFloat. 287func (r *FloatFuncStringReducer) Emit() []StringPoint { 288 return []StringPoint{*r.prev} 289} 290 291// FloatReduceStringSliceFunc is the function called by a FloatPoint reducer. 292type FloatReduceStringSliceFunc func(a []FloatPoint) []StringPoint 293 294// FloatSliceFuncStringReducer is a reducer that aggregates 295// the passed in points and then invokes the function to reduce the points when they are emitted. 296type FloatSliceFuncStringReducer struct { 297 points []FloatPoint 298 fn FloatReduceStringSliceFunc 299} 300 301// NewFloatSliceFuncStringReducer creates a new FloatSliceFuncStringReducer. 302func NewFloatSliceFuncStringReducer(fn FloatReduceStringSliceFunc) *FloatSliceFuncStringReducer { 303 return &FloatSliceFuncStringReducer{fn: fn} 304} 305 306// AggregateFloat copies the FloatPoint into the internal slice to be passed 307// to the reduce function when Emit is called. 308func (r *FloatSliceFuncStringReducer) AggregateFloat(p *FloatPoint) { 309 r.points = append(r.points, *p.Clone()) 310} 311 312// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. 313// This is a more efficient version of calling AggregateFloat on each point. 314func (r *FloatSliceFuncStringReducer) AggregateFloatBulk(points []FloatPoint) { 315 r.points = append(r.points, points...) 316} 317 318// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 319// This method does not clear the points from the internal slice. 320func (r *FloatSliceFuncStringReducer) Emit() []StringPoint { 321 return r.fn(r.points) 322} 323 324// FloatReduceBooleanFunc is the function called by a FloatPoint reducer. 325type FloatReduceBooleanFunc func(prev *BooleanPoint, curr *FloatPoint) (t int64, v bool, aux []interface{}) 326 327// FloatFuncBooleanReducer is a reducer that reduces 328// the passed in points to a single point using a reduce function. 329type FloatFuncBooleanReducer struct { 330 prev *BooleanPoint 331 fn FloatReduceBooleanFunc 332} 333 334// NewFloatFuncBooleanReducer creates a new FloatFuncBooleanReducer. 335func NewFloatFuncBooleanReducer(fn FloatReduceBooleanFunc, prev *BooleanPoint) *FloatFuncBooleanReducer { 336 return &FloatFuncBooleanReducer{fn: fn, prev: prev} 337} 338 339// AggregateFloat takes a FloatPoint and invokes the reduce function with the 340// current and new point to modify the current point. 341func (r *FloatFuncBooleanReducer) AggregateFloat(p *FloatPoint) { 342 t, v, aux := r.fn(r.prev, p) 343 if r.prev == nil { 344 r.prev = &BooleanPoint{} 345 } 346 r.prev.Time = t 347 r.prev.Value = v 348 r.prev.Aux = aux 349 if p.Aggregated > 1 { 350 r.prev.Aggregated += p.Aggregated 351 } else { 352 r.prev.Aggregated++ 353 } 354} 355 356// Emit emits the point that was generated when reducing the points fed in with AggregateFloat. 357func (r *FloatFuncBooleanReducer) Emit() []BooleanPoint { 358 return []BooleanPoint{*r.prev} 359} 360 361// FloatReduceBooleanSliceFunc is the function called by a FloatPoint reducer. 362type FloatReduceBooleanSliceFunc func(a []FloatPoint) []BooleanPoint 363 364// FloatSliceFuncBooleanReducer is a reducer that aggregates 365// the passed in points and then invokes the function to reduce the points when they are emitted. 366type FloatSliceFuncBooleanReducer struct { 367 points []FloatPoint 368 fn FloatReduceBooleanSliceFunc 369} 370 371// NewFloatSliceFuncBooleanReducer creates a new FloatSliceFuncBooleanReducer. 372func NewFloatSliceFuncBooleanReducer(fn FloatReduceBooleanSliceFunc) *FloatSliceFuncBooleanReducer { 373 return &FloatSliceFuncBooleanReducer{fn: fn} 374} 375 376// AggregateFloat copies the FloatPoint into the internal slice to be passed 377// to the reduce function when Emit is called. 378func (r *FloatSliceFuncBooleanReducer) AggregateFloat(p *FloatPoint) { 379 r.points = append(r.points, *p.Clone()) 380} 381 382// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. 383// This is a more efficient version of calling AggregateFloat on each point. 384func (r *FloatSliceFuncBooleanReducer) AggregateFloatBulk(points []FloatPoint) { 385 r.points = append(r.points, points...) 386} 387 388// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 389// This method does not clear the points from the internal slice. 390func (r *FloatSliceFuncBooleanReducer) Emit() []BooleanPoint { 391 return r.fn(r.points) 392} 393 394// FloatDistinctReducer returns the distinct points in a series. 395type FloatDistinctReducer struct { 396 m map[float64]FloatPoint 397} 398 399// NewFloatDistinctReducer creates a new FloatDistinctReducer. 400func NewFloatDistinctReducer() *FloatDistinctReducer { 401 return &FloatDistinctReducer{m: make(map[float64]FloatPoint)} 402} 403 404// AggregateFloat aggregates a point into the reducer. 405func (r *FloatDistinctReducer) AggregateFloat(p *FloatPoint) { 406 if _, ok := r.m[p.Value]; !ok { 407 r.m[p.Value] = *p 408 } 409} 410 411// Emit emits the distinct points that have been aggregated into the reducer. 412func (r *FloatDistinctReducer) Emit() []FloatPoint { 413 points := make([]FloatPoint, 0, len(r.m)) 414 for _, p := range r.m { 415 points = append(points, FloatPoint{Time: p.Time, Value: p.Value}) 416 } 417 sort.Sort(floatPoints(points)) 418 return points 419} 420 421// FloatElapsedReducer calculates the elapsed of the aggregated points. 422type FloatElapsedReducer struct { 423 unitConversion int64 424 prev FloatPoint 425 curr FloatPoint 426} 427 428// NewFloatElapsedReducer creates a new FloatElapsedReducer. 429func NewFloatElapsedReducer(interval Interval) *FloatElapsedReducer { 430 return &FloatElapsedReducer{ 431 unitConversion: int64(interval.Duration), 432 prev: FloatPoint{Nil: true}, 433 curr: FloatPoint{Nil: true}, 434 } 435} 436 437// AggregateFloat aggregates a point into the reducer and updates the current window. 438func (r *FloatElapsedReducer) AggregateFloat(p *FloatPoint) { 439 r.prev = r.curr 440 r.curr = *p 441} 442 443// Emit emits the elapsed of the reducer at the current point. 444func (r *FloatElapsedReducer) Emit() []IntegerPoint { 445 if !r.prev.Nil { 446 elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion 447 return []IntegerPoint{ 448 {Time: r.curr.Time, Value: elapsed}, 449 } 450 } 451 return nil 452} 453 454// FloatSampleReducer implements a reservoir sampling to calculate a random subset of points 455type FloatSampleReducer struct { 456 count int // how many points we've iterated over 457 rng *rand.Rand // random number generator for each reducer 458 459 points floatPoints // the reservoir 460} 461 462// NewFloatSampleReducer creates a new FloatSampleReducer 463func NewFloatSampleReducer(size int) *FloatSampleReducer { 464 return &FloatSampleReducer{ 465 rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ 466 points: make(floatPoints, size), 467 } 468} 469 470// AggregateFloat aggregates a point into the reducer. 471func (r *FloatSampleReducer) AggregateFloat(p *FloatPoint) { 472 r.count++ 473 // Fill the reservoir with the first n points 474 if r.count-1 < len(r.points) { 475 p.CopyTo(&r.points[r.count-1]) 476 return 477 } 478 479 // Generate a random integer between 1 and the count and 480 // if that number is less than the length of the slice 481 // replace the point at that index rnd with p. 482 rnd := r.rng.Intn(r.count) 483 if rnd < len(r.points) { 484 p.CopyTo(&r.points[rnd]) 485 } 486} 487 488// Emit emits the reservoir sample as many points. 489func (r *FloatSampleReducer) Emit() []FloatPoint { 490 min := len(r.points) 491 if r.count < min { 492 min = r.count 493 } 494 pts := r.points[:min] 495 sort.Sort(pts) 496 return pts 497} 498 499// IntegerPointAggregator aggregates points to produce a single point. 500type IntegerPointAggregator interface { 501 AggregateInteger(p *IntegerPoint) 502} 503 504// IntegerBulkPointAggregator aggregates multiple points at a time. 505type IntegerBulkPointAggregator interface { 506 AggregateIntegerBulk(points []IntegerPoint) 507} 508 509// AggregateIntegerPoints feeds a slice of IntegerPoint into an 510// aggregator. If the aggregator is a IntegerBulkPointAggregator, it will 511// use the AggregateBulk method. 512func AggregateIntegerPoints(a IntegerPointAggregator, points []IntegerPoint) { 513 switch a := a.(type) { 514 case IntegerBulkPointAggregator: 515 a.AggregateIntegerBulk(points) 516 default: 517 for _, p := range points { 518 a.AggregateInteger(&p) 519 } 520 } 521} 522 523// IntegerPointEmitter produces a single point from an aggregate. 524type IntegerPointEmitter interface { 525 Emit() []IntegerPoint 526} 527 528// IntegerReduceFloatFunc is the function called by a IntegerPoint reducer. 529type IntegerReduceFloatFunc func(prev *FloatPoint, curr *IntegerPoint) (t int64, v float64, aux []interface{}) 530 531// IntegerFuncFloatReducer is a reducer that reduces 532// the passed in points to a single point using a reduce function. 533type IntegerFuncFloatReducer struct { 534 prev *FloatPoint 535 fn IntegerReduceFloatFunc 536} 537 538// NewIntegerFuncFloatReducer creates a new IntegerFuncFloatReducer. 539func NewIntegerFuncFloatReducer(fn IntegerReduceFloatFunc, prev *FloatPoint) *IntegerFuncFloatReducer { 540 return &IntegerFuncFloatReducer{fn: fn, prev: prev} 541} 542 543// AggregateInteger takes a IntegerPoint and invokes the reduce function with the 544// current and new point to modify the current point. 545func (r *IntegerFuncFloatReducer) AggregateInteger(p *IntegerPoint) { 546 t, v, aux := r.fn(r.prev, p) 547 if r.prev == nil { 548 r.prev = &FloatPoint{} 549 } 550 r.prev.Time = t 551 r.prev.Value = v 552 r.prev.Aux = aux 553 if p.Aggregated > 1 { 554 r.prev.Aggregated += p.Aggregated 555 } else { 556 r.prev.Aggregated++ 557 } 558} 559 560// Emit emits the point that was generated when reducing the points fed in with AggregateInteger. 561func (r *IntegerFuncFloatReducer) Emit() []FloatPoint { 562 return []FloatPoint{*r.prev} 563} 564 565// IntegerReduceFloatSliceFunc is the function called by a IntegerPoint reducer. 566type IntegerReduceFloatSliceFunc func(a []IntegerPoint) []FloatPoint 567 568// IntegerSliceFuncFloatReducer is a reducer that aggregates 569// the passed in points and then invokes the function to reduce the points when they are emitted. 570type IntegerSliceFuncFloatReducer struct { 571 points []IntegerPoint 572 fn IntegerReduceFloatSliceFunc 573} 574 575// NewIntegerSliceFuncFloatReducer creates a new IntegerSliceFuncFloatReducer. 576func NewIntegerSliceFuncFloatReducer(fn IntegerReduceFloatSliceFunc) *IntegerSliceFuncFloatReducer { 577 return &IntegerSliceFuncFloatReducer{fn: fn} 578} 579 580// AggregateInteger copies the IntegerPoint into the internal slice to be passed 581// to the reduce function when Emit is called. 582func (r *IntegerSliceFuncFloatReducer) AggregateInteger(p *IntegerPoint) { 583 r.points = append(r.points, *p.Clone()) 584} 585 586// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. 587// This is a more efficient version of calling AggregateInteger on each point. 588func (r *IntegerSliceFuncFloatReducer) AggregateIntegerBulk(points []IntegerPoint) { 589 r.points = append(r.points, points...) 590} 591 592// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 593// This method does not clear the points from the internal slice. 594func (r *IntegerSliceFuncFloatReducer) Emit() []FloatPoint { 595 return r.fn(r.points) 596} 597 598// IntegerReduceFunc is the function called by a IntegerPoint reducer. 599type IntegerReduceFunc func(prev *IntegerPoint, curr *IntegerPoint) (t int64, v int64, aux []interface{}) 600 601// IntegerFuncReducer is a reducer that reduces 602// the passed in points to a single point using a reduce function. 603type IntegerFuncReducer struct { 604 prev *IntegerPoint 605 fn IntegerReduceFunc 606} 607 608// NewIntegerFuncReducer creates a new IntegerFuncIntegerReducer. 609func NewIntegerFuncReducer(fn IntegerReduceFunc, prev *IntegerPoint) *IntegerFuncReducer { 610 return &IntegerFuncReducer{fn: fn, prev: prev} 611} 612 613// AggregateInteger takes a IntegerPoint and invokes the reduce function with the 614// current and new point to modify the current point. 615func (r *IntegerFuncReducer) AggregateInteger(p *IntegerPoint) { 616 t, v, aux := r.fn(r.prev, p) 617 if r.prev == nil { 618 r.prev = &IntegerPoint{} 619 } 620 r.prev.Time = t 621 r.prev.Value = v 622 r.prev.Aux = aux 623 if p.Aggregated > 1 { 624 r.prev.Aggregated += p.Aggregated 625 } else { 626 r.prev.Aggregated++ 627 } 628} 629 630// Emit emits the point that was generated when reducing the points fed in with AggregateInteger. 631func (r *IntegerFuncReducer) Emit() []IntegerPoint { 632 return []IntegerPoint{*r.prev} 633} 634 635// IntegerReduceSliceFunc is the function called by a IntegerPoint reducer. 636type IntegerReduceSliceFunc func(a []IntegerPoint) []IntegerPoint 637 638// IntegerSliceFuncReducer is a reducer that aggregates 639// the passed in points and then invokes the function to reduce the points when they are emitted. 640type IntegerSliceFuncReducer struct { 641 points []IntegerPoint 642 fn IntegerReduceSliceFunc 643} 644 645// NewIntegerSliceFuncReducer creates a new IntegerSliceFuncReducer. 646func NewIntegerSliceFuncReducer(fn IntegerReduceSliceFunc) *IntegerSliceFuncReducer { 647 return &IntegerSliceFuncReducer{fn: fn} 648} 649 650// AggregateInteger copies the IntegerPoint into the internal slice to be passed 651// to the reduce function when Emit is called. 652func (r *IntegerSliceFuncReducer) AggregateInteger(p *IntegerPoint) { 653 r.points = append(r.points, *p.Clone()) 654} 655 656// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. 657// This is a more efficient version of calling AggregateInteger on each point. 658func (r *IntegerSliceFuncReducer) AggregateIntegerBulk(points []IntegerPoint) { 659 r.points = append(r.points, points...) 660} 661 662// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 663// This method does not clear the points from the internal slice. 664func (r *IntegerSliceFuncReducer) Emit() []IntegerPoint { 665 return r.fn(r.points) 666} 667 668// IntegerReduceUnsignedFunc is the function called by a IntegerPoint reducer. 669type IntegerReduceUnsignedFunc func(prev *UnsignedPoint, curr *IntegerPoint) (t int64, v uint64, aux []interface{}) 670 671// IntegerFuncUnsignedReducer is a reducer that reduces 672// the passed in points to a single point using a reduce function. 673type IntegerFuncUnsignedReducer struct { 674 prev *UnsignedPoint 675 fn IntegerReduceUnsignedFunc 676} 677 678// NewIntegerFuncUnsignedReducer creates a new IntegerFuncUnsignedReducer. 679func NewIntegerFuncUnsignedReducer(fn IntegerReduceUnsignedFunc, prev *UnsignedPoint) *IntegerFuncUnsignedReducer { 680 return &IntegerFuncUnsignedReducer{fn: fn, prev: prev} 681} 682 683// AggregateInteger takes a IntegerPoint and invokes the reduce function with the 684// current and new point to modify the current point. 685func (r *IntegerFuncUnsignedReducer) AggregateInteger(p *IntegerPoint) { 686 t, v, aux := r.fn(r.prev, p) 687 if r.prev == nil { 688 r.prev = &UnsignedPoint{} 689 } 690 r.prev.Time = t 691 r.prev.Value = v 692 r.prev.Aux = aux 693 if p.Aggregated > 1 { 694 r.prev.Aggregated += p.Aggregated 695 } else { 696 r.prev.Aggregated++ 697 } 698} 699 700// Emit emits the point that was generated when reducing the points fed in with AggregateInteger. 701func (r *IntegerFuncUnsignedReducer) Emit() []UnsignedPoint { 702 return []UnsignedPoint{*r.prev} 703} 704 705// IntegerReduceUnsignedSliceFunc is the function called by a IntegerPoint reducer. 706type IntegerReduceUnsignedSliceFunc func(a []IntegerPoint) []UnsignedPoint 707 708// IntegerSliceFuncUnsignedReducer is a reducer that aggregates 709// the passed in points and then invokes the function to reduce the points when they are emitted. 710type IntegerSliceFuncUnsignedReducer struct { 711 points []IntegerPoint 712 fn IntegerReduceUnsignedSliceFunc 713} 714 715// NewIntegerSliceFuncUnsignedReducer creates a new IntegerSliceFuncUnsignedReducer. 716func NewIntegerSliceFuncUnsignedReducer(fn IntegerReduceUnsignedSliceFunc) *IntegerSliceFuncUnsignedReducer { 717 return &IntegerSliceFuncUnsignedReducer{fn: fn} 718} 719 720// AggregateInteger copies the IntegerPoint into the internal slice to be passed 721// to the reduce function when Emit is called. 722func (r *IntegerSliceFuncUnsignedReducer) AggregateInteger(p *IntegerPoint) { 723 r.points = append(r.points, *p.Clone()) 724} 725 726// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. 727// This is a more efficient version of calling AggregateInteger on each point. 728func (r *IntegerSliceFuncUnsignedReducer) AggregateIntegerBulk(points []IntegerPoint) { 729 r.points = append(r.points, points...) 730} 731 732// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 733// This method does not clear the points from the internal slice. 734func (r *IntegerSliceFuncUnsignedReducer) Emit() []UnsignedPoint { 735 return r.fn(r.points) 736} 737 738// IntegerReduceStringFunc is the function called by a IntegerPoint reducer. 739type IntegerReduceStringFunc func(prev *StringPoint, curr *IntegerPoint) (t int64, v string, aux []interface{}) 740 741// IntegerFuncStringReducer is a reducer that reduces 742// the passed in points to a single point using a reduce function. 743type IntegerFuncStringReducer struct { 744 prev *StringPoint 745 fn IntegerReduceStringFunc 746} 747 748// NewIntegerFuncStringReducer creates a new IntegerFuncStringReducer. 749func NewIntegerFuncStringReducer(fn IntegerReduceStringFunc, prev *StringPoint) *IntegerFuncStringReducer { 750 return &IntegerFuncStringReducer{fn: fn, prev: prev} 751} 752 753// AggregateInteger takes a IntegerPoint and invokes the reduce function with the 754// current and new point to modify the current point. 755func (r *IntegerFuncStringReducer) AggregateInteger(p *IntegerPoint) { 756 t, v, aux := r.fn(r.prev, p) 757 if r.prev == nil { 758 r.prev = &StringPoint{} 759 } 760 r.prev.Time = t 761 r.prev.Value = v 762 r.prev.Aux = aux 763 if p.Aggregated > 1 { 764 r.prev.Aggregated += p.Aggregated 765 } else { 766 r.prev.Aggregated++ 767 } 768} 769 770// Emit emits the point that was generated when reducing the points fed in with AggregateInteger. 771func (r *IntegerFuncStringReducer) Emit() []StringPoint { 772 return []StringPoint{*r.prev} 773} 774 775// IntegerReduceStringSliceFunc is the function called by a IntegerPoint reducer. 776type IntegerReduceStringSliceFunc func(a []IntegerPoint) []StringPoint 777 778// IntegerSliceFuncStringReducer is a reducer that aggregates 779// the passed in points and then invokes the function to reduce the points when they are emitted. 780type IntegerSliceFuncStringReducer struct { 781 points []IntegerPoint 782 fn IntegerReduceStringSliceFunc 783} 784 785// NewIntegerSliceFuncStringReducer creates a new IntegerSliceFuncStringReducer. 786func NewIntegerSliceFuncStringReducer(fn IntegerReduceStringSliceFunc) *IntegerSliceFuncStringReducer { 787 return &IntegerSliceFuncStringReducer{fn: fn} 788} 789 790// AggregateInteger copies the IntegerPoint into the internal slice to be passed 791// to the reduce function when Emit is called. 792func (r *IntegerSliceFuncStringReducer) AggregateInteger(p *IntegerPoint) { 793 r.points = append(r.points, *p.Clone()) 794} 795 796// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. 797// This is a more efficient version of calling AggregateInteger on each point. 798func (r *IntegerSliceFuncStringReducer) AggregateIntegerBulk(points []IntegerPoint) { 799 r.points = append(r.points, points...) 800} 801 802// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 803// This method does not clear the points from the internal slice. 804func (r *IntegerSliceFuncStringReducer) Emit() []StringPoint { 805 return r.fn(r.points) 806} 807 808// IntegerReduceBooleanFunc is the function called by a IntegerPoint reducer. 809type IntegerReduceBooleanFunc func(prev *BooleanPoint, curr *IntegerPoint) (t int64, v bool, aux []interface{}) 810 811// IntegerFuncBooleanReducer is a reducer that reduces 812// the passed in points to a single point using a reduce function. 813type IntegerFuncBooleanReducer struct { 814 prev *BooleanPoint 815 fn IntegerReduceBooleanFunc 816} 817 818// NewIntegerFuncBooleanReducer creates a new IntegerFuncBooleanReducer. 819func NewIntegerFuncBooleanReducer(fn IntegerReduceBooleanFunc, prev *BooleanPoint) *IntegerFuncBooleanReducer { 820 return &IntegerFuncBooleanReducer{fn: fn, prev: prev} 821} 822 823// AggregateInteger takes a IntegerPoint and invokes the reduce function with the 824// current and new point to modify the current point. 825func (r *IntegerFuncBooleanReducer) AggregateInteger(p *IntegerPoint) { 826 t, v, aux := r.fn(r.prev, p) 827 if r.prev == nil { 828 r.prev = &BooleanPoint{} 829 } 830 r.prev.Time = t 831 r.prev.Value = v 832 r.prev.Aux = aux 833 if p.Aggregated > 1 { 834 r.prev.Aggregated += p.Aggregated 835 } else { 836 r.prev.Aggregated++ 837 } 838} 839 840// Emit emits the point that was generated when reducing the points fed in with AggregateInteger. 841func (r *IntegerFuncBooleanReducer) Emit() []BooleanPoint { 842 return []BooleanPoint{*r.prev} 843} 844 845// IntegerReduceBooleanSliceFunc is the function called by a IntegerPoint reducer. 846type IntegerReduceBooleanSliceFunc func(a []IntegerPoint) []BooleanPoint 847 848// IntegerSliceFuncBooleanReducer is a reducer that aggregates 849// the passed in points and then invokes the function to reduce the points when they are emitted. 850type IntegerSliceFuncBooleanReducer struct { 851 points []IntegerPoint 852 fn IntegerReduceBooleanSliceFunc 853} 854 855// NewIntegerSliceFuncBooleanReducer creates a new IntegerSliceFuncBooleanReducer. 856func NewIntegerSliceFuncBooleanReducer(fn IntegerReduceBooleanSliceFunc) *IntegerSliceFuncBooleanReducer { 857 return &IntegerSliceFuncBooleanReducer{fn: fn} 858} 859 860// AggregateInteger copies the IntegerPoint into the internal slice to be passed 861// to the reduce function when Emit is called. 862func (r *IntegerSliceFuncBooleanReducer) AggregateInteger(p *IntegerPoint) { 863 r.points = append(r.points, *p.Clone()) 864} 865 866// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. 867// This is a more efficient version of calling AggregateInteger on each point. 868func (r *IntegerSliceFuncBooleanReducer) AggregateIntegerBulk(points []IntegerPoint) { 869 r.points = append(r.points, points...) 870} 871 872// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 873// This method does not clear the points from the internal slice. 874func (r *IntegerSliceFuncBooleanReducer) Emit() []BooleanPoint { 875 return r.fn(r.points) 876} 877 878// IntegerDistinctReducer returns the distinct points in a series. 879type IntegerDistinctReducer struct { 880 m map[int64]IntegerPoint 881} 882 883// NewIntegerDistinctReducer creates a new IntegerDistinctReducer. 884func NewIntegerDistinctReducer() *IntegerDistinctReducer { 885 return &IntegerDistinctReducer{m: make(map[int64]IntegerPoint)} 886} 887 888// AggregateInteger aggregates a point into the reducer. 889func (r *IntegerDistinctReducer) AggregateInteger(p *IntegerPoint) { 890 if _, ok := r.m[p.Value]; !ok { 891 r.m[p.Value] = *p 892 } 893} 894 895// Emit emits the distinct points that have been aggregated into the reducer. 896func (r *IntegerDistinctReducer) Emit() []IntegerPoint { 897 points := make([]IntegerPoint, 0, len(r.m)) 898 for _, p := range r.m { 899 points = append(points, IntegerPoint{Time: p.Time, Value: p.Value}) 900 } 901 sort.Sort(integerPoints(points)) 902 return points 903} 904 905// IntegerElapsedReducer calculates the elapsed of the aggregated points. 906type IntegerElapsedReducer struct { 907 unitConversion int64 908 prev IntegerPoint 909 curr IntegerPoint 910} 911 912// NewIntegerElapsedReducer creates a new IntegerElapsedReducer. 913func NewIntegerElapsedReducer(interval Interval) *IntegerElapsedReducer { 914 return &IntegerElapsedReducer{ 915 unitConversion: int64(interval.Duration), 916 prev: IntegerPoint{Nil: true}, 917 curr: IntegerPoint{Nil: true}, 918 } 919} 920 921// AggregateInteger aggregates a point into the reducer and updates the current window. 922func (r *IntegerElapsedReducer) AggregateInteger(p *IntegerPoint) { 923 r.prev = r.curr 924 r.curr = *p 925} 926 927// Emit emits the elapsed of the reducer at the current point. 928func (r *IntegerElapsedReducer) Emit() []IntegerPoint { 929 if !r.prev.Nil { 930 elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion 931 return []IntegerPoint{ 932 {Time: r.curr.Time, Value: elapsed}, 933 } 934 } 935 return nil 936} 937 938// IntegerSampleReducer implements a reservoir sampling to calculate a random subset of points 939type IntegerSampleReducer struct { 940 count int // how many points we've iterated over 941 rng *rand.Rand // random number generator for each reducer 942 943 points integerPoints // the reservoir 944} 945 946// NewIntegerSampleReducer creates a new IntegerSampleReducer 947func NewIntegerSampleReducer(size int) *IntegerSampleReducer { 948 return &IntegerSampleReducer{ 949 rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ 950 points: make(integerPoints, size), 951 } 952} 953 954// AggregateInteger aggregates a point into the reducer. 955func (r *IntegerSampleReducer) AggregateInteger(p *IntegerPoint) { 956 r.count++ 957 // Fill the reservoir with the first n points 958 if r.count-1 < len(r.points) { 959 p.CopyTo(&r.points[r.count-1]) 960 return 961 } 962 963 // Generate a random integer between 1 and the count and 964 // if that number is less than the length of the slice 965 // replace the point at that index rnd with p. 966 rnd := r.rng.Intn(r.count) 967 if rnd < len(r.points) { 968 p.CopyTo(&r.points[rnd]) 969 } 970} 971 972// Emit emits the reservoir sample as many points. 973func (r *IntegerSampleReducer) Emit() []IntegerPoint { 974 min := len(r.points) 975 if r.count < min { 976 min = r.count 977 } 978 pts := r.points[:min] 979 sort.Sort(pts) 980 return pts 981} 982 983// UnsignedPointAggregator aggregates points to produce a single point. 984type UnsignedPointAggregator interface { 985 AggregateUnsigned(p *UnsignedPoint) 986} 987 988// UnsignedBulkPointAggregator aggregates multiple points at a time. 989type UnsignedBulkPointAggregator interface { 990 AggregateUnsignedBulk(points []UnsignedPoint) 991} 992 993// AggregateUnsignedPoints feeds a slice of UnsignedPoint into an 994// aggregator. If the aggregator is a UnsignedBulkPointAggregator, it will 995// use the AggregateBulk method. 996func AggregateUnsignedPoints(a UnsignedPointAggregator, points []UnsignedPoint) { 997 switch a := a.(type) { 998 case UnsignedBulkPointAggregator: 999 a.AggregateUnsignedBulk(points) 1000 default: 1001 for _, p := range points { 1002 a.AggregateUnsigned(&p) 1003 } 1004 } 1005} 1006 1007// UnsignedPointEmitter produces a single point from an aggregate. 1008type UnsignedPointEmitter interface { 1009 Emit() []UnsignedPoint 1010} 1011 1012// UnsignedReduceFloatFunc is the function called by a UnsignedPoint reducer. 1013type UnsignedReduceFloatFunc func(prev *FloatPoint, curr *UnsignedPoint) (t int64, v float64, aux []interface{}) 1014 1015// UnsignedFuncFloatReducer is a reducer that reduces 1016// the passed in points to a single point using a reduce function. 1017type UnsignedFuncFloatReducer struct { 1018 prev *FloatPoint 1019 fn UnsignedReduceFloatFunc 1020} 1021 1022// NewUnsignedFuncFloatReducer creates a new UnsignedFuncFloatReducer. 1023func NewUnsignedFuncFloatReducer(fn UnsignedReduceFloatFunc, prev *FloatPoint) *UnsignedFuncFloatReducer { 1024 return &UnsignedFuncFloatReducer{fn: fn, prev: prev} 1025} 1026 1027// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the 1028// current and new point to modify the current point. 1029func (r *UnsignedFuncFloatReducer) AggregateUnsigned(p *UnsignedPoint) { 1030 t, v, aux := r.fn(r.prev, p) 1031 if r.prev == nil { 1032 r.prev = &FloatPoint{} 1033 } 1034 r.prev.Time = t 1035 r.prev.Value = v 1036 r.prev.Aux = aux 1037 if p.Aggregated > 1 { 1038 r.prev.Aggregated += p.Aggregated 1039 } else { 1040 r.prev.Aggregated++ 1041 } 1042} 1043 1044// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned. 1045func (r *UnsignedFuncFloatReducer) Emit() []FloatPoint { 1046 return []FloatPoint{*r.prev} 1047} 1048 1049// UnsignedReduceFloatSliceFunc is the function called by a UnsignedPoint reducer. 1050type UnsignedReduceFloatSliceFunc func(a []UnsignedPoint) []FloatPoint 1051 1052// UnsignedSliceFuncFloatReducer is a reducer that aggregates 1053// the passed in points and then invokes the function to reduce the points when they are emitted. 1054type UnsignedSliceFuncFloatReducer struct { 1055 points []UnsignedPoint 1056 fn UnsignedReduceFloatSliceFunc 1057} 1058 1059// NewUnsignedSliceFuncFloatReducer creates a new UnsignedSliceFuncFloatReducer. 1060func NewUnsignedSliceFuncFloatReducer(fn UnsignedReduceFloatSliceFunc) *UnsignedSliceFuncFloatReducer { 1061 return &UnsignedSliceFuncFloatReducer{fn: fn} 1062} 1063 1064// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed 1065// to the reduce function when Emit is called. 1066func (r *UnsignedSliceFuncFloatReducer) AggregateUnsigned(p *UnsignedPoint) { 1067 r.points = append(r.points, *p.Clone()) 1068} 1069 1070// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice. 1071// This is a more efficient version of calling AggregateUnsigned on each point. 1072func (r *UnsignedSliceFuncFloatReducer) AggregateUnsignedBulk(points []UnsignedPoint) { 1073 r.points = append(r.points, points...) 1074} 1075 1076// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1077// This method does not clear the points from the internal slice. 1078func (r *UnsignedSliceFuncFloatReducer) Emit() []FloatPoint { 1079 return r.fn(r.points) 1080} 1081 1082// UnsignedReduceIntegerFunc is the function called by a UnsignedPoint reducer. 1083type UnsignedReduceIntegerFunc func(prev *IntegerPoint, curr *UnsignedPoint) (t int64, v int64, aux []interface{}) 1084 1085// UnsignedFuncIntegerReducer is a reducer that reduces 1086// the passed in points to a single point using a reduce function. 1087type UnsignedFuncIntegerReducer struct { 1088 prev *IntegerPoint 1089 fn UnsignedReduceIntegerFunc 1090} 1091 1092// NewUnsignedFuncIntegerReducer creates a new UnsignedFuncIntegerReducer. 1093func NewUnsignedFuncIntegerReducer(fn UnsignedReduceIntegerFunc, prev *IntegerPoint) *UnsignedFuncIntegerReducer { 1094 return &UnsignedFuncIntegerReducer{fn: fn, prev: prev} 1095} 1096 1097// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the 1098// current and new point to modify the current point. 1099func (r *UnsignedFuncIntegerReducer) AggregateUnsigned(p *UnsignedPoint) { 1100 t, v, aux := r.fn(r.prev, p) 1101 if r.prev == nil { 1102 r.prev = &IntegerPoint{} 1103 } 1104 r.prev.Time = t 1105 r.prev.Value = v 1106 r.prev.Aux = aux 1107 if p.Aggregated > 1 { 1108 r.prev.Aggregated += p.Aggregated 1109 } else { 1110 r.prev.Aggregated++ 1111 } 1112} 1113 1114// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned. 1115func (r *UnsignedFuncIntegerReducer) Emit() []IntegerPoint { 1116 return []IntegerPoint{*r.prev} 1117} 1118 1119// UnsignedReduceIntegerSliceFunc is the function called by a UnsignedPoint reducer. 1120type UnsignedReduceIntegerSliceFunc func(a []UnsignedPoint) []IntegerPoint 1121 1122// UnsignedSliceFuncIntegerReducer is a reducer that aggregates 1123// the passed in points and then invokes the function to reduce the points when they are emitted. 1124type UnsignedSliceFuncIntegerReducer struct { 1125 points []UnsignedPoint 1126 fn UnsignedReduceIntegerSliceFunc 1127} 1128 1129// NewUnsignedSliceFuncIntegerReducer creates a new UnsignedSliceFuncIntegerReducer. 1130func NewUnsignedSliceFuncIntegerReducer(fn UnsignedReduceIntegerSliceFunc) *UnsignedSliceFuncIntegerReducer { 1131 return &UnsignedSliceFuncIntegerReducer{fn: fn} 1132} 1133 1134// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed 1135// to the reduce function when Emit is called. 1136func (r *UnsignedSliceFuncIntegerReducer) AggregateUnsigned(p *UnsignedPoint) { 1137 r.points = append(r.points, *p.Clone()) 1138} 1139 1140// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice. 1141// This is a more efficient version of calling AggregateUnsigned on each point. 1142func (r *UnsignedSliceFuncIntegerReducer) AggregateUnsignedBulk(points []UnsignedPoint) { 1143 r.points = append(r.points, points...) 1144} 1145 1146// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1147// This method does not clear the points from the internal slice. 1148func (r *UnsignedSliceFuncIntegerReducer) Emit() []IntegerPoint { 1149 return r.fn(r.points) 1150} 1151 1152// UnsignedReduceFunc is the function called by a UnsignedPoint reducer. 1153type UnsignedReduceFunc func(prev *UnsignedPoint, curr *UnsignedPoint) (t int64, v uint64, aux []interface{}) 1154 1155// UnsignedFuncReducer is a reducer that reduces 1156// the passed in points to a single point using a reduce function. 1157type UnsignedFuncReducer struct { 1158 prev *UnsignedPoint 1159 fn UnsignedReduceFunc 1160} 1161 1162// NewUnsignedFuncReducer creates a new UnsignedFuncUnsignedReducer. 1163func NewUnsignedFuncReducer(fn UnsignedReduceFunc, prev *UnsignedPoint) *UnsignedFuncReducer { 1164 return &UnsignedFuncReducer{fn: fn, prev: prev} 1165} 1166 1167// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the 1168// current and new point to modify the current point. 1169func (r *UnsignedFuncReducer) AggregateUnsigned(p *UnsignedPoint) { 1170 t, v, aux := r.fn(r.prev, p) 1171 if r.prev == nil { 1172 r.prev = &UnsignedPoint{} 1173 } 1174 r.prev.Time = t 1175 r.prev.Value = v 1176 r.prev.Aux = aux 1177 if p.Aggregated > 1 { 1178 r.prev.Aggregated += p.Aggregated 1179 } else { 1180 r.prev.Aggregated++ 1181 } 1182} 1183 1184// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned. 1185func (r *UnsignedFuncReducer) Emit() []UnsignedPoint { 1186 return []UnsignedPoint{*r.prev} 1187} 1188 1189// UnsignedReduceSliceFunc is the function called by a UnsignedPoint reducer. 1190type UnsignedReduceSliceFunc func(a []UnsignedPoint) []UnsignedPoint 1191 1192// UnsignedSliceFuncReducer is a reducer that aggregates 1193// the passed in points and then invokes the function to reduce the points when they are emitted. 1194type UnsignedSliceFuncReducer struct { 1195 points []UnsignedPoint 1196 fn UnsignedReduceSliceFunc 1197} 1198 1199// NewUnsignedSliceFuncReducer creates a new UnsignedSliceFuncReducer. 1200func NewUnsignedSliceFuncReducer(fn UnsignedReduceSliceFunc) *UnsignedSliceFuncReducer { 1201 return &UnsignedSliceFuncReducer{fn: fn} 1202} 1203 1204// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed 1205// to the reduce function when Emit is called. 1206func (r *UnsignedSliceFuncReducer) AggregateUnsigned(p *UnsignedPoint) { 1207 r.points = append(r.points, *p.Clone()) 1208} 1209 1210// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice. 1211// This is a more efficient version of calling AggregateUnsigned on each point. 1212func (r *UnsignedSliceFuncReducer) AggregateUnsignedBulk(points []UnsignedPoint) { 1213 r.points = append(r.points, points...) 1214} 1215 1216// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1217// This method does not clear the points from the internal slice. 1218func (r *UnsignedSliceFuncReducer) Emit() []UnsignedPoint { 1219 return r.fn(r.points) 1220} 1221 1222// UnsignedReduceStringFunc is the function called by a UnsignedPoint reducer. 1223type UnsignedReduceStringFunc func(prev *StringPoint, curr *UnsignedPoint) (t int64, v string, aux []interface{}) 1224 1225// UnsignedFuncStringReducer is a reducer that reduces 1226// the passed in points to a single point using a reduce function. 1227type UnsignedFuncStringReducer struct { 1228 prev *StringPoint 1229 fn UnsignedReduceStringFunc 1230} 1231 1232// NewUnsignedFuncStringReducer creates a new UnsignedFuncStringReducer. 1233func NewUnsignedFuncStringReducer(fn UnsignedReduceStringFunc, prev *StringPoint) *UnsignedFuncStringReducer { 1234 return &UnsignedFuncStringReducer{fn: fn, prev: prev} 1235} 1236 1237// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the 1238// current and new point to modify the current point. 1239func (r *UnsignedFuncStringReducer) AggregateUnsigned(p *UnsignedPoint) { 1240 t, v, aux := r.fn(r.prev, p) 1241 if r.prev == nil { 1242 r.prev = &StringPoint{} 1243 } 1244 r.prev.Time = t 1245 r.prev.Value = v 1246 r.prev.Aux = aux 1247 if p.Aggregated > 1 { 1248 r.prev.Aggregated += p.Aggregated 1249 } else { 1250 r.prev.Aggregated++ 1251 } 1252} 1253 1254// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned. 1255func (r *UnsignedFuncStringReducer) Emit() []StringPoint { 1256 return []StringPoint{*r.prev} 1257} 1258 1259// UnsignedReduceStringSliceFunc is the function called by a UnsignedPoint reducer. 1260type UnsignedReduceStringSliceFunc func(a []UnsignedPoint) []StringPoint 1261 1262// UnsignedSliceFuncStringReducer is a reducer that aggregates 1263// the passed in points and then invokes the function to reduce the points when they are emitted. 1264type UnsignedSliceFuncStringReducer struct { 1265 points []UnsignedPoint 1266 fn UnsignedReduceStringSliceFunc 1267} 1268 1269// NewUnsignedSliceFuncStringReducer creates a new UnsignedSliceFuncStringReducer. 1270func NewUnsignedSliceFuncStringReducer(fn UnsignedReduceStringSliceFunc) *UnsignedSliceFuncStringReducer { 1271 return &UnsignedSliceFuncStringReducer{fn: fn} 1272} 1273 1274// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed 1275// to the reduce function when Emit is called. 1276func (r *UnsignedSliceFuncStringReducer) AggregateUnsigned(p *UnsignedPoint) { 1277 r.points = append(r.points, *p.Clone()) 1278} 1279 1280// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice. 1281// This is a more efficient version of calling AggregateUnsigned on each point. 1282func (r *UnsignedSliceFuncStringReducer) AggregateUnsignedBulk(points []UnsignedPoint) { 1283 r.points = append(r.points, points...) 1284} 1285 1286// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1287// This method does not clear the points from the internal slice. 1288func (r *UnsignedSliceFuncStringReducer) Emit() []StringPoint { 1289 return r.fn(r.points) 1290} 1291 1292// UnsignedReduceBooleanFunc is the function called by a UnsignedPoint reducer. 1293type UnsignedReduceBooleanFunc func(prev *BooleanPoint, curr *UnsignedPoint) (t int64, v bool, aux []interface{}) 1294 1295// UnsignedFuncBooleanReducer is a reducer that reduces 1296// the passed in points to a single point using a reduce function. 1297type UnsignedFuncBooleanReducer struct { 1298 prev *BooleanPoint 1299 fn UnsignedReduceBooleanFunc 1300} 1301 1302// NewUnsignedFuncBooleanReducer creates a new UnsignedFuncBooleanReducer. 1303func NewUnsignedFuncBooleanReducer(fn UnsignedReduceBooleanFunc, prev *BooleanPoint) *UnsignedFuncBooleanReducer { 1304 return &UnsignedFuncBooleanReducer{fn: fn, prev: prev} 1305} 1306 1307// AggregateUnsigned takes a UnsignedPoint and invokes the reduce function with the 1308// current and new point to modify the current point. 1309func (r *UnsignedFuncBooleanReducer) AggregateUnsigned(p *UnsignedPoint) { 1310 t, v, aux := r.fn(r.prev, p) 1311 if r.prev == nil { 1312 r.prev = &BooleanPoint{} 1313 } 1314 r.prev.Time = t 1315 r.prev.Value = v 1316 r.prev.Aux = aux 1317 if p.Aggregated > 1 { 1318 r.prev.Aggregated += p.Aggregated 1319 } else { 1320 r.prev.Aggregated++ 1321 } 1322} 1323 1324// Emit emits the point that was generated when reducing the points fed in with AggregateUnsigned. 1325func (r *UnsignedFuncBooleanReducer) Emit() []BooleanPoint { 1326 return []BooleanPoint{*r.prev} 1327} 1328 1329// UnsignedReduceBooleanSliceFunc is the function called by a UnsignedPoint reducer. 1330type UnsignedReduceBooleanSliceFunc func(a []UnsignedPoint) []BooleanPoint 1331 1332// UnsignedSliceFuncBooleanReducer is a reducer that aggregates 1333// the passed in points and then invokes the function to reduce the points when they are emitted. 1334type UnsignedSliceFuncBooleanReducer struct { 1335 points []UnsignedPoint 1336 fn UnsignedReduceBooleanSliceFunc 1337} 1338 1339// NewUnsignedSliceFuncBooleanReducer creates a new UnsignedSliceFuncBooleanReducer. 1340func NewUnsignedSliceFuncBooleanReducer(fn UnsignedReduceBooleanSliceFunc) *UnsignedSliceFuncBooleanReducer { 1341 return &UnsignedSliceFuncBooleanReducer{fn: fn} 1342} 1343 1344// AggregateUnsigned copies the UnsignedPoint into the internal slice to be passed 1345// to the reduce function when Emit is called. 1346func (r *UnsignedSliceFuncBooleanReducer) AggregateUnsigned(p *UnsignedPoint) { 1347 r.points = append(r.points, *p.Clone()) 1348} 1349 1350// AggregateUnsignedBulk performs a bulk copy of UnsignedPoints into the internal slice. 1351// This is a more efficient version of calling AggregateUnsigned on each point. 1352func (r *UnsignedSliceFuncBooleanReducer) AggregateUnsignedBulk(points []UnsignedPoint) { 1353 r.points = append(r.points, points...) 1354} 1355 1356// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1357// This method does not clear the points from the internal slice. 1358func (r *UnsignedSliceFuncBooleanReducer) Emit() []BooleanPoint { 1359 return r.fn(r.points) 1360} 1361 1362// UnsignedDistinctReducer returns the distinct points in a series. 1363type UnsignedDistinctReducer struct { 1364 m map[uint64]UnsignedPoint 1365} 1366 1367// NewUnsignedDistinctReducer creates a new UnsignedDistinctReducer. 1368func NewUnsignedDistinctReducer() *UnsignedDistinctReducer { 1369 return &UnsignedDistinctReducer{m: make(map[uint64]UnsignedPoint)} 1370} 1371 1372// AggregateUnsigned aggregates a point into the reducer. 1373func (r *UnsignedDistinctReducer) AggregateUnsigned(p *UnsignedPoint) { 1374 if _, ok := r.m[p.Value]; !ok { 1375 r.m[p.Value] = *p 1376 } 1377} 1378 1379// Emit emits the distinct points that have been aggregated into the reducer. 1380func (r *UnsignedDistinctReducer) Emit() []UnsignedPoint { 1381 points := make([]UnsignedPoint, 0, len(r.m)) 1382 for _, p := range r.m { 1383 points = append(points, UnsignedPoint{Time: p.Time, Value: p.Value}) 1384 } 1385 sort.Sort(unsignedPoints(points)) 1386 return points 1387} 1388 1389// UnsignedElapsedReducer calculates the elapsed of the aggregated points. 1390type UnsignedElapsedReducer struct { 1391 unitConversion int64 1392 prev UnsignedPoint 1393 curr UnsignedPoint 1394} 1395 1396// NewUnsignedElapsedReducer creates a new UnsignedElapsedReducer. 1397func NewUnsignedElapsedReducer(interval Interval) *UnsignedElapsedReducer { 1398 return &UnsignedElapsedReducer{ 1399 unitConversion: int64(interval.Duration), 1400 prev: UnsignedPoint{Nil: true}, 1401 curr: UnsignedPoint{Nil: true}, 1402 } 1403} 1404 1405// AggregateUnsigned aggregates a point into the reducer and updates the current window. 1406func (r *UnsignedElapsedReducer) AggregateUnsigned(p *UnsignedPoint) { 1407 r.prev = r.curr 1408 r.curr = *p 1409} 1410 1411// Emit emits the elapsed of the reducer at the current point. 1412func (r *UnsignedElapsedReducer) Emit() []IntegerPoint { 1413 if !r.prev.Nil { 1414 elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion 1415 return []IntegerPoint{ 1416 {Time: r.curr.Time, Value: elapsed}, 1417 } 1418 } 1419 return nil 1420} 1421 1422// UnsignedSampleReducer implements a reservoir sampling to calculate a random subset of points 1423type UnsignedSampleReducer struct { 1424 count int // how many points we've iterated over 1425 rng *rand.Rand // random number generator for each reducer 1426 1427 points unsignedPoints // the reservoir 1428} 1429 1430// NewUnsignedSampleReducer creates a new UnsignedSampleReducer 1431func NewUnsignedSampleReducer(size int) *UnsignedSampleReducer { 1432 return &UnsignedSampleReducer{ 1433 rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ 1434 points: make(unsignedPoints, size), 1435 } 1436} 1437 1438// AggregateUnsigned aggregates a point into the reducer. 1439func (r *UnsignedSampleReducer) AggregateUnsigned(p *UnsignedPoint) { 1440 r.count++ 1441 // Fill the reservoir with the first n points 1442 if r.count-1 < len(r.points) { 1443 p.CopyTo(&r.points[r.count-1]) 1444 return 1445 } 1446 1447 // Generate a random integer between 1 and the count and 1448 // if that number is less than the length of the slice 1449 // replace the point at that index rnd with p. 1450 rnd := r.rng.Intn(r.count) 1451 if rnd < len(r.points) { 1452 p.CopyTo(&r.points[rnd]) 1453 } 1454} 1455 1456// Emit emits the reservoir sample as many points. 1457func (r *UnsignedSampleReducer) Emit() []UnsignedPoint { 1458 min := len(r.points) 1459 if r.count < min { 1460 min = r.count 1461 } 1462 pts := r.points[:min] 1463 sort.Sort(pts) 1464 return pts 1465} 1466 1467// StringPointAggregator aggregates points to produce a single point. 1468type StringPointAggregator interface { 1469 AggregateString(p *StringPoint) 1470} 1471 1472// StringBulkPointAggregator aggregates multiple points at a time. 1473type StringBulkPointAggregator interface { 1474 AggregateStringBulk(points []StringPoint) 1475} 1476 1477// AggregateStringPoints feeds a slice of StringPoint into an 1478// aggregator. If the aggregator is a StringBulkPointAggregator, it will 1479// use the AggregateBulk method. 1480func AggregateStringPoints(a StringPointAggregator, points []StringPoint) { 1481 switch a := a.(type) { 1482 case StringBulkPointAggregator: 1483 a.AggregateStringBulk(points) 1484 default: 1485 for _, p := range points { 1486 a.AggregateString(&p) 1487 } 1488 } 1489} 1490 1491// StringPointEmitter produces a single point from an aggregate. 1492type StringPointEmitter interface { 1493 Emit() []StringPoint 1494} 1495 1496// StringReduceFloatFunc is the function called by a StringPoint reducer. 1497type StringReduceFloatFunc func(prev *FloatPoint, curr *StringPoint) (t int64, v float64, aux []interface{}) 1498 1499// StringFuncFloatReducer is a reducer that reduces 1500// the passed in points to a single point using a reduce function. 1501type StringFuncFloatReducer struct { 1502 prev *FloatPoint 1503 fn StringReduceFloatFunc 1504} 1505 1506// NewStringFuncFloatReducer creates a new StringFuncFloatReducer. 1507func NewStringFuncFloatReducer(fn StringReduceFloatFunc, prev *FloatPoint) *StringFuncFloatReducer { 1508 return &StringFuncFloatReducer{fn: fn, prev: prev} 1509} 1510 1511// AggregateString takes a StringPoint and invokes the reduce function with the 1512// current and new point to modify the current point. 1513func (r *StringFuncFloatReducer) AggregateString(p *StringPoint) { 1514 t, v, aux := r.fn(r.prev, p) 1515 if r.prev == nil { 1516 r.prev = &FloatPoint{} 1517 } 1518 r.prev.Time = t 1519 r.prev.Value = v 1520 r.prev.Aux = aux 1521 if p.Aggregated > 1 { 1522 r.prev.Aggregated += p.Aggregated 1523 } else { 1524 r.prev.Aggregated++ 1525 } 1526} 1527 1528// Emit emits the point that was generated when reducing the points fed in with AggregateString. 1529func (r *StringFuncFloatReducer) Emit() []FloatPoint { 1530 return []FloatPoint{*r.prev} 1531} 1532 1533// StringReduceFloatSliceFunc is the function called by a StringPoint reducer. 1534type StringReduceFloatSliceFunc func(a []StringPoint) []FloatPoint 1535 1536// StringSliceFuncFloatReducer is a reducer that aggregates 1537// the passed in points and then invokes the function to reduce the points when they are emitted. 1538type StringSliceFuncFloatReducer struct { 1539 points []StringPoint 1540 fn StringReduceFloatSliceFunc 1541} 1542 1543// NewStringSliceFuncFloatReducer creates a new StringSliceFuncFloatReducer. 1544func NewStringSliceFuncFloatReducer(fn StringReduceFloatSliceFunc) *StringSliceFuncFloatReducer { 1545 return &StringSliceFuncFloatReducer{fn: fn} 1546} 1547 1548// AggregateString copies the StringPoint into the internal slice to be passed 1549// to the reduce function when Emit is called. 1550func (r *StringSliceFuncFloatReducer) AggregateString(p *StringPoint) { 1551 r.points = append(r.points, *p.Clone()) 1552} 1553 1554// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. 1555// This is a more efficient version of calling AggregateString on each point. 1556func (r *StringSliceFuncFloatReducer) AggregateStringBulk(points []StringPoint) { 1557 r.points = append(r.points, points...) 1558} 1559 1560// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1561// This method does not clear the points from the internal slice. 1562func (r *StringSliceFuncFloatReducer) Emit() []FloatPoint { 1563 return r.fn(r.points) 1564} 1565 1566// StringReduceIntegerFunc is the function called by a StringPoint reducer. 1567type StringReduceIntegerFunc func(prev *IntegerPoint, curr *StringPoint) (t int64, v int64, aux []interface{}) 1568 1569// StringFuncIntegerReducer is a reducer that reduces 1570// the passed in points to a single point using a reduce function. 1571type StringFuncIntegerReducer struct { 1572 prev *IntegerPoint 1573 fn StringReduceIntegerFunc 1574} 1575 1576// NewStringFuncIntegerReducer creates a new StringFuncIntegerReducer. 1577func NewStringFuncIntegerReducer(fn StringReduceIntegerFunc, prev *IntegerPoint) *StringFuncIntegerReducer { 1578 return &StringFuncIntegerReducer{fn: fn, prev: prev} 1579} 1580 1581// AggregateString takes a StringPoint and invokes the reduce function with the 1582// current and new point to modify the current point. 1583func (r *StringFuncIntegerReducer) AggregateString(p *StringPoint) { 1584 t, v, aux := r.fn(r.prev, p) 1585 if r.prev == nil { 1586 r.prev = &IntegerPoint{} 1587 } 1588 r.prev.Time = t 1589 r.prev.Value = v 1590 r.prev.Aux = aux 1591 if p.Aggregated > 1 { 1592 r.prev.Aggregated += p.Aggregated 1593 } else { 1594 r.prev.Aggregated++ 1595 } 1596} 1597 1598// Emit emits the point that was generated when reducing the points fed in with AggregateString. 1599func (r *StringFuncIntegerReducer) Emit() []IntegerPoint { 1600 return []IntegerPoint{*r.prev} 1601} 1602 1603// StringReduceIntegerSliceFunc is the function called by a StringPoint reducer. 1604type StringReduceIntegerSliceFunc func(a []StringPoint) []IntegerPoint 1605 1606// StringSliceFuncIntegerReducer is a reducer that aggregates 1607// the passed in points and then invokes the function to reduce the points when they are emitted. 1608type StringSliceFuncIntegerReducer struct { 1609 points []StringPoint 1610 fn StringReduceIntegerSliceFunc 1611} 1612 1613// NewStringSliceFuncIntegerReducer creates a new StringSliceFuncIntegerReducer. 1614func NewStringSliceFuncIntegerReducer(fn StringReduceIntegerSliceFunc) *StringSliceFuncIntegerReducer { 1615 return &StringSliceFuncIntegerReducer{fn: fn} 1616} 1617 1618// AggregateString copies the StringPoint into the internal slice to be passed 1619// to the reduce function when Emit is called. 1620func (r *StringSliceFuncIntegerReducer) AggregateString(p *StringPoint) { 1621 r.points = append(r.points, *p.Clone()) 1622} 1623 1624// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. 1625// This is a more efficient version of calling AggregateString on each point. 1626func (r *StringSliceFuncIntegerReducer) AggregateStringBulk(points []StringPoint) { 1627 r.points = append(r.points, points...) 1628} 1629 1630// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1631// This method does not clear the points from the internal slice. 1632func (r *StringSliceFuncIntegerReducer) Emit() []IntegerPoint { 1633 return r.fn(r.points) 1634} 1635 1636// StringReduceUnsignedFunc is the function called by a StringPoint reducer. 1637type StringReduceUnsignedFunc func(prev *UnsignedPoint, curr *StringPoint) (t int64, v uint64, aux []interface{}) 1638 1639// StringFuncUnsignedReducer is a reducer that reduces 1640// the passed in points to a single point using a reduce function. 1641type StringFuncUnsignedReducer struct { 1642 prev *UnsignedPoint 1643 fn StringReduceUnsignedFunc 1644} 1645 1646// NewStringFuncUnsignedReducer creates a new StringFuncUnsignedReducer. 1647func NewStringFuncUnsignedReducer(fn StringReduceUnsignedFunc, prev *UnsignedPoint) *StringFuncUnsignedReducer { 1648 return &StringFuncUnsignedReducer{fn: fn, prev: prev} 1649} 1650 1651// AggregateString takes a StringPoint and invokes the reduce function with the 1652// current and new point to modify the current point. 1653func (r *StringFuncUnsignedReducer) AggregateString(p *StringPoint) { 1654 t, v, aux := r.fn(r.prev, p) 1655 if r.prev == nil { 1656 r.prev = &UnsignedPoint{} 1657 } 1658 r.prev.Time = t 1659 r.prev.Value = v 1660 r.prev.Aux = aux 1661 if p.Aggregated > 1 { 1662 r.prev.Aggregated += p.Aggregated 1663 } else { 1664 r.prev.Aggregated++ 1665 } 1666} 1667 1668// Emit emits the point that was generated when reducing the points fed in with AggregateString. 1669func (r *StringFuncUnsignedReducer) Emit() []UnsignedPoint { 1670 return []UnsignedPoint{*r.prev} 1671} 1672 1673// StringReduceUnsignedSliceFunc is the function called by a StringPoint reducer. 1674type StringReduceUnsignedSliceFunc func(a []StringPoint) []UnsignedPoint 1675 1676// StringSliceFuncUnsignedReducer is a reducer that aggregates 1677// the passed in points and then invokes the function to reduce the points when they are emitted. 1678type StringSliceFuncUnsignedReducer struct { 1679 points []StringPoint 1680 fn StringReduceUnsignedSliceFunc 1681} 1682 1683// NewStringSliceFuncUnsignedReducer creates a new StringSliceFuncUnsignedReducer. 1684func NewStringSliceFuncUnsignedReducer(fn StringReduceUnsignedSliceFunc) *StringSliceFuncUnsignedReducer { 1685 return &StringSliceFuncUnsignedReducer{fn: fn} 1686} 1687 1688// AggregateString copies the StringPoint into the internal slice to be passed 1689// to the reduce function when Emit is called. 1690func (r *StringSliceFuncUnsignedReducer) AggregateString(p *StringPoint) { 1691 r.points = append(r.points, *p.Clone()) 1692} 1693 1694// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. 1695// This is a more efficient version of calling AggregateString on each point. 1696func (r *StringSliceFuncUnsignedReducer) AggregateStringBulk(points []StringPoint) { 1697 r.points = append(r.points, points...) 1698} 1699 1700// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1701// This method does not clear the points from the internal slice. 1702func (r *StringSliceFuncUnsignedReducer) Emit() []UnsignedPoint { 1703 return r.fn(r.points) 1704} 1705 1706// StringReduceFunc is the function called by a StringPoint reducer. 1707type StringReduceFunc func(prev *StringPoint, curr *StringPoint) (t int64, v string, aux []interface{}) 1708 1709// StringFuncReducer is a reducer that reduces 1710// the passed in points to a single point using a reduce function. 1711type StringFuncReducer struct { 1712 prev *StringPoint 1713 fn StringReduceFunc 1714} 1715 1716// NewStringFuncReducer creates a new StringFuncStringReducer. 1717func NewStringFuncReducer(fn StringReduceFunc, prev *StringPoint) *StringFuncReducer { 1718 return &StringFuncReducer{fn: fn, prev: prev} 1719} 1720 1721// AggregateString takes a StringPoint and invokes the reduce function with the 1722// current and new point to modify the current point. 1723func (r *StringFuncReducer) AggregateString(p *StringPoint) { 1724 t, v, aux := r.fn(r.prev, p) 1725 if r.prev == nil { 1726 r.prev = &StringPoint{} 1727 } 1728 r.prev.Time = t 1729 r.prev.Value = v 1730 r.prev.Aux = aux 1731 if p.Aggregated > 1 { 1732 r.prev.Aggregated += p.Aggregated 1733 } else { 1734 r.prev.Aggregated++ 1735 } 1736} 1737 1738// Emit emits the point that was generated when reducing the points fed in with AggregateString. 1739func (r *StringFuncReducer) Emit() []StringPoint { 1740 return []StringPoint{*r.prev} 1741} 1742 1743// StringReduceSliceFunc is the function called by a StringPoint reducer. 1744type StringReduceSliceFunc func(a []StringPoint) []StringPoint 1745 1746// StringSliceFuncReducer is a reducer that aggregates 1747// the passed in points and then invokes the function to reduce the points when they are emitted. 1748type StringSliceFuncReducer struct { 1749 points []StringPoint 1750 fn StringReduceSliceFunc 1751} 1752 1753// NewStringSliceFuncReducer creates a new StringSliceFuncReducer. 1754func NewStringSliceFuncReducer(fn StringReduceSliceFunc) *StringSliceFuncReducer { 1755 return &StringSliceFuncReducer{fn: fn} 1756} 1757 1758// AggregateString copies the StringPoint into the internal slice to be passed 1759// to the reduce function when Emit is called. 1760func (r *StringSliceFuncReducer) AggregateString(p *StringPoint) { 1761 r.points = append(r.points, *p.Clone()) 1762} 1763 1764// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. 1765// This is a more efficient version of calling AggregateString on each point. 1766func (r *StringSliceFuncReducer) AggregateStringBulk(points []StringPoint) { 1767 r.points = append(r.points, points...) 1768} 1769 1770// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1771// This method does not clear the points from the internal slice. 1772func (r *StringSliceFuncReducer) Emit() []StringPoint { 1773 return r.fn(r.points) 1774} 1775 1776// StringReduceBooleanFunc is the function called by a StringPoint reducer. 1777type StringReduceBooleanFunc func(prev *BooleanPoint, curr *StringPoint) (t int64, v bool, aux []interface{}) 1778 1779// StringFuncBooleanReducer is a reducer that reduces 1780// the passed in points to a single point using a reduce function. 1781type StringFuncBooleanReducer struct { 1782 prev *BooleanPoint 1783 fn StringReduceBooleanFunc 1784} 1785 1786// NewStringFuncBooleanReducer creates a new StringFuncBooleanReducer. 1787func NewStringFuncBooleanReducer(fn StringReduceBooleanFunc, prev *BooleanPoint) *StringFuncBooleanReducer { 1788 return &StringFuncBooleanReducer{fn: fn, prev: prev} 1789} 1790 1791// AggregateString takes a StringPoint and invokes the reduce function with the 1792// current and new point to modify the current point. 1793func (r *StringFuncBooleanReducer) AggregateString(p *StringPoint) { 1794 t, v, aux := r.fn(r.prev, p) 1795 if r.prev == nil { 1796 r.prev = &BooleanPoint{} 1797 } 1798 r.prev.Time = t 1799 r.prev.Value = v 1800 r.prev.Aux = aux 1801 if p.Aggregated > 1 { 1802 r.prev.Aggregated += p.Aggregated 1803 } else { 1804 r.prev.Aggregated++ 1805 } 1806} 1807 1808// Emit emits the point that was generated when reducing the points fed in with AggregateString. 1809func (r *StringFuncBooleanReducer) Emit() []BooleanPoint { 1810 return []BooleanPoint{*r.prev} 1811} 1812 1813// StringReduceBooleanSliceFunc is the function called by a StringPoint reducer. 1814type StringReduceBooleanSliceFunc func(a []StringPoint) []BooleanPoint 1815 1816// StringSliceFuncBooleanReducer is a reducer that aggregates 1817// the passed in points and then invokes the function to reduce the points when they are emitted. 1818type StringSliceFuncBooleanReducer struct { 1819 points []StringPoint 1820 fn StringReduceBooleanSliceFunc 1821} 1822 1823// NewStringSliceFuncBooleanReducer creates a new StringSliceFuncBooleanReducer. 1824func NewStringSliceFuncBooleanReducer(fn StringReduceBooleanSliceFunc) *StringSliceFuncBooleanReducer { 1825 return &StringSliceFuncBooleanReducer{fn: fn} 1826} 1827 1828// AggregateString copies the StringPoint into the internal slice to be passed 1829// to the reduce function when Emit is called. 1830func (r *StringSliceFuncBooleanReducer) AggregateString(p *StringPoint) { 1831 r.points = append(r.points, *p.Clone()) 1832} 1833 1834// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. 1835// This is a more efficient version of calling AggregateString on each point. 1836func (r *StringSliceFuncBooleanReducer) AggregateStringBulk(points []StringPoint) { 1837 r.points = append(r.points, points...) 1838} 1839 1840// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 1841// This method does not clear the points from the internal slice. 1842func (r *StringSliceFuncBooleanReducer) Emit() []BooleanPoint { 1843 return r.fn(r.points) 1844} 1845 1846// StringDistinctReducer returns the distinct points in a series. 1847type StringDistinctReducer struct { 1848 m map[string]StringPoint 1849} 1850 1851// NewStringDistinctReducer creates a new StringDistinctReducer. 1852func NewStringDistinctReducer() *StringDistinctReducer { 1853 return &StringDistinctReducer{m: make(map[string]StringPoint)} 1854} 1855 1856// AggregateString aggregates a point into the reducer. 1857func (r *StringDistinctReducer) AggregateString(p *StringPoint) { 1858 if _, ok := r.m[p.Value]; !ok { 1859 r.m[p.Value] = *p 1860 } 1861} 1862 1863// Emit emits the distinct points that have been aggregated into the reducer. 1864func (r *StringDistinctReducer) Emit() []StringPoint { 1865 points := make([]StringPoint, 0, len(r.m)) 1866 for _, p := range r.m { 1867 points = append(points, StringPoint{Time: p.Time, Value: p.Value}) 1868 } 1869 sort.Sort(stringPoints(points)) 1870 return points 1871} 1872 1873// StringElapsedReducer calculates the elapsed of the aggregated points. 1874type StringElapsedReducer struct { 1875 unitConversion int64 1876 prev StringPoint 1877 curr StringPoint 1878} 1879 1880// NewStringElapsedReducer creates a new StringElapsedReducer. 1881func NewStringElapsedReducer(interval Interval) *StringElapsedReducer { 1882 return &StringElapsedReducer{ 1883 unitConversion: int64(interval.Duration), 1884 prev: StringPoint{Nil: true}, 1885 curr: StringPoint{Nil: true}, 1886 } 1887} 1888 1889// AggregateString aggregates a point into the reducer and updates the current window. 1890func (r *StringElapsedReducer) AggregateString(p *StringPoint) { 1891 r.prev = r.curr 1892 r.curr = *p 1893} 1894 1895// Emit emits the elapsed of the reducer at the current point. 1896func (r *StringElapsedReducer) Emit() []IntegerPoint { 1897 if !r.prev.Nil { 1898 elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion 1899 return []IntegerPoint{ 1900 {Time: r.curr.Time, Value: elapsed}, 1901 } 1902 } 1903 return nil 1904} 1905 1906// StringSampleReducer implements a reservoir sampling to calculate a random subset of points 1907type StringSampleReducer struct { 1908 count int // how many points we've iterated over 1909 rng *rand.Rand // random number generator for each reducer 1910 1911 points stringPoints // the reservoir 1912} 1913 1914// NewStringSampleReducer creates a new StringSampleReducer 1915func NewStringSampleReducer(size int) *StringSampleReducer { 1916 return &StringSampleReducer{ 1917 rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ 1918 points: make(stringPoints, size), 1919 } 1920} 1921 1922// AggregateString aggregates a point into the reducer. 1923func (r *StringSampleReducer) AggregateString(p *StringPoint) { 1924 r.count++ 1925 // Fill the reservoir with the first n points 1926 if r.count-1 < len(r.points) { 1927 p.CopyTo(&r.points[r.count-1]) 1928 return 1929 } 1930 1931 // Generate a random integer between 1 and the count and 1932 // if that number is less than the length of the slice 1933 // replace the point at that index rnd with p. 1934 rnd := r.rng.Intn(r.count) 1935 if rnd < len(r.points) { 1936 p.CopyTo(&r.points[rnd]) 1937 } 1938} 1939 1940// Emit emits the reservoir sample as many points. 1941func (r *StringSampleReducer) Emit() []StringPoint { 1942 min := len(r.points) 1943 if r.count < min { 1944 min = r.count 1945 } 1946 pts := r.points[:min] 1947 sort.Sort(pts) 1948 return pts 1949} 1950 1951// BooleanPointAggregator aggregates points to produce a single point. 1952type BooleanPointAggregator interface { 1953 AggregateBoolean(p *BooleanPoint) 1954} 1955 1956// BooleanBulkPointAggregator aggregates multiple points at a time. 1957type BooleanBulkPointAggregator interface { 1958 AggregateBooleanBulk(points []BooleanPoint) 1959} 1960 1961// AggregateBooleanPoints feeds a slice of BooleanPoint into an 1962// aggregator. If the aggregator is a BooleanBulkPointAggregator, it will 1963// use the AggregateBulk method. 1964func AggregateBooleanPoints(a BooleanPointAggregator, points []BooleanPoint) { 1965 switch a := a.(type) { 1966 case BooleanBulkPointAggregator: 1967 a.AggregateBooleanBulk(points) 1968 default: 1969 for _, p := range points { 1970 a.AggregateBoolean(&p) 1971 } 1972 } 1973} 1974 1975// BooleanPointEmitter produces a single point from an aggregate. 1976type BooleanPointEmitter interface { 1977 Emit() []BooleanPoint 1978} 1979 1980// BooleanReduceFloatFunc is the function called by a BooleanPoint reducer. 1981type BooleanReduceFloatFunc func(prev *FloatPoint, curr *BooleanPoint) (t int64, v float64, aux []interface{}) 1982 1983// BooleanFuncFloatReducer is a reducer that reduces 1984// the passed in points to a single point using a reduce function. 1985type BooleanFuncFloatReducer struct { 1986 prev *FloatPoint 1987 fn BooleanReduceFloatFunc 1988} 1989 1990// NewBooleanFuncFloatReducer creates a new BooleanFuncFloatReducer. 1991func NewBooleanFuncFloatReducer(fn BooleanReduceFloatFunc, prev *FloatPoint) *BooleanFuncFloatReducer { 1992 return &BooleanFuncFloatReducer{fn: fn, prev: prev} 1993} 1994 1995// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the 1996// current and new point to modify the current point. 1997func (r *BooleanFuncFloatReducer) AggregateBoolean(p *BooleanPoint) { 1998 t, v, aux := r.fn(r.prev, p) 1999 if r.prev == nil { 2000 r.prev = &FloatPoint{} 2001 } 2002 r.prev.Time = t 2003 r.prev.Value = v 2004 r.prev.Aux = aux 2005 if p.Aggregated > 1 { 2006 r.prev.Aggregated += p.Aggregated 2007 } else { 2008 r.prev.Aggregated++ 2009 } 2010} 2011 2012// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean. 2013func (r *BooleanFuncFloatReducer) Emit() []FloatPoint { 2014 return []FloatPoint{*r.prev} 2015} 2016 2017// BooleanReduceFloatSliceFunc is the function called by a BooleanPoint reducer. 2018type BooleanReduceFloatSliceFunc func(a []BooleanPoint) []FloatPoint 2019 2020// BooleanSliceFuncFloatReducer is a reducer that aggregates 2021// the passed in points and then invokes the function to reduce the points when they are emitted. 2022type BooleanSliceFuncFloatReducer struct { 2023 points []BooleanPoint 2024 fn BooleanReduceFloatSliceFunc 2025} 2026 2027// NewBooleanSliceFuncFloatReducer creates a new BooleanSliceFuncFloatReducer. 2028func NewBooleanSliceFuncFloatReducer(fn BooleanReduceFloatSliceFunc) *BooleanSliceFuncFloatReducer { 2029 return &BooleanSliceFuncFloatReducer{fn: fn} 2030} 2031 2032// AggregateBoolean copies the BooleanPoint into the internal slice to be passed 2033// to the reduce function when Emit is called. 2034func (r *BooleanSliceFuncFloatReducer) AggregateBoolean(p *BooleanPoint) { 2035 r.points = append(r.points, *p.Clone()) 2036} 2037 2038// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. 2039// This is a more efficient version of calling AggregateBoolean on each point. 2040func (r *BooleanSliceFuncFloatReducer) AggregateBooleanBulk(points []BooleanPoint) { 2041 r.points = append(r.points, points...) 2042} 2043 2044// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 2045// This method does not clear the points from the internal slice. 2046func (r *BooleanSliceFuncFloatReducer) Emit() []FloatPoint { 2047 return r.fn(r.points) 2048} 2049 2050// BooleanReduceIntegerFunc is the function called by a BooleanPoint reducer. 2051type BooleanReduceIntegerFunc func(prev *IntegerPoint, curr *BooleanPoint) (t int64, v int64, aux []interface{}) 2052 2053// BooleanFuncIntegerReducer is a reducer that reduces 2054// the passed in points to a single point using a reduce function. 2055type BooleanFuncIntegerReducer struct { 2056 prev *IntegerPoint 2057 fn BooleanReduceIntegerFunc 2058} 2059 2060// NewBooleanFuncIntegerReducer creates a new BooleanFuncIntegerReducer. 2061func NewBooleanFuncIntegerReducer(fn BooleanReduceIntegerFunc, prev *IntegerPoint) *BooleanFuncIntegerReducer { 2062 return &BooleanFuncIntegerReducer{fn: fn, prev: prev} 2063} 2064 2065// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the 2066// current and new point to modify the current point. 2067func (r *BooleanFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) { 2068 t, v, aux := r.fn(r.prev, p) 2069 if r.prev == nil { 2070 r.prev = &IntegerPoint{} 2071 } 2072 r.prev.Time = t 2073 r.prev.Value = v 2074 r.prev.Aux = aux 2075 if p.Aggregated > 1 { 2076 r.prev.Aggregated += p.Aggregated 2077 } else { 2078 r.prev.Aggregated++ 2079 } 2080} 2081 2082// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean. 2083func (r *BooleanFuncIntegerReducer) Emit() []IntegerPoint { 2084 return []IntegerPoint{*r.prev} 2085} 2086 2087// BooleanReduceIntegerSliceFunc is the function called by a BooleanPoint reducer. 2088type BooleanReduceIntegerSliceFunc func(a []BooleanPoint) []IntegerPoint 2089 2090// BooleanSliceFuncIntegerReducer is a reducer that aggregates 2091// the passed in points and then invokes the function to reduce the points when they are emitted. 2092type BooleanSliceFuncIntegerReducer struct { 2093 points []BooleanPoint 2094 fn BooleanReduceIntegerSliceFunc 2095} 2096 2097// NewBooleanSliceFuncIntegerReducer creates a new BooleanSliceFuncIntegerReducer. 2098func NewBooleanSliceFuncIntegerReducer(fn BooleanReduceIntegerSliceFunc) *BooleanSliceFuncIntegerReducer { 2099 return &BooleanSliceFuncIntegerReducer{fn: fn} 2100} 2101 2102// AggregateBoolean copies the BooleanPoint into the internal slice to be passed 2103// to the reduce function when Emit is called. 2104func (r *BooleanSliceFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) { 2105 r.points = append(r.points, *p.Clone()) 2106} 2107 2108// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. 2109// This is a more efficient version of calling AggregateBoolean on each point. 2110func (r *BooleanSliceFuncIntegerReducer) AggregateBooleanBulk(points []BooleanPoint) { 2111 r.points = append(r.points, points...) 2112} 2113 2114// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 2115// This method does not clear the points from the internal slice. 2116func (r *BooleanSliceFuncIntegerReducer) Emit() []IntegerPoint { 2117 return r.fn(r.points) 2118} 2119 2120// BooleanReduceUnsignedFunc is the function called by a BooleanPoint reducer. 2121type BooleanReduceUnsignedFunc func(prev *UnsignedPoint, curr *BooleanPoint) (t int64, v uint64, aux []interface{}) 2122 2123// BooleanFuncUnsignedReducer is a reducer that reduces 2124// the passed in points to a single point using a reduce function. 2125type BooleanFuncUnsignedReducer struct { 2126 prev *UnsignedPoint 2127 fn BooleanReduceUnsignedFunc 2128} 2129 2130// NewBooleanFuncUnsignedReducer creates a new BooleanFuncUnsignedReducer. 2131func NewBooleanFuncUnsignedReducer(fn BooleanReduceUnsignedFunc, prev *UnsignedPoint) *BooleanFuncUnsignedReducer { 2132 return &BooleanFuncUnsignedReducer{fn: fn, prev: prev} 2133} 2134 2135// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the 2136// current and new point to modify the current point. 2137func (r *BooleanFuncUnsignedReducer) AggregateBoolean(p *BooleanPoint) { 2138 t, v, aux := r.fn(r.prev, p) 2139 if r.prev == nil { 2140 r.prev = &UnsignedPoint{} 2141 } 2142 r.prev.Time = t 2143 r.prev.Value = v 2144 r.prev.Aux = aux 2145 if p.Aggregated > 1 { 2146 r.prev.Aggregated += p.Aggregated 2147 } else { 2148 r.prev.Aggregated++ 2149 } 2150} 2151 2152// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean. 2153func (r *BooleanFuncUnsignedReducer) Emit() []UnsignedPoint { 2154 return []UnsignedPoint{*r.prev} 2155} 2156 2157// BooleanReduceUnsignedSliceFunc is the function called by a BooleanPoint reducer. 2158type BooleanReduceUnsignedSliceFunc func(a []BooleanPoint) []UnsignedPoint 2159 2160// BooleanSliceFuncUnsignedReducer is a reducer that aggregates 2161// the passed in points and then invokes the function to reduce the points when they are emitted. 2162type BooleanSliceFuncUnsignedReducer struct { 2163 points []BooleanPoint 2164 fn BooleanReduceUnsignedSliceFunc 2165} 2166 2167// NewBooleanSliceFuncUnsignedReducer creates a new BooleanSliceFuncUnsignedReducer. 2168func NewBooleanSliceFuncUnsignedReducer(fn BooleanReduceUnsignedSliceFunc) *BooleanSliceFuncUnsignedReducer { 2169 return &BooleanSliceFuncUnsignedReducer{fn: fn} 2170} 2171 2172// AggregateBoolean copies the BooleanPoint into the internal slice to be passed 2173// to the reduce function when Emit is called. 2174func (r *BooleanSliceFuncUnsignedReducer) AggregateBoolean(p *BooleanPoint) { 2175 r.points = append(r.points, *p.Clone()) 2176} 2177 2178// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. 2179// This is a more efficient version of calling AggregateBoolean on each point. 2180func (r *BooleanSliceFuncUnsignedReducer) AggregateBooleanBulk(points []BooleanPoint) { 2181 r.points = append(r.points, points...) 2182} 2183 2184// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 2185// This method does not clear the points from the internal slice. 2186func (r *BooleanSliceFuncUnsignedReducer) Emit() []UnsignedPoint { 2187 return r.fn(r.points) 2188} 2189 2190// BooleanReduceStringFunc is the function called by a BooleanPoint reducer. 2191type BooleanReduceStringFunc func(prev *StringPoint, curr *BooleanPoint) (t int64, v string, aux []interface{}) 2192 2193// BooleanFuncStringReducer is a reducer that reduces 2194// the passed in points to a single point using a reduce function. 2195type BooleanFuncStringReducer struct { 2196 prev *StringPoint 2197 fn BooleanReduceStringFunc 2198} 2199 2200// NewBooleanFuncStringReducer creates a new BooleanFuncStringReducer. 2201func NewBooleanFuncStringReducer(fn BooleanReduceStringFunc, prev *StringPoint) *BooleanFuncStringReducer { 2202 return &BooleanFuncStringReducer{fn: fn, prev: prev} 2203} 2204 2205// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the 2206// current and new point to modify the current point. 2207func (r *BooleanFuncStringReducer) AggregateBoolean(p *BooleanPoint) { 2208 t, v, aux := r.fn(r.prev, p) 2209 if r.prev == nil { 2210 r.prev = &StringPoint{} 2211 } 2212 r.prev.Time = t 2213 r.prev.Value = v 2214 r.prev.Aux = aux 2215 if p.Aggregated > 1 { 2216 r.prev.Aggregated += p.Aggregated 2217 } else { 2218 r.prev.Aggregated++ 2219 } 2220} 2221 2222// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean. 2223func (r *BooleanFuncStringReducer) Emit() []StringPoint { 2224 return []StringPoint{*r.prev} 2225} 2226 2227// BooleanReduceStringSliceFunc is the function called by a BooleanPoint reducer. 2228type BooleanReduceStringSliceFunc func(a []BooleanPoint) []StringPoint 2229 2230// BooleanSliceFuncStringReducer is a reducer that aggregates 2231// the passed in points and then invokes the function to reduce the points when they are emitted. 2232type BooleanSliceFuncStringReducer struct { 2233 points []BooleanPoint 2234 fn BooleanReduceStringSliceFunc 2235} 2236 2237// NewBooleanSliceFuncStringReducer creates a new BooleanSliceFuncStringReducer. 2238func NewBooleanSliceFuncStringReducer(fn BooleanReduceStringSliceFunc) *BooleanSliceFuncStringReducer { 2239 return &BooleanSliceFuncStringReducer{fn: fn} 2240} 2241 2242// AggregateBoolean copies the BooleanPoint into the internal slice to be passed 2243// to the reduce function when Emit is called. 2244func (r *BooleanSliceFuncStringReducer) AggregateBoolean(p *BooleanPoint) { 2245 r.points = append(r.points, *p.Clone()) 2246} 2247 2248// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. 2249// This is a more efficient version of calling AggregateBoolean on each point. 2250func (r *BooleanSliceFuncStringReducer) AggregateBooleanBulk(points []BooleanPoint) { 2251 r.points = append(r.points, points...) 2252} 2253 2254// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 2255// This method does not clear the points from the internal slice. 2256func (r *BooleanSliceFuncStringReducer) Emit() []StringPoint { 2257 return r.fn(r.points) 2258} 2259 2260// BooleanReduceFunc is the function called by a BooleanPoint reducer. 2261type BooleanReduceFunc func(prev *BooleanPoint, curr *BooleanPoint) (t int64, v bool, aux []interface{}) 2262 2263// BooleanFuncReducer is a reducer that reduces 2264// the passed in points to a single point using a reduce function. 2265type BooleanFuncReducer struct { 2266 prev *BooleanPoint 2267 fn BooleanReduceFunc 2268} 2269 2270// NewBooleanFuncReducer creates a new BooleanFuncBooleanReducer. 2271func NewBooleanFuncReducer(fn BooleanReduceFunc, prev *BooleanPoint) *BooleanFuncReducer { 2272 return &BooleanFuncReducer{fn: fn, prev: prev} 2273} 2274 2275// AggregateBoolean takes a BooleanPoint and invokes the reduce function with the 2276// current and new point to modify the current point. 2277func (r *BooleanFuncReducer) AggregateBoolean(p *BooleanPoint) { 2278 t, v, aux := r.fn(r.prev, p) 2279 if r.prev == nil { 2280 r.prev = &BooleanPoint{} 2281 } 2282 r.prev.Time = t 2283 r.prev.Value = v 2284 r.prev.Aux = aux 2285 if p.Aggregated > 1 { 2286 r.prev.Aggregated += p.Aggregated 2287 } else { 2288 r.prev.Aggregated++ 2289 } 2290} 2291 2292// Emit emits the point that was generated when reducing the points fed in with AggregateBoolean. 2293func (r *BooleanFuncReducer) Emit() []BooleanPoint { 2294 return []BooleanPoint{*r.prev} 2295} 2296 2297// BooleanReduceSliceFunc is the function called by a BooleanPoint reducer. 2298type BooleanReduceSliceFunc func(a []BooleanPoint) []BooleanPoint 2299 2300// BooleanSliceFuncReducer is a reducer that aggregates 2301// the passed in points and then invokes the function to reduce the points when they are emitted. 2302type BooleanSliceFuncReducer struct { 2303 points []BooleanPoint 2304 fn BooleanReduceSliceFunc 2305} 2306 2307// NewBooleanSliceFuncReducer creates a new BooleanSliceFuncReducer. 2308func NewBooleanSliceFuncReducer(fn BooleanReduceSliceFunc) *BooleanSliceFuncReducer { 2309 return &BooleanSliceFuncReducer{fn: fn} 2310} 2311 2312// AggregateBoolean copies the BooleanPoint into the internal slice to be passed 2313// to the reduce function when Emit is called. 2314func (r *BooleanSliceFuncReducer) AggregateBoolean(p *BooleanPoint) { 2315 r.points = append(r.points, *p.Clone()) 2316} 2317 2318// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. 2319// This is a more efficient version of calling AggregateBoolean on each point. 2320func (r *BooleanSliceFuncReducer) AggregateBooleanBulk(points []BooleanPoint) { 2321 r.points = append(r.points, points...) 2322} 2323 2324// Emit invokes the reduce function on the aggregated points to generate the aggregated points. 2325// This method does not clear the points from the internal slice. 2326func (r *BooleanSliceFuncReducer) Emit() []BooleanPoint { 2327 return r.fn(r.points) 2328} 2329 2330// BooleanDistinctReducer returns the distinct points in a series. 2331type BooleanDistinctReducer struct { 2332 m map[bool]BooleanPoint 2333} 2334 2335// NewBooleanDistinctReducer creates a new BooleanDistinctReducer. 2336func NewBooleanDistinctReducer() *BooleanDistinctReducer { 2337 return &BooleanDistinctReducer{m: make(map[bool]BooleanPoint)} 2338} 2339 2340// AggregateBoolean aggregates a point into the reducer. 2341func (r *BooleanDistinctReducer) AggregateBoolean(p *BooleanPoint) { 2342 if _, ok := r.m[p.Value]; !ok { 2343 r.m[p.Value] = *p 2344 } 2345} 2346 2347// Emit emits the distinct points that have been aggregated into the reducer. 2348func (r *BooleanDistinctReducer) Emit() []BooleanPoint { 2349 points := make([]BooleanPoint, 0, len(r.m)) 2350 for _, p := range r.m { 2351 points = append(points, BooleanPoint{Time: p.Time, Value: p.Value}) 2352 } 2353 sort.Sort(booleanPoints(points)) 2354 return points 2355} 2356 2357// BooleanElapsedReducer calculates the elapsed of the aggregated points. 2358type BooleanElapsedReducer struct { 2359 unitConversion int64 2360 prev BooleanPoint 2361 curr BooleanPoint 2362} 2363 2364// NewBooleanElapsedReducer creates a new BooleanElapsedReducer. 2365func NewBooleanElapsedReducer(interval Interval) *BooleanElapsedReducer { 2366 return &BooleanElapsedReducer{ 2367 unitConversion: int64(interval.Duration), 2368 prev: BooleanPoint{Nil: true}, 2369 curr: BooleanPoint{Nil: true}, 2370 } 2371} 2372 2373// AggregateBoolean aggregates a point into the reducer and updates the current window. 2374func (r *BooleanElapsedReducer) AggregateBoolean(p *BooleanPoint) { 2375 r.prev = r.curr 2376 r.curr = *p 2377} 2378 2379// Emit emits the elapsed of the reducer at the current point. 2380func (r *BooleanElapsedReducer) Emit() []IntegerPoint { 2381 if !r.prev.Nil { 2382 elapsed := (r.curr.Time - r.prev.Time) / r.unitConversion 2383 return []IntegerPoint{ 2384 {Time: r.curr.Time, Value: elapsed}, 2385 } 2386 } 2387 return nil 2388} 2389 2390// BooleanSampleReducer implements a reservoir sampling to calculate a random subset of points 2391type BooleanSampleReducer struct { 2392 count int // how many points we've iterated over 2393 rng *rand.Rand // random number generator for each reducer 2394 2395 points booleanPoints // the reservoir 2396} 2397 2398// NewBooleanSampleReducer creates a new BooleanSampleReducer 2399func NewBooleanSampleReducer(size int) *BooleanSampleReducer { 2400 return &BooleanSampleReducer{ 2401 rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/ 2402 points: make(booleanPoints, size), 2403 } 2404} 2405 2406// AggregateBoolean aggregates a point into the reducer. 2407func (r *BooleanSampleReducer) AggregateBoolean(p *BooleanPoint) { 2408 r.count++ 2409 // Fill the reservoir with the first n points 2410 if r.count-1 < len(r.points) { 2411 p.CopyTo(&r.points[r.count-1]) 2412 return 2413 } 2414 2415 // Generate a random integer between 1 and the count and 2416 // if that number is less than the length of the slice 2417 // replace the point at that index rnd with p. 2418 rnd := r.rng.Intn(r.count) 2419 if rnd < len(r.points) { 2420 p.CopyTo(&r.points[rnd]) 2421 } 2422} 2423 2424// Emit emits the reservoir sample as many points. 2425func (r *BooleanSampleReducer) Emit() []BooleanPoint { 2426 min := len(r.points) 2427 if r.count < min { 2428 min = r.count 2429 } 2430 pts := r.points[:min] 2431 sort.Sort(pts) 2432 return pts 2433} 2434