1package query 2 3import ( 4 "context" 5 "container/heap" 6 "io" 7 "sort" 8 "sync" 9 "time" 10 "sync" 11 12 "github.com/gogo/protobuf/proto" 13 "github.com/influxdata/influxql" 14) 15 16// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval. 17const DefaultStatsInterval = time.Second 18 19{{with $types := .}}{{range $k := $types}} 20 21// {{$k.Name}}Iterator represents a stream of {{$k.name}} points. 22type {{$k.Name}}Iterator interface { 23 Iterator 24 Next() (*{{$k.Name}}Point, error) 25} 26 27// new{{$k.Name}}Iterators converts a slice of Iterator to a slice of {{$k.Name}}Iterator. 28// Drop and closes any iterator in itrs that is not a {{$k.Name}}Iterator and cannot 29// be cast to a {{$k.Name}}Iterator. 30func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator { 31 a := make([]{{$k.Name}}Iterator, 0, len(itrs)) 32 for _, itr := range itrs { 33 switch itr := itr.(type) { 34 case {{$k.Name}}Iterator: 35 a = append(a, itr) 36 default: 37 itr.Close() 38 } 39 } 40 return a 41} 42 43 44// buf{{$k.Name}}Iterator represents a buffered {{$k.Name}}Iterator. 45type buf{{$k.Name}}Iterator struct { 46 itr {{$k.Name}}Iterator 47 buf *{{$k.Name}}Point 48} 49 50// newBuf{{$k.Name}}Iterator returns a buffered {{$k.Name}}Iterator. 51func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator { 52 return &buf{{$k.Name}}Iterator{itr: itr} 53} 54 55// Stats returns statistics from the input iterator. 56func (itr *buf{{$k.Name}}Iterator) Stats() IteratorStats { return itr.itr.Stats() } 57 58// Close closes the underlying iterator. 59func (itr *buf{{$k.Name}}Iterator) Close() error { return itr.itr.Close() } 60 61// peek returns the next point without removing it from the iterator. 62func (itr *buf{{$k.Name}}Iterator) peek() (*{{$k.Name}}Point, error) { 63 p, err := itr.Next() 64 if err != nil { 65 return nil, err 66 } 67 itr.unread(p) 68 return p, nil 69} 70 71// peekTime returns the time of the next point. 72// Returns zero time if no more points available. 73func (itr *buf{{$k.Name}}Iterator) peekTime() (int64, error) { 74 p, err := itr.peek() 75 if p == nil || err != nil { 76 return ZeroTime, err 77 } 78 return p.Time, nil 79} 80 81// Next returns the current buffer, if exists, or calls the underlying iterator. 82func (itr *buf{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) { 83 buf := itr.buf 84 if buf != nil { 85 itr.buf = nil 86 return buf, nil 87 } 88 return itr.itr.Next() 89} 90 91// NextInWindow returns the next value if it is between [startTime, endTime). 92// If the next value is outside the range then it is moved to the buffer. 93func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) (*{{$k.Name}}Point, error) { 94 v, err := itr.Next() 95 if v == nil || err != nil { 96 return nil, err 97 } else if t := v.Time; t >= endTime || t < startTime { 98 itr.unread(v) 99 return nil, nil 100 } 101 return v, nil 102} 103 104// unread sets v to the buffer. It is read on the next call to Next(). 105func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v } 106 107// {{$k.name}}MergeIterator represents an iterator that combines multiple {{$k.name}} iterators. 108type {{$k.name}}MergeIterator struct { 109 inputs []{{$k.Name}}Iterator 110 heap *{{$k.name}}MergeHeap 111 init bool 112 113 closed bool 114 mu sync.RWMutex 115 116 // Current iterator and window. 117 curr *{{$k.name}}MergeHeapItem 118 window struct { 119 name string 120 tags string 121 startTime int64 122 endTime int64 123 } 124} 125 126// new{{$k.Name}}MergeIterator returns a new instance of {{$k.name}}MergeIterator. 127func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator { 128 itr := &{{$k.name}}MergeIterator{ 129 inputs: inputs, 130 heap: &{{$k.name}}MergeHeap{ 131 items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)), 132 opt: opt, 133 }, 134 } 135 136 // Initialize heap items. 137 for _, input := range inputs { 138 // Wrap in buffer, ignore any inputs without anymore points. 139 bufInput := newBuf{{$k.Name}}Iterator(input) 140 141 // Append to the heap. 142 itr.heap.items = append(itr.heap.items, &{{$k.name}}MergeHeapItem{itr: bufInput}) 143 } 144 145 return itr 146} 147 148// Stats returns an aggregation of stats from the underlying iterators. 149func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats { 150 var stats IteratorStats 151 for _, input := range itr.inputs { 152 stats.Add(input.Stats()) 153 } 154 return stats 155} 156 157// Close closes the underlying iterators. 158func (itr *{{$k.name}}MergeIterator) Close() error { 159 itr.mu.Lock() 160 defer itr.mu.Unlock() 161 162 for _, input := range itr.inputs { 163 input.Close() 164 } 165 itr.curr = nil 166 itr.inputs = nil 167 itr.heap.items = nil 168 itr.closed = true 169 return nil 170} 171 172// Next returns the next point from the iterator. 173func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) { 174 itr.mu.RLock() 175 defer itr.mu.RUnlock() 176 if itr.closed { 177 return nil, nil 178 } 179 180 // Initialize the heap. This needs to be done lazily on the first call to this iterator 181 // so that iterator initialization done through the Select() call returns quickly. 182 // Queries can only be interrupted after the Select() call completes so any operations 183 // done during iterator creation cannot be interrupted, which is why we do it here 184 // instead so an interrupt can happen while initializing the heap. 185 if !itr.init { 186 items := itr.heap.items 187 itr.heap.items = make([]*{{$k.name}}MergeHeapItem, 0, len(items)) 188 for _, item := range items { 189 if p, err := item.itr.peek(); err != nil { 190 return nil, err 191 } else if p == nil { 192 continue 193 } 194 itr.heap.items = append(itr.heap.items, item) 195 } 196 heap.Init(itr.heap) 197 itr.init = true 198 } 199 200 for { 201 // Retrieve the next iterator if we don't have one. 202 if itr.curr == nil { 203 if len(itr.heap.items) == 0 { 204 return nil, nil 205 } 206 itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem) 207 208 // Read point and set current window. 209 p, err := itr.curr.itr.Next() 210 if err != nil { 211 return nil, err 212 } 213 tags := p.Tags.Subset(itr.heap.opt.Dimensions) 214 itr.window.name, itr.window.tags = p.Name, tags.ID() 215 itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time) 216 return p, nil 217 } 218 219 // Read the next point from the current iterator. 220 p, err := itr.curr.itr.Next() 221 if err != nil { 222 return nil, err 223 } 224 225 // If there are no more points then remove iterator from heap and find next. 226 if p == nil { 227 itr.curr = nil 228 continue 229 } 230 231 // Check if the point is inside of our current window. 232 inWindow := true 233 if window := itr.window; window.name != p.Name { 234 inWindow = false 235 } else if tags := p.Tags.Subset(itr.heap.opt.Dimensions); window.tags != tags.ID() { 236 inWindow = false 237 } else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime { 238 inWindow = false 239 } else if !opt.Ascending && p.Time < window.startTime { 240 inWindow = false 241 } 242 243 // If it's outside our window then push iterator back on the heap and find new iterator. 244 if !inWindow { 245 itr.curr.itr.unread(p) 246 heap.Push(itr.heap, itr.curr) 247 itr.curr = nil 248 continue 249 } 250 251 return p, nil 252 } 253} 254 255// {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems. 256// Items are sorted by their next window and then by name/tags. 257type {{$k.name}}MergeHeap struct { 258 opt IteratorOptions 259 items []*{{$k.name}}MergeHeapItem 260} 261 262func (h *{{$k.name}}MergeHeap) Len() int { return len(h.items) } 263func (h *{{$k.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] } 264func (h *{{$k.name}}MergeHeap) Less(i, j int) bool { 265 x, err := h.items[i].itr.peek() 266 if err != nil { 267 return true 268 } 269 y, err := h.items[j].itr.peek() 270 if err != nil { 271 return false 272 } 273 274 if h.opt.Ascending { 275 if x.Name != y.Name { 276 return x.Name < y.Name 277 } else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() { 278 return xTags.ID() < yTags.ID() 279 } 280 } else { 281 if x.Name != y.Name { 282 return x.Name > y.Name 283 } else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() { 284 return xTags.ID() > yTags.ID() 285 } 286 } 287 288 xt, _ := h.opt.Window(x.Time) 289 yt, _ := h.opt.Window(y.Time) 290 291 if h.opt.Ascending { 292 return xt < yt 293 } 294 return xt > yt 295} 296 297 298func (h *{{$k.name}}MergeHeap) Push(x interface{}) { 299 h.items = append(h.items, x.(*{{$k.name}}MergeHeapItem)) 300} 301 302func (h *{{$k.name}}MergeHeap) Pop() interface{} { 303 old := h.items 304 n := len(old) 305 item := old[n-1] 306 h.items = old[0 : n-1] 307 return item 308} 309 310type {{$k.name}}MergeHeapItem struct { 311 itr *buf{{$k.Name}}Iterator 312} 313 314// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one. 315type {{$k.name}}SortedMergeIterator struct { 316 inputs []{{$k.Name}}Iterator 317 heap *{{$k.name}}SortedMergeHeap 318 init bool 319} 320 321// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator. 322func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator { 323 itr := &{{$k.name}}SortedMergeIterator{ 324 inputs: inputs, 325 heap: &{{$k.name}}SortedMergeHeap{ 326 items: make([]*{{$k.name}}SortedMergeHeapItem, 0, len(inputs)), 327 opt: opt, 328 }, 329 } 330 331 // Initialize heap items. 332 for _, input := range inputs { 333 // Append to the heap. 334 itr.heap.items = append(itr.heap.items, &{{$k.name}}SortedMergeHeapItem{itr: input}) 335 } 336 337 return itr 338} 339 340// Stats returns an aggregation of stats from the underlying iterators. 341func (itr *{{$k.name}}SortedMergeIterator) Stats() IteratorStats { 342 var stats IteratorStats 343 for _, input := range itr.inputs { 344 stats.Add(input.Stats()) 345 } 346 return stats 347} 348 349// Close closes the underlying iterators. 350func (itr *{{$k.name}}SortedMergeIterator) Close() error { 351 for _, input := range itr.inputs { 352 input.Close() 353 } 354 return nil 355} 356 357// Next returns the next points from the iterator. 358func (itr *{{$k.name}}SortedMergeIterator) Next() (*{{$k.Name}}Point, error) { return itr.pop() } 359 360// pop returns the next point from the heap. 361// Reads the next point from item's cursor and puts it back on the heap. 362func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) { 363 // Initialize the heap. See the MergeIterator to see why this has to be done lazily. 364 if !itr.init { 365 items := itr.heap.items 366 itr.heap.items = make([]*{{$k.name}}SortedMergeHeapItem, 0, len(items)) 367 for _, item := range items { 368 var err error 369 if item.point, err = item.itr.Next(); err != nil { 370 return nil, err 371 } else if item.point == nil { 372 continue 373 } 374 itr.heap.items = append(itr.heap.items, item) 375 } 376 heap.Init(itr.heap) 377 itr.init = true 378 } 379 380 if len(itr.heap.items) == 0 { 381 return nil, nil 382 } 383 384 // Read the next item from the heap. 385 item := heap.Pop(itr.heap).(*{{$k.name}}SortedMergeHeapItem) 386 if item.err != nil { 387 return nil, item.err 388 } else if item.point == nil { 389 return nil, nil 390 } 391 392 // Copy the point for return. 393 p := item.point.Clone() 394 395 // Read the next item from the cursor. Push back to heap if one exists. 396 if item.point, item.err = item.itr.Next(); item.point != nil { 397 heap.Push(itr.heap, item) 398 } 399 400 return p, nil 401} 402 403// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems. 404// Items are sorted with the following priority: 405// - By their measurement name; 406// - By their tag keys/values; 407// - By time; or 408// - By their Aux field values. 409// 410type {{$k.name}}SortedMergeHeap struct { 411 opt IteratorOptions 412 items []*{{$k.name}}SortedMergeHeapItem 413} 414 415func (h *{{$k.name}}SortedMergeHeap) Len() int { return len(h.items) } 416func (h *{{$k.name}}SortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] } 417func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool { 418 x, y := h.items[i].point, h.items[j].point 419 420 if h.opt.Ascending { 421 if x.Name != y.Name { 422 return x.Name < y.Name 423 } else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) { 424 return xTags.ID() < yTags.ID() 425 } 426 427 if x.Time != y.Time{ 428 return x.Time < y.Time 429 } 430 431 if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) { 432 for i := 0; i < len(x.Aux); i++ { 433 v1, ok1 := x.Aux[i].(string) 434 v2, ok2 := y.Aux[i].(string) 435 if !ok1 || !ok2 { 436 // Unsupported types used in Aux fields. Maybe they 437 // need to be added here? 438 return false 439 } else if v1 == v2 { 440 continue 441 } 442 return v1 < v2 443 } 444 } 445 return false // Times and/or Aux fields are equal. 446 } 447 448 if x.Name != y.Name { 449 return x.Name > y.Name 450 } else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) { 451 return xTags.ID() > yTags.ID() 452 } 453 454 if x.Time != y.Time{ 455 return x.Time > y.Time 456 } 457 458 if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) { 459 for i := 0; i < len(x.Aux); i++ { 460 v1, ok1 := x.Aux[i].(string) 461 v2, ok2 := y.Aux[i].(string) 462 if !ok1 || !ok2 { 463 // Unsupported types used in Aux fields. Maybe they 464 // need to be added here? 465 return false 466 } else if v1 == v2 { 467 continue 468 } 469 return v1 > v2 470 } 471 } 472 return false // Times and/or Aux fields are equal. 473} 474 475func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) { 476 h.items = append(h.items, x.(*{{$k.name}}SortedMergeHeapItem)) 477} 478 479func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} { 480 old := h.items 481 n := len(old) 482 item := old[n-1] 483 h.items = old[0 : n-1] 484 return item 485} 486 487type {{$k.name}}SortedMergeHeapItem struct { 488 point *{{$k.Name}}Point 489 err error 490 itr {{$k.Name}}Iterator 491} 492 493// {{$k.name}}IteratorScanner scans the results of a {{$k.Name}}Iterator into a map. 494type {{$k.name}}IteratorScanner struct { 495 input *buf{{$k.Name}}Iterator 496 err error 497 keys []influxql.VarRef 498 defaultValue interface{} 499} 500 501// new{{$k.Name}}IteratorScanner creates a new IteratorScanner. 502func new{{$k.Name}}IteratorScanner(input {{$k.Name}}Iterator, keys []influxql.VarRef, defaultValue interface{}) *{{$k.name}}IteratorScanner { 503 return &{{$k.name}}IteratorScanner{ 504 input: newBuf{{$k.Name}}Iterator(input), 505 keys: keys, 506 defaultValue: defaultValue, 507 } 508} 509 510func (s *{{$k.name}}IteratorScanner) Peek() (int64, string, Tags) { 511 if s.err != nil { 512 return ZeroTime, "", Tags{} 513 } 514 515 p, err := s.input.peek() 516 if err != nil { 517 s.err = err 518 return ZeroTime, "", Tags{} 519 } else if p == nil { 520 return ZeroTime, "", Tags{} 521 } 522 return p.Time, p.Name, p.Tags 523} 524 525func (s *{{$k.name}}IteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) { 526 if s.err != nil { 527 return 528 } 529 530 p, err := s.input.Next() 531 if err != nil { 532 s.err = err 533 return 534 } else if p == nil { 535 s.useDefaults(m) 536 return 537 } else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) { 538 s.useDefaults(m) 539 s.input.unread(p) 540 return 541 } 542 543 if k := s.keys[0]; k.Val != "" { 544 if p.Nil { 545 if s.defaultValue != SkipDefault { 546 m[k.Val] = castToType(s.defaultValue, k.Type) 547 } 548 } else { 549 m[k.Val] = p.Value 550 } 551 } 552 for i, v := range p.Aux { 553 k := s.keys[i+1] 554 switch v.(type) { 555 case float64, int64, uint64, string, bool: 556 m[k.Val] = v 557 default: 558 // Insert the fill value if one was specified. 559 if s.defaultValue != SkipDefault { 560 m[k.Val] = castToType(s.defaultValue, k.Type) 561 } 562 } 563 } 564} 565 566func (s *{{$k.name}}IteratorScanner) useDefaults(m map[string]interface{}) { 567 if s.defaultValue == SkipDefault { 568 return 569 } 570 for _, k := range s.keys { 571 if k.Val == "" { 572 continue 573 } 574 m[k.Val] = castToType(s.defaultValue, k.Type) 575 } 576} 577 578func (s *{{$k.name}}IteratorScanner) Stats() IteratorStats { return s.input.Stats() } 579func (s *{{$k.name}}IteratorScanner) Err() error { return s.err } 580func (s *{{$k.name}}IteratorScanner) Close() error { return s.input.Close() } 581 582// {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine. 583type {{$k.name}}ParallelIterator struct { 584 input {{$k.Name}}Iterator 585 ch chan {{$k.name}}PointError 586 587 once sync.Once 588 closing chan struct{} 589 wg sync.WaitGroup 590} 591 592// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator. 593func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}ParallelIterator { 594 itr := &{{$k.name}}ParallelIterator{ 595 input: input, 596 ch: make(chan {{$k.name}}PointError, 256), 597 closing: make(chan struct{}), 598 } 599 itr.wg.Add(1) 600 go itr.monitor() 601 return itr 602} 603 604// Stats returns stats from the underlying iterator. 605func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input.Stats() } 606 607// Close closes the underlying iterators. 608func (itr *{{$k.name}}ParallelIterator) Close() error { 609 itr.once.Do(func() { close(itr.closing) }) 610 itr.wg.Wait() 611 return itr.input.Close() 612} 613 614// Next returns the next point from the iterator. 615func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) { 616 v, ok := <-itr.ch 617 if !ok { 618 return nil, io.EOF 619 } 620 return v.point, v.err 621} 622 623// monitor runs in a separate goroutine and actively pulls the next point. 624func (itr *{{$k.name}}ParallelIterator) monitor() { 625 defer close(itr.ch) 626 defer itr.wg.Done() 627 628 for { 629 // Read next point. 630 p, err := itr.input.Next() 631 if p != nil { 632 p = p.Clone() 633 } 634 635 select { 636 case <-itr.closing: 637 return 638 case itr.ch <- {{$k.name}}PointError{point: p, err: err}: 639 } 640 } 641} 642 643type {{$k.name}}PointError struct { 644 point *{{$k.Name}}Point 645 err error 646} 647 648// {{$k.name}}LimitIterator represents an iterator that limits points per group. 649type {{$k.name}}LimitIterator struct { 650 input {{$k.Name}}Iterator 651 opt IteratorOptions 652 n int 653 654 prev struct { 655 name string 656 tags Tags 657 } 658} 659 660// new{{$k.Name}}LimitIterator returns a new instance of {{$k.name}}LimitIterator. 661func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator { 662 return &{{$k.name}}LimitIterator{ 663 input: input, 664 opt: opt, 665 } 666} 667 668// Stats returns stats from the underlying iterator. 669func (itr *{{$k.name}}LimitIterator) Stats() IteratorStats { return itr.input.Stats() } 670 671// Close closes the underlying iterators. 672func (itr *{{$k.name}}LimitIterator) Close() error { return itr.input.Close() } 673 674// Next returns the next point from the iterator. 675func (itr *{{$k.name}}LimitIterator) Next() (*{{$k.Name}}Point, error) { 676 for { 677 p, err := itr.input.Next() 678 if p == nil || err != nil { 679 return nil, err 680 } 681 682 // Reset window and counter if a new window is encountered. 683 if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) { 684 itr.prev.name = p.Name 685 itr.prev.tags = p.Tags 686 itr.n = 0 687 } 688 689 // Increment counter. 690 itr.n++ 691 692 // Read next point if not beyond the offset. 693 if itr.n <= itr.opt.Offset { 694 continue 695 } 696 697 // Read next point if we're beyond the limit. 698 if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit { 699 continue 700 } 701 702 return p, nil 703 } 704} 705 706type {{$k.name}}FillIterator struct { 707 input *buf{{$k.Name}}Iterator 708 prev {{$k.Name}}Point 709 startTime int64 710 endTime int64 711 auxFields []interface{} 712 init bool 713 opt IteratorOptions 714 715 window struct { 716 name string 717 tags Tags 718 time int64 719 offset int64 720 } 721} 722 723func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr influxql.Expr, opt IteratorOptions) *{{$k.name}}FillIterator { 724 if opt.Fill == influxql.NullFill { 725 if expr, ok := expr.(*influxql.Call); ok && expr.Name == "count" { 726 opt.Fill = influxql.NumberFill 727 opt.FillValue = {{$k.Zero}} 728 } 729 } 730 731 var startTime, endTime int64 732 if opt.Ascending { 733 startTime, _ = opt.Window(opt.StartTime) 734 endTime, _ = opt.Window(opt.EndTime) 735 } else { 736 startTime, _ = opt.Window(opt.EndTime) 737 endTime, _ = opt.Window(opt.StartTime) 738 } 739 740 var auxFields []interface{} 741 if len(opt.Aux) > 0 { 742 auxFields = make([]interface{}, len(opt.Aux)) 743 } 744 745 return &{{$k.name}}FillIterator{ 746 input: newBuf{{$k.Name}}Iterator(input), 747 prev: {{$k.Name}}Point{Nil: true}, 748 startTime: startTime, 749 endTime: endTime, 750 auxFields: auxFields, 751 opt: opt, 752 } 753} 754 755func (itr *{{$k.name}}FillIterator) Stats() IteratorStats { return itr.input.Stats() } 756func (itr *{{$k.name}}FillIterator) Close() error { return itr.input.Close() } 757 758func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { 759 if !itr.init { 760 p, err := itr.input.peek() 761 if p == nil || err != nil { 762 return nil, err 763 } 764 itr.window.name, itr.window.tags = p.Name, p.Tags 765 itr.window.time = itr.startTime 766 if itr.startTime == influxql.MinTime { 767 itr.window.time, _ = itr.opt.Window(p.Time) 768 } 769 if itr.opt.Location != nil { 770 _, itr.window.offset = itr.opt.Zone(itr.window.time) 771 } 772 itr.init = true 773 } 774 775 p, err := itr.input.Next() 776 if err != nil { 777 return nil, err 778 } 779 780 // Check if the next point is outside of our window or is nil. 781 if p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() { 782 // If we are inside of an interval, unread the point and continue below to 783 // constructing a new point. 784 if itr.opt.Ascending && itr.window.time <= itr.endTime { 785 itr.input.unread(p) 786 p = nil 787 goto CONSTRUCT 788 } else if !itr.opt.Ascending && itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime { 789 itr.input.unread(p) 790 p = nil 791 goto CONSTRUCT 792 } 793 794 // We are *not* in a current interval. If there is no next point, 795 // we are at the end of all intervals. 796 if p == nil { 797 return nil, nil 798 } 799 800 // Set the new interval. 801 itr.window.name, itr.window.tags = p.Name, p.Tags 802 itr.window.time = itr.startTime 803 if itr.window.time == influxql.MinTime { 804 itr.window.time, _ = itr.opt.Window(p.Time) 805 } 806 if itr.opt.Location != nil { 807 _, itr.window.offset = itr.opt.Zone(itr.window.time) 808 } 809 itr.prev = {{$k.Name}}Point{Nil: true} 810 } 811 812 // Check if the point is our next expected point. 813CONSTRUCT: 814 if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) { 815 if p != nil { 816 itr.input.unread(p) 817 } 818 819 p = &{{$k.Name}}Point{ 820 Name: itr.window.name, 821 Tags: itr.window.tags, 822 Time: itr.window.time, 823 Aux: itr.auxFields, 824 } 825 826 switch itr.opt.Fill { 827 case influxql.LinearFill: 828 {{- if or (eq $k.Name "Float") (eq $k.Name "Integer") (eq $k.Name "Unsigned")}} 829 if !itr.prev.Nil { 830 next, err := itr.input.peek() 831 if err != nil { 832 return nil, err 833 } else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() { 834 interval := int64(itr.opt.Interval.Duration) 835 start := itr.window.time / interval 836 p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value) 837 } else { 838 p.Nil = true 839 } 840 } else { 841 p.Nil = true 842 } 843 {{else}} 844 fallthrough 845 {{- end}} 846 case influxql.NullFill: 847 p.Nil = true 848 case influxql.NumberFill: 849 p.Value, _ = castTo{{$k.Name}}(itr.opt.FillValue) 850 case influxql.PreviousFill: 851 if !itr.prev.Nil { 852 p.Value = itr.prev.Value 853 p.Nil = itr.prev.Nil 854 } else { 855 p.Nil = true 856 } 857 } 858 } else { 859 itr.prev = *p 860 } 861 862 // Advance the expected time. Do not advance to a new window here 863 // as there may be lingering points with the same timestamp in the previous 864 // window. 865 if itr.opt.Ascending { 866 itr.window.time += int64(itr.opt.Interval.Duration) 867 } else { 868 itr.window.time -= int64(itr.opt.Interval.Duration) 869 } 870 871 // Check to see if we have passed over an offset change and adjust the time 872 // to account for this new offset. 873 if itr.opt.Location != nil { 874 if _, offset := itr.opt.Zone(itr.window.time - 1); offset != itr.window.offset { 875 diff := itr.window.offset - offset 876 if abs(diff) < int64(itr.opt.Interval.Duration) { 877 itr.window.time += diff 878 } 879 itr.window.offset = offset 880 } 881 } 882 return p, nil 883} 884 885// {{$k.name}}IntervalIterator represents a {{$k.name}} implementation of IntervalIterator. 886type {{$k.name}}IntervalIterator struct { 887 input {{$k.Name}}Iterator 888 opt IteratorOptions 889} 890 891func new{{$k.Name}}IntervalIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}IntervalIterator { 892 return &{{$k.name}}IntervalIterator{input: input, opt: opt} 893} 894 895func (itr *{{$k.name}}IntervalIterator) Stats() IteratorStats { return itr.input.Stats() } 896func (itr *{{$k.name}}IntervalIterator) Close() error { return itr.input.Close() } 897 898func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) { 899 p, err := itr.input.Next() 900 if p == nil || err != nil { 901 return nil, err 902 } 903 p.Time, _ = itr.opt.Window(p.Time) 904 // If we see the minimum allowable time, set the time to zero so we don't 905 // break the default returned time for aggregate queries without times. 906 if p.Time == influxql.MinTime { 907 p.Time = 0 908 } 909 return p, nil 910} 911 912// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator. 913type {{$k.name}}InterruptIterator struct { 914 input {{$k.Name}}Iterator 915 closing <-chan struct{} 916 count int 917} 918 919func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator { 920 return &{{$k.name}}InterruptIterator{input: input, closing: closing} 921} 922 923func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats { return itr.input.Stats() } 924func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() } 925 926func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) { 927 // Only check if the channel is closed every N points. This 928 // intentionally checks on both 0 and N so that if the iterator 929 // has been interrupted before the first point is emitted it will 930 // not emit any points. 931 if itr.count & 0xFF == 0xFF { 932 select { 933 case <-itr.closing: 934 return nil, itr.Close() 935 default: 936 // Reset iterator count to zero and fall through to emit the next point. 937 itr.count = 0 938 } 939 } 940 941 // Increment the counter for every point read. 942 itr.count++ 943 return itr.input.Next() 944} 945 946// {{$k.name}}CloseInterruptIterator represents a {{$k.name}} implementation of CloseInterruptIterator. 947type {{$k.name}}CloseInterruptIterator struct { 948 input {{$k.Name}}Iterator 949 closing <-chan struct{} 950 done chan struct{} 951 once sync.Once 952} 953 954func new{{$k.Name}}CloseInterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}CloseInterruptIterator { 955 itr := &{{$k.name}}CloseInterruptIterator{ 956 input: input, 957 closing: closing, 958 done: make(chan struct{}), 959 } 960 go itr.monitor() 961 return itr 962} 963 964func (itr *{{$k.name}}CloseInterruptIterator) monitor() { 965 select { 966 case <-itr.closing: 967 itr.Close() 968 case <-itr.done: 969 } 970} 971 972func (itr *{{$k.name}}CloseInterruptIterator) Stats() IteratorStats { 973 return itr.input.Stats() 974} 975 976func (itr *{{$k.name}}CloseInterruptIterator) Close() error { 977 itr.once.Do(func() { 978 close(itr.done) 979 itr.input.Close() 980 }) 981 return nil 982} 983 984func (itr *{{$k.name}}CloseInterruptIterator) Next() (*{{$k.Name}}Point, error) { 985 p, err := itr.input.Next() 986 if err != nil { 987 // Check if the iterator was closed. 988 select { 989 case <-itr.done: 990 return nil, nil 991 default: 992 return nil, err 993 } 994 } 995 return p, nil 996} 997 998{{range $v := $types}} 999 1000// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result. 1001type {{$k.name}}Reduce{{$v.Name}}Iterator struct { 1002 input *buf{{$k.Name}}Iterator 1003 create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter) 1004 dims []string 1005 opt IteratorOptions 1006 points []{{$v.Name}}Point 1007 keepTags bool 1008} 1009 1010func new{{$k.Name}}Reduce{{$v.Name}}Iterator(input {{$k.Name}}Iterator, opt IteratorOptions, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)) *{{$k.name}}Reduce{{$v.Name}}Iterator { 1011 return &{{$k.name}}Reduce{{$v.Name}}Iterator{ 1012 input: newBuf{{$k.Name}}Iterator(input), 1013 create: createFn, 1014 dims: opt.GetDimensions(), 1015 opt: opt, 1016 } 1017} 1018 1019// Stats returns stats from the input iterator. 1020func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() } 1021 1022// Close closes the iterator and all child iterators. 1023func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() } 1024 1025// Next returns the minimum value for the next available interval. 1026func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) { 1027 // Calculate next window if we have no more points. 1028 if len(itr.points) == 0 { 1029 var err error 1030 itr.points, err = itr.reduce() 1031 if len(itr.points) == 0 { 1032 return nil, err 1033 } 1034 } 1035 1036 // Pop next point off the stack. 1037 p := &itr.points[len(itr.points)-1] 1038 itr.points = itr.points[:len(itr.points)-1] 1039 return p, nil 1040} 1041 1042// {{$k.name}}Reduce{{$v.Name}}Point stores the reduced data for a name/tag combination. 1043type {{$k.name}}Reduce{{$v.Name}}Point struct { 1044 Name string 1045 Tags Tags 1046 Aggregator {{$k.Name}}PointAggregator 1047 Emitter {{$v.Name}}PointEmitter 1048} 1049 1050// reduce executes fn once for every point in the next window. 1051// The previous value for the dimension is passed to fn. 1052func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) { 1053 // Calculate next window. 1054 var ( 1055 startTime, endTime int64 1056 window struct { 1057 name string 1058 tags string 1059 } 1060 ) 1061 for { 1062 p, err := itr.input.Next() 1063 if err != nil || p == nil { 1064 return nil, err 1065 } else if p.Nil { 1066 continue 1067 } 1068 1069 // Unread the point so it can be processed. 1070 itr.input.unread(p) 1071 startTime, endTime = itr.opt.Window(p.Time) 1072 window.name, window.tags = p.Name, p.Tags.Subset(itr.opt.Dimensions).ID() 1073 break 1074 } 1075 1076 // Create points by tags. 1077 m := make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point) 1078 for { 1079 // Read next point. 1080 curr, err := itr.input.NextInWindow(startTime, endTime) 1081 if err != nil { 1082 return nil, err 1083 } else if curr == nil { 1084 break 1085 } else if curr.Nil { 1086 continue 1087 } else if curr.Name != window.name { 1088 itr.input.unread(curr) 1089 break 1090 } 1091 1092 // Ensure this point is within the same final window. 1093 if curr.Name != window.name { 1094 itr.input.unread(curr) 1095 break 1096 } else if tags := curr.Tags.Subset(itr.opt.Dimensions); tags.ID() != window.tags { 1097 itr.input.unread(curr) 1098 break 1099 } 1100 1101 // Retrieve the tags on this point for this level of the query. 1102 // This may be different than the bucket dimensions. 1103 tags := curr.Tags.Subset(itr.dims) 1104 id := tags.ID() 1105 1106 // Retrieve the aggregator for this name/tag combination or create one. 1107 rp := m[id] 1108 if rp == nil { 1109 aggregator, emitter := itr.create() 1110 rp = &{{$k.name}}Reduce{{$v.Name}}Point{ 1111 Name: curr.Name, 1112 Tags: tags, 1113 Aggregator: aggregator, 1114 Emitter: emitter, 1115 } 1116 m[id] = rp 1117 } 1118 rp.Aggregator.Aggregate{{$k.Name}}(curr) 1119 } 1120 1121 keys := make([]string, 0, len(m)) 1122 for k := range m { 1123 keys = append(keys, k) 1124 } 1125 1126 // Reverse sort points by name & tag. 1127 // This ensures a consistent order of output. 1128 if len(keys) > 0 { 1129 var sorted sort.Interface = sort.StringSlice(keys) 1130 if itr.opt.Ascending { 1131 sorted = sort.Reverse(sorted) 1132 } 1133 sort.Sort(sorted) 1134 } 1135 1136 // Assume the points are already sorted until proven otherwise. 1137 sortedByTime := true 1138 // Emit the points for each name & tag combination. 1139 a := make([]{{$v.Name}}Point, 0, len(m)) 1140 for _, k := range keys { 1141 rp := m[k] 1142 points := rp.Emitter.Emit() 1143 for i := len(points)-1; i >= 0; i-- { 1144 points[i].Name = rp.Name 1145 if !itr.keepTags { 1146 points[i].Tags = rp.Tags 1147 } 1148 // Set the points time to the interval time if the reducer didn't provide one. 1149 if points[i].Time == ZeroTime { 1150 points[i].Time = startTime 1151 } else { 1152 sortedByTime = false 1153 } 1154 a = append(a, points[i]) 1155 } 1156 } 1157 // Points may be out of order. Perform a stable sort by time if requested. 1158 if !sortedByTime && itr.opt.Ordered { 1159 var sorted sort.Interface = {{$v.name}}PointsByTime(a) 1160 if itr.opt.Ascending { 1161 sorted = sort.Reverse(sorted) 1162 } 1163 sort.Stable(sorted) 1164 } 1165 return a, nil 1166} 1167 1168// {{$k.name}}Stream{{$v.Name}}Iterator streams inputs into the iterator and emits points gradually. 1169type {{$k.name}}Stream{{$v.Name}}Iterator struct { 1170 input *buf{{$k.Name}}Iterator 1171 create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter) 1172 dims []string 1173 opt IteratorOptions 1174 m map[string]*{{$k.name}}Reduce{{$v.Name}}Point 1175 points []{{$v.Name}}Point 1176} 1177 1178// new{{$k.Name}}Stream{{$v.Name}}Iterator returns a new instance of {{$k.name}}Stream{{$v.Name}}Iterator. 1179func new{{$k.Name}}Stream{{$v.Name}}Iterator(input {{$k.Name}}Iterator, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter), opt IteratorOptions) *{{$k.name}}Stream{{$v.Name}}Iterator { 1180 return &{{$k.name}}Stream{{$v.Name}}Iterator{ 1181 input: newBuf{{$k.Name}}Iterator(input), 1182 create: createFn, 1183 dims: opt.GetDimensions(), 1184 opt: opt, 1185 m: make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point), 1186 } 1187} 1188 1189// Stats returns stats from the input iterator. 1190func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() } 1191 1192// Close closes the iterator and all child iterators. 1193func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Close() error { return itr.input.Close() } 1194 1195// Next returns the next value for the stream iterator. 1196func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) { 1197 // Calculate next window if we have no more points. 1198 if len(itr.points) == 0 { 1199 var err error 1200 itr.points, err = itr.reduce() 1201 if len(itr.points) == 0 { 1202 return nil, err 1203 } 1204 } 1205 1206 // Pop next point off the stack. 1207 p := &itr.points[len(itr.points)-1] 1208 itr.points = itr.points[:len(itr.points)-1] 1209 return p, nil 1210} 1211 1212// reduce creates and manages aggregators for every point from the input. 1213// After aggregating a point, it always tries to emit a value using the emitter. 1214func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) { 1215 // We have already read all of the input points. 1216 if itr.m == nil { 1217 return nil, nil 1218 } 1219 1220 for { 1221 // Read next point. 1222 curr, err := itr.input.Next() 1223 if err != nil { 1224 return nil, err 1225 } else if curr == nil { 1226 // Close all of the aggregators to flush any remaining points to emit. 1227 var points []{{$v.Name}}Point 1228 for _, rp := range itr.m { 1229 if aggregator, ok := rp.Aggregator.(io.Closer); ok { 1230 if err := aggregator.Close(); err != nil { 1231 return nil, err 1232 } 1233 1234 pts := rp.Emitter.Emit() 1235 if len(pts) == 0 { 1236 continue 1237 } 1238 1239 for i := range pts { 1240 pts[i].Name = rp.Name 1241 pts[i].Tags = rp.Tags 1242 } 1243 points = append(points, pts...) 1244 } 1245 } 1246 1247 // Eliminate the aggregators and emitters. 1248 itr.m = nil 1249 return points, nil 1250 } else if curr.Nil { 1251 continue 1252 } 1253 tags := curr.Tags.Subset(itr.dims) 1254 1255 id := curr.Name 1256 if len(tags.m) > 0 { 1257 id += "\x00" + tags.ID() 1258 } 1259 1260 // Retrieve the aggregator for this name/tag combination or create one. 1261 rp := itr.m[id] 1262 if rp == nil { 1263 aggregator, emitter := itr.create() 1264 rp = &{{$k.name}}Reduce{{.Name}}Point{ 1265 Name: curr.Name, 1266 Tags: tags, 1267 Aggregator: aggregator, 1268 Emitter: emitter, 1269 } 1270 itr.m[id] = rp 1271 } 1272 rp.Aggregator.Aggregate{{$k.Name}}(curr) 1273 1274 // Attempt to emit points from the aggregator. 1275 points := rp.Emitter.Emit() 1276 if len(points) == 0 { 1277 continue 1278 } 1279 1280 for i := range points { 1281 points[i].Name = rp.Name 1282 points[i].Tags = rp.Tags 1283 } 1284 return points, nil 1285 } 1286} 1287{{end}} 1288 1289// {{$k.name}}DedupeIterator only outputs unique points. 1290// This differs from the DistinctIterator in that it compares all aux fields too. 1291// This iterator is relatively inefficient and should only be used on small 1292// datasets such as meta query results. 1293type {{$k.name}}DedupeIterator struct { 1294 input {{$k.Name}}Iterator 1295 m map[string]struct{} // lookup of points already sent 1296} 1297 1298type {{$k.name}}IteratorMapper struct { 1299 cur Cursor 1300 row Row 1301 driver IteratorMap // which iterator to use for the primary value, can be nil 1302 fields []IteratorMap // which iterator to use for an aux field 1303 point {{$k.Name}}Point 1304} 1305 1306func new{{$k.Name}}IteratorMapper(cur Cursor, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper { 1307 return &{{$k.name}}IteratorMapper{ 1308 cur: cur, 1309 driver: driver, 1310 fields: fields, 1311 point: {{$k.Name}}Point{ 1312 Aux: make([]interface{}, len(fields)), 1313 }, 1314 } 1315} 1316 1317func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) { 1318 if !itr.cur.Scan(&itr.row) { 1319 if err := itr.cur.Err(); err != nil { 1320 return nil, err 1321 } 1322 return nil, nil 1323 } 1324 1325 itr.point.Time = itr.row.Time 1326 itr.point.Name = itr.row.Series.Name 1327 itr.point.Tags = itr.row.Series.Tags 1328 1329 if itr.driver != nil { 1330 if v := itr.driver.Value(&itr.row); v != nil { 1331 if v, ok := castTo{{$k.Name}}(v); ok { 1332 itr.point.Value = v 1333 itr.point.Nil = false 1334 } else { 1335 itr.point.Value = {{$k.Nil}} 1336 itr.point.Nil = true 1337 } 1338 } else { 1339 itr.point.Value = {{$k.Nil}} 1340 itr.point.Nil = true 1341 } 1342 } 1343 for i, f := range itr.fields { 1344 itr.point.Aux[i] = f.Value(&itr.row) 1345 } 1346 return &itr.point, nil 1347} 1348 1349func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats { 1350 return itr.cur.Stats() 1351} 1352 1353func (itr *{{$k.name}}IteratorMapper) Close() error { 1354 return itr.cur.Close() 1355} 1356 1357type {{$k.name}}FilterIterator struct { 1358 input {{$k.Name}}Iterator 1359 cond influxql.Expr 1360 opt IteratorOptions 1361 m map[string]interface{} 1362} 1363 1364func new{{$k.Name}}FilterIterator(input {{$k.Name}}Iterator, cond influxql.Expr, opt IteratorOptions) {{$k.Name}}Iterator { 1365 // Strip out time conditions from the WHERE clause. 1366 // TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct. 1367 n := influxql.RewriteFunc(influxql.CloneExpr(cond), func(n influxql.Node) influxql.Node { 1368 switch n := n.(type) { 1369 case *influxql.BinaryExpr: 1370 if n.LHS.String() == "time" { 1371 return &influxql.BooleanLiteral{Val: true} 1372 } 1373 } 1374 return n 1375 }) 1376 1377 cond, _ = n.(influxql.Expr) 1378 if cond == nil { 1379 return input 1380 } else if n, ok := cond.(*influxql.BooleanLiteral); ok && n.Val { 1381 return input 1382 } 1383 1384 return &{{$k.name}}FilterIterator{ 1385 input: input, 1386 cond: cond, 1387 opt: opt, 1388 m: make(map[string]interface{}), 1389 } 1390} 1391 1392func (itr *{{$k.name}}FilterIterator) Stats() IteratorStats { return itr.input.Stats() } 1393func (itr *{{$k.name}}FilterIterator) Close() error { return itr.input.Close() } 1394 1395func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) { 1396 for { 1397 p, err := itr.input.Next() 1398 if err != nil || p == nil { 1399 return nil, err 1400 } 1401 1402 for i, ref := range itr.opt.Aux { 1403 itr.m[ref.Val] = p.Aux[i] 1404 } 1405 for k, v := range p.Tags.KeyValues() { 1406 itr.m[k] = v 1407 } 1408 1409 if !influxql.EvalBool(itr.cond, itr.m) { 1410 continue 1411 } 1412 return p, nil 1413 } 1414} 1415 1416type {{$k.name}}TagSubsetIterator struct { 1417 input {{$k.Name}}Iterator 1418 point {{$k.Name}}Point 1419 lastTags Tags 1420 dimensions []string 1421} 1422 1423func new{{$k.Name}}TagSubsetIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}TagSubsetIterator { 1424 return &{{$k.name}}TagSubsetIterator{ 1425 input: input, 1426 dimensions: opt.GetDimensions(), 1427 } 1428} 1429 1430func (itr *{{$k.name}}TagSubsetIterator) Next() (*{{$k.Name}}Point, error) { 1431 p, err := itr.input.Next() 1432 if err != nil { 1433 return nil, err 1434 } else if p == nil { 1435 return nil, nil 1436 } 1437 1438 itr.point.Name = p.Name 1439 if !p.Tags.Equal(itr.lastTags) { 1440 itr.point.Tags = p.Tags.Subset(itr.dimensions) 1441 itr.lastTags = p.Tags 1442 } 1443 itr.point.Time = p.Time 1444 itr.point.Value = p.Value 1445 itr.point.Aux = p.Aux 1446 itr.point.Aggregated = p.Aggregated 1447 itr.point.Nil = p.Nil 1448 return &itr.point, nil 1449} 1450 1451func (itr *{{$k.name}}TagSubsetIterator) Stats() IteratorStats { 1452 return itr.input.Stats() 1453} 1454 1455func (itr *{{$k.name}}TagSubsetIterator) Close() error { 1456 return itr.input.Close() 1457} 1458 1459// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator. 1460func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator { 1461 return &{{$k.name}}DedupeIterator{ 1462 input: input, 1463 m: make(map[string]struct{}), 1464 } 1465} 1466 1467// Stats returns stats from the input iterator. 1468func (itr *{{$k.name}}DedupeIterator) Stats() IteratorStats { return itr.input.Stats() } 1469 1470// Close closes the iterator and all child iterators. 1471func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() } 1472 1473// Next returns the next unique point from the input iterator. 1474func (itr *{{$k.name}}DedupeIterator) Next() (*{{$k.Name}}Point, error) { 1475 for { 1476 // Read next point. 1477 p, err := itr.input.Next() 1478 if p == nil || err != nil { 1479 return nil, err 1480 } 1481 1482 // Serialize to bytes to store in lookup. 1483 buf, err := proto.Marshal(encode{{$k.Name}}Point(p)) 1484 if err != nil { 1485 return nil, err 1486 } 1487 1488 // If the point has already been output then move to the next point. 1489 if _, ok := itr.m[string(buf)]; ok { 1490 continue 1491 } 1492 1493 // Otherwise mark it as emitted and return point. 1494 itr.m[string(buf)] = struct{}{} 1495 return p, nil 1496 } 1497} 1498 1499// {{$k.name}}ReaderIterator represents an iterator that streams from a reader. 1500type {{$k.name}}ReaderIterator struct { 1501 r io.Reader 1502 dec *{{$k.Name}}PointDecoder 1503} 1504 1505// new{{$k.Name}}ReaderIterator returns a new instance of {{$k.name}}ReaderIterator. 1506func new{{$k.Name}}ReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *{{$k.name}}ReaderIterator { 1507 dec := New{{$k.Name}}PointDecoder(ctx, r) 1508 dec.stats = stats 1509 1510 return &{{$k.name}}ReaderIterator{ 1511 r: r, 1512 dec: dec, 1513 } 1514} 1515 1516// Stats returns stats about points processed. 1517func (itr *{{$k.name}}ReaderIterator) Stats() IteratorStats { return itr.dec.stats } 1518 1519// Close closes the underlying reader, if applicable. 1520func (itr *{{$k.name}}ReaderIterator) Close() error { 1521 if r, ok := itr.r.(io.ReadCloser); ok { 1522 return r.Close() 1523 } 1524 return nil 1525} 1526 1527// Next returns the next point from the iterator. 1528func (itr *{{$k.name}}ReaderIterator) Next() (*{{$k.Name}}Point, error) { 1529 // OPTIMIZE(benbjohnson): Reuse point on iterator. 1530 1531 // Unmarshal next point. 1532 p := &{{$k.Name}}Point{} 1533 if err := itr.dec.Decode{{$k.Name}}Point(p); err == io.EOF { 1534 return nil, nil 1535 } else if err != nil { 1536 return nil, err 1537 } 1538 return p, nil 1539} 1540{{end}} 1541 1542{{range .}} 1543// encode{{.Name}}Iterator encodes all points from itr to the underlying writer. 1544func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error { 1545 ticker := time.NewTicker(enc.StatsInterval) 1546 defer ticker.Stop() 1547 1548 // Emit initial stats. 1549 if err := enc.encodeStats(itr.Stats()); err != nil { 1550 return err 1551 } 1552 1553 // Continually stream points from the iterator into the encoder. 1554 penc := New{{.Name}}PointEncoder(enc.w) 1555 for { 1556 // Emit stats periodically. 1557 select { 1558 case <-ticker.C: 1559 if err := enc.encodeStats(itr.Stats()); err != nil { 1560 return err 1561 } 1562 default: 1563 } 1564 1565 // Retrieve the next point from the iterator. 1566 p, err := itr.Next() 1567 if err != nil { 1568 return err 1569 } else if p == nil { 1570 break 1571 } 1572 1573 // Write the point to the point encoder. 1574 if err := penc.Encode{{.Name}}Point(p); err != nil { 1575 return err 1576 } 1577 } 1578 1579 // Emit final stats. 1580 if err := enc.encodeStats(itr.Stats()); err != nil { 1581 return err 1582 } 1583 return nil 1584} 1585 1586{{end}} 1587 1588{{end}} 1589