1package query
2
3import (
	"container/heap"
	"context"
	"io"
	"sort"
	"sync"
	"time"
11
12	"github.com/gogo/protobuf/proto"
13	"github.com/influxdata/influxql"
14)
15
16// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval.
17const DefaultStatsInterval = time.Second
18
19{{with $types := .}}{{range $k := $types}}
20
21// {{$k.Name}}Iterator represents a stream of {{$k.name}} points.
22type {{$k.Name}}Iterator interface {
23	Iterator
24	Next() (*{{$k.Name}}Point, error)
25}
26
27// new{{$k.Name}}Iterators converts a slice of Iterator to a slice of {{$k.Name}}Iterator.
// It drops and closes any iterator in itrs that is not a {{$k.Name}}Iterator and cannot
29// be cast to a {{$k.Name}}Iterator.
30func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator {
31	a := make([]{{$k.Name}}Iterator, 0, len(itrs))
32	for _, itr := range itrs {
33		switch itr := itr.(type) {
34		case {{$k.Name}}Iterator:
35			a = append(a, itr)
36		default:
37			itr.Close()
38		}
39	}
40	return a
41}
42
44// buf{{$k.Name}}Iterator represents a buffered {{$k.Name}}Iterator.
45type buf{{$k.Name}}Iterator struct {
46	itr {{$k.Name}}Iterator
47	buf *{{$k.Name}}Point
48}
49
50// newBuf{{$k.Name}}Iterator returns a buffered {{$k.Name}}Iterator.
51func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator {
52	return &buf{{$k.Name}}Iterator{itr: itr}
53}
54
55// Stats returns statistics from the input iterator.
56func (itr *buf{{$k.Name}}Iterator) Stats() IteratorStats { return itr.itr.Stats() }
57
58// Close closes the underlying iterator.
59func (itr *buf{{$k.Name}}Iterator) Close() error { return itr.itr.Close() }
60
61// peek returns the next point without removing it from the iterator.
62func (itr *buf{{$k.Name}}Iterator) peek() (*{{$k.Name}}Point, error) {
63	p, err := itr.Next()
64	if err != nil {
65		return nil, err
66	}
67	itr.unread(p)
68	return p, nil
69}
70
71// peekTime returns the time of the next point.
// Returns ZeroTime if no more points are available.
73func (itr *buf{{$k.Name}}Iterator) peekTime() (int64, error) {
74	p, err := itr.peek()
75	if p == nil || err != nil {
76		return ZeroTime, err
77	}
78	return p.Time, nil
79}
80
// Next returns the buffered point, if one exists; otherwise it reads the next point from the underlying iterator.
82func (itr *buf{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) {
83	buf := itr.buf
84	if buf != nil {
85		itr.buf = nil
86		return buf, nil
87	}
88	return itr.itr.Next()
89}
90
91// NextInWindow returns the next value if it is between [startTime, endTime).
92// If the next value is outside the range then it is moved to the buffer.
93func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) (*{{$k.Name}}Point, error) {
94	v, err := itr.Next()
95	if v == nil || err != nil {
96		return nil, err
97	} else if t := v.Time; t >= endTime || t < startTime {
98		itr.unread(v)
99		return nil, nil
100	}
101	return v, nil
102}
103
104// unread sets v to the buffer. It is read on the next call to Next().
105func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v }
106
107// {{$k.name}}MergeIterator represents an iterator that combines multiple {{$k.name}} iterators.
108type {{$k.name}}MergeIterator struct {
109	inputs []{{$k.Name}}Iterator
110	heap   *{{$k.name}}MergeHeap
111	init   bool
112
113	closed bool
114	mu     sync.RWMutex
115
116	// Current iterator and window.
117	curr   *{{$k.name}}MergeHeapItem
118	window struct {
119		name      string
120		tags      string
121		startTime int64
122		endTime   int64
123	}
124}
125
126// new{{$k.Name}}MergeIterator returns a new instance of {{$k.name}}MergeIterator.
127func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator {
128	itr := &{{$k.name}}MergeIterator{
129		inputs: inputs,
130		heap: &{{$k.name}}MergeHeap{
131			items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)),
132			opt:   opt,
133		},
134	}
135
136	// Initialize heap items.
137	for _, input := range inputs {
		// Wrap in a buffered iterator; inputs with no more points are dropped lazily when the heap is initialized.
139		bufInput := newBuf{{$k.Name}}Iterator(input)
140
141		// Append to the heap.
142		itr.heap.items = append(itr.heap.items, &{{$k.name}}MergeHeapItem{itr: bufInput})
143	}
144
145	return itr
146}
147
148// Stats returns an aggregation of stats from the underlying iterators.
149func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats {
150	var stats IteratorStats
151	for _, input := range itr.inputs {
152		stats.Add(input.Stats())
153	}
154	return stats
155}
156
157// Close closes the underlying iterators.
158func (itr *{{$k.name}}MergeIterator) Close() error {
159	itr.mu.Lock()
160	defer itr.mu.Unlock()
161
162	for _, input := range itr.inputs {
163		input.Close()
164	}
165	itr.curr = nil
166	itr.inputs = nil
167	itr.heap.items = nil
168	itr.closed = true
169	return nil
170}
171
172// Next returns the next point from the iterator.
173func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) {
174	itr.mu.RLock()
175	defer itr.mu.RUnlock()
176	if itr.closed {
177		return nil, nil
178	}
179
180	// Initialize the heap. This needs to be done lazily on the first call to this iterator
181	// so that iterator initialization done through the Select() call returns quickly.
182	// Queries can only be interrupted after the Select() call completes so any operations
183	// done during iterator creation cannot be interrupted, which is why we do it here
184	// instead so an interrupt can happen while initializing the heap.
185	if !itr.init {
186		items := itr.heap.items
187		itr.heap.items = make([]*{{$k.name}}MergeHeapItem, 0, len(items))
188		for _, item := range items {
189			if p, err := item.itr.peek(); err != nil {
190				return nil, err
191			} else if p == nil {
192				continue
193			}
194			itr.heap.items = append(itr.heap.items, item)
195		}
196		heap.Init(itr.heap)
197		itr.init = true
198	}
199
200	for {
201		// Retrieve the next iterator if we don't have one.
202		if itr.curr == nil {
203			if len(itr.heap.items) == 0 {
204				return nil, nil
205			}
206			itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem)
207
208			// Read point and set current window.
209			p, err := itr.curr.itr.Next()
210			if err != nil {
211				return nil, err
212			}
213			tags := p.Tags.Subset(itr.heap.opt.Dimensions)
214			itr.window.name, itr.window.tags = p.Name, tags.ID()
215			itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
216			return p, nil
217		}
218
219		// Read the next point from the current iterator.
220		p, err := itr.curr.itr.Next()
221		if err != nil {
222			return nil, err
223		}
224
225		// If there are no more points then remove iterator from heap and find next.
226		if p == nil {
227			itr.curr = nil
228			continue
229		}
230
231		// Check if the point is inside of our current window.
232		inWindow := true
233		if window := itr.window; window.name != p.Name {
234			inWindow = false
235		} else if tags := p.Tags.Subset(itr.heap.opt.Dimensions); window.tags != tags.ID() {
236			inWindow = false
237		} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
238			inWindow = false
239		} else if !opt.Ascending && p.Time < window.startTime {
240			inWindow = false
241		}
242
243		// If it's outside our window then push iterator back on the heap and find new iterator.
244		if !inWindow {
245			itr.curr.itr.unread(p)
246			heap.Push(itr.heap, itr.curr)
247			itr.curr = nil
248			continue
249		}
250
251		return p, nil
252	}
253}
254
255// {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems.
256// Items are sorted by their next window and then by name/tags.
257type {{$k.name}}MergeHeap struct {
258	opt   IteratorOptions
259	items []*{{$k.name}}MergeHeapItem
260}
261
262func (h *{{$k.name}}MergeHeap) Len() int      { return len(h.items) }
263func (h *{{$k.name}}MergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
264func (h *{{$k.name}}MergeHeap) Less(i, j int) bool {
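	// If peeking an item fails, treat it as the smallest so the error is
	// surfaced quickly by a subsequent call to Next.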
265	x, err := h.items[i].itr.peek()
266	if err != nil {
267		return true
268	}
269	y, err := h.items[j].itr.peek()
270	if err != nil {
271		return false
272	}
273
274	if h.opt.Ascending {
275		if x.Name != y.Name {
276			return x.Name < y.Name
277		} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() {
278			return xTags.ID() < yTags.ID()
279		}
280	} else {
281		if x.Name != y.Name {
282			return x.Name > y.Name
283		} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() {
284			return xTags.ID() > yTags.ID()
285		}
286	}
287
288	xt, _ := h.opt.Window(x.Time)
289	yt, _ := h.opt.Window(y.Time)
290
291	if h.opt.Ascending {
292		return xt < yt
293	}
294	return xt > yt
295}
296
298func (h *{{$k.name}}MergeHeap) Push(x interface{}) {
299	h.items = append(h.items, x.(*{{$k.name}}MergeHeapItem))
300}
301
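// Pop implements heap.Interface. container/heap has already swapped the item
// being removed to the end of the slice, so it is taken from the tail here.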
302func (h *{{$k.name}}MergeHeap) Pop() interface{} {
303	old := h.items
304	n := len(old)
305	item := old[n-1]
306	h.items = old[0 : n-1]
307	return item
308}
309
310type {{$k.name}}MergeHeapItem struct {
311	itr *buf{{$k.Name}}Iterator
312}
313
314// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
315type {{$k.name}}SortedMergeIterator struct {
316	inputs []{{$k.Name}}Iterator
317	heap   *{{$k.name}}SortedMergeHeap
318	init   bool
319}
320
321// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator.
322func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator {
323	itr := &{{$k.name}}SortedMergeIterator{
324		inputs: inputs,
		heap: &{{$k.name}}SortedMergeHeap{
326			items: make([]*{{$k.name}}SortedMergeHeapItem, 0, len(inputs)),
327			opt:   opt,
328		},
329	}
330
331	// Initialize heap items.
332	for _, input := range inputs {
333		// Append to the heap.
334		itr.heap.items = append(itr.heap.items, &{{$k.name}}SortedMergeHeapItem{itr: input})
335	}
336
337	return itr
338}
339
340// Stats returns an aggregation of stats from the underlying iterators.
341func (itr *{{$k.name}}SortedMergeIterator) Stats() IteratorStats {
342	var stats IteratorStats
343	for _, input := range itr.inputs {
344		stats.Add(input.Stats())
345	}
346	return stats
347}
348
349// Close closes the underlying iterators.
350func (itr *{{$k.name}}SortedMergeIterator) Close() error {
351	for _, input := range itr.inputs {
352		input.Close()
353	}
354	return nil
355}
356
357// Next returns the next points from the iterator.
358func (itr *{{$k.name}}SortedMergeIterator) Next() (*{{$k.Name}}Point, error) { return itr.pop() }
359
360// pop returns the next point from the heap.
// It reads the next point from the item's cursor and pushes the item back onto the heap.
362func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
363	// Initialize the heap. See the MergeIterator to see why this has to be done lazily.
364	if !itr.init {
365		items := itr.heap.items
366		itr.heap.items = make([]*{{$k.name}}SortedMergeHeapItem, 0, len(items))
367		for _, item := range items {
368			var err error
369			if item.point, err = item.itr.Next(); err != nil {
370				return nil, err
371			} else if item.point == nil {
372				continue
373			}
374			itr.heap.items = append(itr.heap.items, item)
375		}
376		heap.Init(itr.heap)
377		itr.init = true
378	}
379
380	if len(itr.heap.items) == 0 {
381		return nil, nil
382	}
383
384	// Read the next item from the heap.
385	item := heap.Pop(itr.heap).(*{{$k.name}}SortedMergeHeapItem)
386	if item.err != nil {
387		return nil, item.err
388	} else if item.point == nil {
389		return nil, nil
390	}
391
392	// Copy the point for return.
393	p := item.point.Clone()
394
395	// Read the next item from the cursor. Push back to heap if one exists.
396	if item.point, item.err = item.itr.Next(); item.point != nil {
397		heap.Push(itr.heap, item)
398	}
399
400	return p, nil
401}
402
403// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems.
404// Items are sorted with the following priority:
405//     - By their measurement name;
406//     - By their tag keys/values;
407//     - By time; or
408//     - By their Aux field values.
409//
410type {{$k.name}}SortedMergeHeap struct {
411	opt   IteratorOptions
412	items []*{{$k.name}}SortedMergeHeapItem
413}
414
415func (h *{{$k.name}}SortedMergeHeap) Len() int      { return len(h.items) }
416func (h *{{$k.name}}SortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
417func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool {
418	x, y := h.items[i].point, h.items[j].point
419
420	if h.opt.Ascending {
421		if x.Name != y.Name {
422			return x.Name < y.Name
423		} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
424			return xTags.ID() < yTags.ID()
425		}
426
		if x.Time != y.Time {
428			return x.Time < y.Time
429		}
430
431		if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
432			for i := 0; i < len(x.Aux); i++ {
433				v1, ok1 := x.Aux[i].(string)
434				v2, ok2 := y.Aux[i].(string)
435				if !ok1 || !ok2 {
436					// Unsupported types used in Aux fields. Maybe they
437					// need to be added here?
438					return false
439				} else if v1 == v2 {
440					continue
441				}
442				return v1 < v2
443			}
444		}
445		return false // Times and/or Aux fields are equal.
446	}
447
448	if x.Name != y.Name {
449		return x.Name > y.Name
450	} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
451		return xTags.ID() > yTags.ID()
452	}
453
	if x.Time != y.Time {
455		return x.Time > y.Time
456	}
457
458	if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
459		for i := 0; i < len(x.Aux); i++ {
460			v1, ok1 := x.Aux[i].(string)
461			v2, ok2 := y.Aux[i].(string)
462			if !ok1 || !ok2 {
463				// Unsupported types used in Aux fields. Maybe they
464				// need to be added here?
465				return false
466			} else if v1 == v2 {
467				continue
468			}
469			return v1 > v2
470		}
471	}
472	return false // Times and/or Aux fields are equal.
473}
474
475func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) {
476	h.items = append(h.items, x.(*{{$k.name}}SortedMergeHeapItem))
477}
478
479func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} {
480	old := h.items
481	n := len(old)
482	item := old[n-1]
483	h.items = old[0 : n-1]
484	return item
485}
486
487type {{$k.name}}SortedMergeHeapItem struct {
	point *{{$k.Name}}Point
	err   error
	itr   {{$k.Name}}Iterator
491}
492
493// {{$k.name}}IteratorScanner scans the results of a {{$k.Name}}Iterator into a map.
494type {{$k.name}}IteratorScanner struct {
495	input        *buf{{$k.Name}}Iterator
496	err          error
497	keys         []influxql.VarRef
498	defaultValue interface{}
499}
500
501// new{{$k.Name}}IteratorScanner creates a new IteratorScanner.
502func new{{$k.Name}}IteratorScanner(input {{$k.Name}}Iterator, keys []influxql.VarRef, defaultValue interface{}) *{{$k.name}}IteratorScanner {
503	return &{{$k.name}}IteratorScanner{
		input:        newBuf{{$k.Name}}Iterator(input),
		keys:         keys,
		defaultValue: defaultValue,
507	}
508}
509
510func (s *{{$k.name}}IteratorScanner) Peek() (int64, string, Tags) {
511	if s.err != nil {
512		return ZeroTime, "", Tags{}
513	}
514
515	p, err := s.input.peek()
516	if err != nil {
517		s.err = err
518		return ZeroTime, "", Tags{}
519	} else if p == nil {
520		return ZeroTime, "", Tags{}
521	}
522	return p.Time, p.Name, p.Tags
523}
524
525func (s *{{$k.name}}IteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) {
526	if s.err != nil {
527		return
528	}
529
530	p, err := s.input.Next()
531	if err != nil {
532		s.err = err
533		return
534	} else if p == nil {
535		s.useDefaults(m)
536		return
537	} else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) {
538		s.useDefaults(m)
539		s.input.unread(p)
540		return
541	}
542
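	// The first key corresponds to the primary value of the point; the
	// remaining keys map to the auxiliary fields in order.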
543	if k := s.keys[0]; k.Val != "" {
544		if p.Nil {
545			if s.defaultValue != SkipDefault {
546				m[k.Val] = castToType(s.defaultValue, k.Type)
547			}
548		} else {
549			m[k.Val] = p.Value
550		}
551	}
552	for i, v := range p.Aux {
553		k := s.keys[i+1]
554		switch v.(type) {
555		case float64, int64, uint64, string, bool:
556			m[k.Val] = v
557		default:
558			// Insert the fill value if one was specified.
559			if s.defaultValue != SkipDefault {
560				m[k.Val] = castToType(s.defaultValue, k.Type)
561			}
562		}
563	}
564}
565
566func (s *{{$k.name}}IteratorScanner) useDefaults(m map[string]interface{}) {
567	if s.defaultValue == SkipDefault {
568		return
569	}
570	for _, k := range s.keys {
571		if k.Val == "" {
			continue
573		}
574		m[k.Val] = castToType(s.defaultValue, k.Type)
575	}
576}
577
578func (s *{{$k.name}}IteratorScanner) Stats() IteratorStats { return s.input.Stats() }
579func (s *{{$k.name}}IteratorScanner) Err() error { return s.err }
580func (s *{{$k.name}}IteratorScanner) Close() error { return s.input.Close() }
581
582// {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine.
583type {{$k.name}}ParallelIterator struct {
	input {{$k.Name}}Iterator
	ch    chan {{$k.name}}PointError

	once    sync.Once
	closing chan struct{}
	wg      sync.WaitGroup
590}
591
592// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator.
593func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}ParallelIterator {
594	itr := &{{$k.name}}ParallelIterator{
595		input:   input,
596		ch:      make(chan {{$k.name}}PointError, 256),
597		closing: make(chan struct{}),
598	}
599	itr.wg.Add(1)
600	go itr.monitor()
601	return itr
602}
603
604// Stats returns stats from the underlying iterator.
605func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input.Stats() }
606
607// Close closes the underlying iterators.
608func (itr *{{$k.name}}ParallelIterator) Close() error {
609	itr.once.Do(func() { close(itr.closing) })
610	itr.wg.Wait()
611	return itr.input.Close()
612}
613
614// Next returns the next point from the iterator.
615func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) {
616	v, ok := <-itr.ch
617	if !ok {
618		return nil, io.EOF
619	}
620	return v.point, v.err
621}
622
623// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *{{$k.name}}ParallelIterator) monitor() {
625	defer close(itr.ch)
626	defer itr.wg.Done()
627
628	for {
629		// Read next point.
630		p, err := itr.input.Next()
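		// Clone the point before sending it on the channel since some iterators
		// reuse the same point between calls to Next.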
631		if p != nil {
632			p = p.Clone()
633		}
634
635		select {
636		case <-itr.closing:
637			return
638		case itr.ch <- {{$k.name}}PointError{point: p, err: err}:
639		}
640	}
641}
642
643type {{$k.name}}PointError struct {
644	point *{{$k.Name}}Point
645	err   error
646}
647
648// {{$k.name}}LimitIterator represents an iterator that limits points per group.
649type {{$k.name}}LimitIterator struct {
650	input {{$k.Name}}Iterator
651	opt   IteratorOptions
652	n     int
653
654	prev struct {
655		name string
656		tags Tags
657	}
658}
659
660// new{{$k.Name}}LimitIterator returns a new instance of {{$k.name}}LimitIterator.
661func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator {
662	return &{{$k.name}}LimitIterator{
663		input: input,
664		opt:   opt,
665	}
666}
667
668// Stats returns stats from the underlying iterator.
669func (itr *{{$k.name}}LimitIterator) Stats() IteratorStats { return itr.input.Stats() }
670
671// Close closes the underlying iterators.
672func (itr *{{$k.name}}LimitIterator) Close() error { return itr.input.Close() }
673
674// Next returns the next point from the iterator.
675func (itr *{{$k.name}}LimitIterator) Next() (*{{$k.Name}}Point, error) {
676	for {
677		p, err := itr.input.Next()
678		if p == nil || err != nil {
679			return nil, err
680		}
681
682		// Reset window and counter if a new window is encountered.
683		if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
684			itr.prev.name = p.Name
685			itr.prev.tags = p.Tags
686			itr.n = 0
687		}
688
689		// Increment counter.
690		itr.n++
691
		// Skip this point if we have not yet passed the offset.
693		if itr.n <= itr.opt.Offset {
694			continue
695		}
696
		// Skip this point if we are beyond the limit.
698		if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
699			continue
700		}
701
702		return p, nil
703	}
704}
705
706type {{$k.name}}FillIterator struct {
707	input     *buf{{$k.Name}}Iterator
708	prev      {{$k.Name}}Point
709	startTime int64
710	endTime   int64
711	auxFields []interface{}
712	init      bool
713	opt       IteratorOptions
714
715	window struct {
716		name   string
717		tags   Tags
718		time   int64
719		offset int64
720	}
721}
722
723func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr influxql.Expr, opt IteratorOptions) *{{$k.name}}FillIterator {
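	// A count() over an empty window should report zero rather than null, so a
	// null fill is converted into filling with the zero value for this type.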
724	if opt.Fill == influxql.NullFill {
725		if expr, ok := expr.(*influxql.Call); ok && expr.Name == "count" {
726			opt.Fill = influxql.NumberFill
727			opt.FillValue = {{$k.Zero}}
728		}
729	}
730
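	// Compute the first and last window for the query. Descending queries walk
	// the windows from the end time back toward the start time.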
731	var startTime, endTime int64
732	if opt.Ascending {
733		startTime, _ = opt.Window(opt.StartTime)
734		endTime, _ = opt.Window(opt.EndTime)
735	} else {
736		startTime, _ = opt.Window(opt.EndTime)
737		endTime, _ = opt.Window(opt.StartTime)
738	}
739
740	var auxFields []interface{}
741	if len(opt.Aux) > 0 {
742		auxFields = make([]interface{}, len(opt.Aux))
743	}
744
745	return &{{$k.name}}FillIterator{
746		input:     newBuf{{$k.Name}}Iterator(input),
747		prev:      {{$k.Name}}Point{Nil: true},
748		startTime: startTime,
749		endTime:   endTime,
750		auxFields: auxFields,
751		opt:       opt,
752	}
753}
754
755func (itr *{{$k.name}}FillIterator) Stats() IteratorStats { return itr.input.Stats() }
756func (itr *{{$k.name}}FillIterator) Close() error { return itr.input.Close() }
757
758func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
759	if !itr.init {
760		p, err := itr.input.peek()
761		if p == nil || err != nil {
762			return nil, err
763		}
764		itr.window.name, itr.window.tags = p.Name, p.Tags
765		itr.window.time = itr.startTime
766		if itr.startTime == influxql.MinTime {
767			itr.window.time, _ = itr.opt.Window(p.Time)
768		}
769		if itr.opt.Location != nil {
770			_, itr.window.offset = itr.opt.Zone(itr.window.time)
771		}
772		itr.init = true
773	}
774
775	p, err := itr.input.Next()
776	if err != nil {
777		return nil, err
778	}
779
780	// Check if the next point is outside of our window or is nil.
781	if p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
782		// If we are inside of an interval, unread the point and continue below to
		// construct a new point.
784		if itr.opt.Ascending && itr.window.time <= itr.endTime {
785			itr.input.unread(p)
786			p = nil
787			goto CONSTRUCT
788		} else if !itr.opt.Ascending && itr.window.time >= itr.endTime && itr.endTime != influxql.MinTime {
789			itr.input.unread(p)
790			p = nil
791			goto CONSTRUCT
792		}
793
794		// We are *not* in a current interval. If there is no next point,
795		// we are at the end of all intervals.
796		if p == nil {
797			return nil, nil
798		}
799
800		// Set the new interval.
801		itr.window.name, itr.window.tags = p.Name, p.Tags
802		itr.window.time = itr.startTime
803		if itr.window.time == influxql.MinTime {
804			itr.window.time, _ = itr.opt.Window(p.Time)
805		}
806		if itr.opt.Location != nil {
807			_, itr.window.offset = itr.opt.Zone(itr.window.time)
808		}
809		itr.prev = {{$k.Name}}Point{Nil: true}
810	}
811
812	// Check if the point is our next expected point.
813CONSTRUCT:
814	if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) {
815		if p != nil {
816			itr.input.unread(p)
817		}
818
819		p = &{{$k.Name}}Point{
820			Name: itr.window.name,
821			Tags: itr.window.tags,
822			Time: itr.window.time,
823			Aux:  itr.auxFields,
824		}
825
826		switch itr.opt.Fill {
827		case influxql.LinearFill:
828			{{- if or (eq $k.Name "Float") (eq $k.Name "Integer") (eq $k.Name "Unsigned")}}
829			if !itr.prev.Nil {
830				next, err := itr.input.peek()
831				if err != nil {
832					return nil, err
833				} else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() {
834					interval := int64(itr.opt.Interval.Duration)
835					start := itr.window.time / interval
836					p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
837				} else {
838					p.Nil = true
839				}
840			} else {
841				p.Nil = true
842			}
843			{{else}}
844			fallthrough
845			{{- end}}
846		case influxql.NullFill:
847			p.Nil = true
848		case influxql.NumberFill:
849			p.Value, _ = castTo{{$k.Name}}(itr.opt.FillValue)
850		case influxql.PreviousFill:
851			if !itr.prev.Nil {
852				p.Value = itr.prev.Value
853				p.Nil = itr.prev.Nil
854			} else {
855				p.Nil = true
856			}
857		}
858	} else {
859		itr.prev = *p
860	}
861
862	// Advance the expected time. Do not advance to a new window here
863	// as there may be lingering points with the same timestamp in the previous
864	// window.
865	if itr.opt.Ascending {
866		itr.window.time += int64(itr.opt.Interval.Duration)
867	} else {
868		itr.window.time -= int64(itr.opt.Interval.Duration)
869	}
870
871	// Check to see if we have passed over an offset change and adjust the time
872	// to account for this new offset.
873	if itr.opt.Location != nil {
874		if _, offset := itr.opt.Zone(itr.window.time - 1); offset != itr.window.offset {
875			diff := itr.window.offset - offset
876			if abs(diff) < int64(itr.opt.Interval.Duration) {
877				itr.window.time += diff
878			}
879			itr.window.offset = offset
880		}
881	}
882	return p, nil
883}
884
885// {{$k.name}}IntervalIterator represents a {{$k.name}} implementation of IntervalIterator.
886type {{$k.name}}IntervalIterator struct {
887	input {{$k.Name}}Iterator
888	opt   IteratorOptions
889}
890
891func new{{$k.Name}}IntervalIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}IntervalIterator {
892	return &{{$k.name}}IntervalIterator{input: input, opt: opt}
893}
894
895func (itr *{{$k.name}}IntervalIterator) Stats() IteratorStats { return itr.input.Stats() }
896func (itr *{{$k.name}}IntervalIterator) Close() error { return itr.input.Close() }
897
898func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) {
899	p, err := itr.input.Next()
900	if p == nil || err != nil {
901		return nil, err
902	}
903	p.Time, _ = itr.opt.Window(p.Time)
904	// If we see the minimum allowable time, set the time to zero so we don't
905	// break the default returned time for aggregate queries without times.
906	if p.Time == influxql.MinTime {
907		p.Time = 0
908	}
909	return p, nil
910}
911
912// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator.
913type {{$k.name}}InterruptIterator struct {
914	input   {{$k.Name}}Iterator
915	closing <-chan struct{}
916	count   int
917}
918
919func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
920	return &{{$k.name}}InterruptIterator{input: input, closing: closing}
921}
922
923func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats { return itr.input.Stats() }
924func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() }
925
926func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) {
927	// Only check if the channel is closed every N points. This
928	// intentionally checks on both 0 and N so that if the iterator
929	// has been interrupted before the first point is emitted it will
930	// not emit any points.
	if itr.count&0xFF == 0xFF {
932		select {
933		case <-itr.closing:
934			return nil, itr.Close()
935		default:
936			// Reset iterator count to zero and fall through to emit the next point.
937			itr.count = 0
938		}
939	}
940
941	// Increment the counter for every point read.
942	itr.count++
943	return itr.input.Next()
944}
945
946// {{$k.name}}CloseInterruptIterator represents a {{$k.name}} implementation of CloseInterruptIterator.
947type {{$k.name}}CloseInterruptIterator struct {
948	input   {{$k.Name}}Iterator
949	closing <-chan struct{}
950	done    chan struct{}
951	once    sync.Once
952}
953
954func new{{$k.Name}}CloseInterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}CloseInterruptIterator {
955	itr := &{{$k.name}}CloseInterruptIterator{
956		input:   input,
957		closing: closing,
958		done:    make(chan struct{}),
959	}
960	go itr.monitor()
961	return itr
962}
963
964func (itr *{{$k.name}}CloseInterruptIterator) monitor() {
965	select {
966	case <-itr.closing:
967		itr.Close()
968	case <-itr.done:
969	}
970}
971
972func (itr *{{$k.name}}CloseInterruptIterator) Stats() IteratorStats {
973	return itr.input.Stats()
974}
975
976func (itr *{{$k.name}}CloseInterruptIterator) Close() error {
977	itr.once.Do(func() {
978		close(itr.done)
979		itr.input.Close()
980	})
981	return nil
982}
983
984func (itr *{{$k.name}}CloseInterruptIterator) Next() (*{{$k.Name}}Point, error) {
985	p, err := itr.input.Next()
986	if err != nil {
987		// Check if the iterator was closed.
988		select {
989		case <-itr.done:
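			// The error was most likely caused by the iterator being closed,
			// so report a normal end of the stream instead.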
990			return nil, nil
991		default:
992			return nil, err
993		}
994	}
995	return p, nil
996}
997
998{{range $v := $types}}
999
1000// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result.
1001type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
1002	input    *buf{{$k.Name}}Iterator
1003	create   func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
1004	dims     []string
1005	opt      IteratorOptions
1006	points   []{{$v.Name}}Point
1007	keepTags bool
1008}
1009
1010func new{{$k.Name}}Reduce{{$v.Name}}Iterator(input {{$k.Name}}Iterator, opt IteratorOptions, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)) *{{$k.name}}Reduce{{$v.Name}}Iterator {
1011	return &{{$k.name}}Reduce{{$v.Name}}Iterator{
1012		input:  newBuf{{$k.Name}}Iterator(input),
1013		create: createFn,
1014		dims:   opt.GetDimensions(),
1015		opt:    opt,
1016	}
1017}
1018
1019// Stats returns stats from the input iterator.
1020func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }
1021
1022// Close closes the iterator and all child iterators.
1023func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
1024
1025// Next returns the minimum value for the next available interval.
1026func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) {
1027	// Calculate next window if we have no more points.
1028	if len(itr.points) == 0 {
1029		var err error
1030		itr.points, err = itr.reduce()
1031		if len(itr.points) == 0 {
1032			return nil, err
1033		}
1034	}
1035
1036	// Pop next point off the stack.
1037	p := &itr.points[len(itr.points)-1]
1038	itr.points = itr.points[:len(itr.points)-1]
1039	return p, nil
1040}
1041
1042// {{$k.name}}Reduce{{$v.Name}}Point stores the reduced data for a name/tag combination.
1043type {{$k.name}}Reduce{{$v.Name}}Point struct {
1044	Name       string
1045	Tags       Tags
1046	Aggregator {{$k.Name}}PointAggregator
1047	Emitter    {{$v.Name}}PointEmitter
1048}
1049
1050// reduce executes fn once for every point in the next window.
1051// The previous value for the dimension is passed to fn.
1052func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) {
1053	// Calculate next window.
1054	var (
1055		startTime, endTime int64
1056		window             struct {
1057			name string
1058			tags string
1059		}
1060	)
1061	for {
1062		p, err := itr.input.Next()
1063		if err != nil || p == nil {
1064			return nil, err
1065		} else if p.Nil {
1066			continue
1067		}
1068
1069		// Unread the point so it can be processed.
1070		itr.input.unread(p)
1071		startTime, endTime = itr.opt.Window(p.Time)
1072		window.name, window.tags = p.Name, p.Tags.Subset(itr.opt.Dimensions).ID()
1073		break
1074	}
1075
1076	// Create points by tags.
1077	m := make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point)
1078	for {
1079		// Read next point.
1080		curr, err := itr.input.NextInWindow(startTime, endTime)
1081		if err != nil {
1082			return nil, err
1083		} else if curr == nil {
1084			break
1085		} else if curr.Nil {
1086			continue
		}
1091
1092		// Ensure this point is within the same final window.
1093		if curr.Name != window.name {
1094			itr.input.unread(curr)
1095			break
1096		} else if tags := curr.Tags.Subset(itr.opt.Dimensions); tags.ID() != window.tags {
1097			itr.input.unread(curr)
1098			break
1099		}
1100
1101		// Retrieve the tags on this point for this level of the query.
1102		// This may be different than the bucket dimensions.
1103		tags := curr.Tags.Subset(itr.dims)
1104		id := tags.ID()
1105
1106		// Retrieve the aggregator for this name/tag combination or create one.
1107		rp := m[id]
1108		if rp == nil {
1109			aggregator, emitter := itr.create()
1110			rp = &{{$k.name}}Reduce{{$v.Name}}Point{
1111				Name:       curr.Name,
1112				Tags:       tags,
1113				Aggregator: aggregator,
1114				Emitter:    emitter,
1115			}
1116			m[id] = rp
1117		}
1118		rp.Aggregator.Aggregate{{$k.Name}}(curr)
1119	}
1120
1121	keys := make([]string, 0, len(m))
1122	for k := range m {
1123		keys = append(keys, k)
1124	}
1125
1126	// Reverse sort points by name & tag.
1127	// This ensures a consistent order of output.
1128	if len(keys) > 0 {
1129		var sorted sort.Interface = sort.StringSlice(keys)
1130		if itr.opt.Ascending {
1131			sorted = sort.Reverse(sorted)
1132		}
1133		sort.Sort(sorted)
1134	}
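	// The keys are sorted in reverse because Next pops points off the end of
	// the slice, which returns them in the requested order.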
1135
1136	// Assume the points are already sorted until proven otherwise.
1137	sortedByTime := true
1138	// Emit the points for each name & tag combination.
1139	a := make([]{{$v.Name}}Point, 0, len(m))
1140	for _, k := range keys {
1141		rp := m[k]
1142		points := rp.Emitter.Emit()
1143		for i := len(points)-1; i >= 0; i-- {
1144			points[i].Name = rp.Name
1145			if !itr.keepTags {
1146				points[i].Tags = rp.Tags
1147			}
			// Set the point's time to the interval time if the reducer didn't provide one.
1149			if points[i].Time == ZeroTime {
1150				points[i].Time = startTime
1151			} else {
1152				sortedByTime = false
1153			}
1154			a = append(a, points[i])
1155		}
1156	}
1157	// Points may be out of order. Perform a stable sort by time if requested.
1158	if !sortedByTime && itr.opt.Ordered {
1159		var sorted sort.Interface = {{$v.name}}PointsByTime(a)
1160		if itr.opt.Ascending {
1161			sorted = sort.Reverse(sorted)
1162		}
1163		sort.Stable(sorted)
1164	}
1165	return a, nil
1166}
1167
1168// {{$k.name}}Stream{{$v.Name}}Iterator streams inputs into the iterator and emits points gradually.
1169type {{$k.name}}Stream{{$v.Name}}Iterator struct {
1170	input  *buf{{$k.Name}}Iterator
1171	create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
1172	dims   []string
1173	opt    IteratorOptions
1174	m      map[string]*{{$k.name}}Reduce{{$v.Name}}Point
1175	points []{{$v.Name}}Point
1176}
1177
1178// new{{$k.Name}}Stream{{$v.Name}}Iterator returns a new instance of {{$k.name}}Stream{{$v.Name}}Iterator.
1179func new{{$k.Name}}Stream{{$v.Name}}Iterator(input {{$k.Name}}Iterator, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter), opt IteratorOptions) *{{$k.name}}Stream{{$v.Name}}Iterator {
1180	return &{{$k.name}}Stream{{$v.Name}}Iterator{
1181		input:  newBuf{{$k.Name}}Iterator(input),
1182		create: createFn,
1183		dims:   opt.GetDimensions(),
1184		opt:    opt,
1185		m:      make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point),
1186	}
1187}
1188
1189// Stats returns stats from the input iterator.
1190func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }
1191
1192// Close closes the iterator and all child iterators.
1193func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
1194
1195// Next returns the next value for the stream iterator.
1196func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) {
1197	// Calculate next window if we have no more points.
1198	if len(itr.points) == 0 {
1199		var err error
1200		itr.points, err = itr.reduce()
1201		if len(itr.points) == 0 {
1202			return nil, err
1203		}
1204	}
1205
1206	// Pop next point off the stack.
1207	p := &itr.points[len(itr.points)-1]
1208	itr.points = itr.points[:len(itr.points)-1]
1209	return p, nil
1210}
1211
1212// reduce creates and manages aggregators for every point from the input.
1213// After aggregating a point, it always tries to emit a value using the emitter.
1214func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) {
1215	// We have already read all of the input points.
1216	if itr.m == nil {
1217		return nil, nil
1218	}
1219
1220	for {
1221		// Read next point.
1222		curr, err := itr.input.Next()
1223		if err != nil {
1224			return nil, err
1225		} else if curr == nil {
1226			// Close all of the aggregators to flush any remaining points to emit.
1227			var points []{{$v.Name}}Point
1228			for _, rp := range itr.m {
1229				if aggregator, ok := rp.Aggregator.(io.Closer); ok {
1230					if err := aggregator.Close(); err != nil {
1231						return nil, err
1232					}
1233
1234					pts := rp.Emitter.Emit()
1235					if len(pts) == 0 {
1236						continue
1237					}
1238
1239					for i := range pts {
1240						pts[i].Name = rp.Name
1241						pts[i].Tags = rp.Tags
1242					}
1243					points = append(points, pts...)
1244				}
1245			}
1246
1247			// Eliminate the aggregators and emitters.
1248			itr.m = nil
1249			return points, nil
1250		} else if curr.Nil {
1251			continue
1252		}
1253		tags := curr.Tags.Subset(itr.dims)
1254
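		// Key the aggregator map by measurement name plus the subset tag ID,
		// separated by a NUL byte.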
1255		id := curr.Name
1256		if len(tags.m) > 0 {
1257			id += "\x00" + tags.ID()
1258		}
1259
1260		// Retrieve the aggregator for this name/tag combination or create one.
1261		rp := itr.m[id]
1262		if rp == nil {
1263			aggregator, emitter := itr.create()
			rp = &{{$k.name}}Reduce{{$v.Name}}Point{
1265				Name:       curr.Name,
1266				Tags:       tags,
1267				Aggregator: aggregator,
1268				Emitter:    emitter,
1269			}
1270			itr.m[id] = rp
1271		}
1272		rp.Aggregator.Aggregate{{$k.Name}}(curr)
1273
1274		// Attempt to emit points from the aggregator.
1275		points := rp.Emitter.Emit()
1276		if len(points) == 0 {
1277			continue
1278		}
1279
1280		for i := range points {
1281			points[i].Name = rp.Name
1282			points[i].Tags = rp.Tags
1283		}
1284		return points, nil
1285	}
1286}
1287{{end}}
1288
1289// {{$k.name}}DedupeIterator only outputs unique points.
1290// This differs from the DistinctIterator in that it compares all aux fields too.
1291// This iterator is relatively inefficient and should only be used on small
1292// datasets such as meta query results.
1293type {{$k.name}}DedupeIterator struct {
1294	input {{$k.Name}}Iterator
1295	m     map[string]struct{} // lookup of points already sent
1296}
1297
1298type {{$k.name}}IteratorMapper struct {
	cur    Cursor
	row    Row
	driver IteratorMap   // which iterator to use for the primary value, can be nil
	fields []IteratorMap // which iterator to use for an aux field
	point  {{$k.Name}}Point
1304}
1305
1306func new{{$k.Name}}IteratorMapper(cur Cursor, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper {
1307	return &{{$k.name}}IteratorMapper{
		cur:    cur,
		driver: driver,
		fields: fields,
		point: {{$k.Name}}Point{
1312			Aux: make([]interface{}, len(fields)),
1313		},
1314	}
1315}
1316
1317func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
1318	if !itr.cur.Scan(&itr.row) {
1319		if err := itr.cur.Err(); err != nil {
1320			return nil, err
1321		}
1322		return nil, nil
1323	}
1324
1325	itr.point.Time = itr.row.Time
1326	itr.point.Name = itr.row.Series.Name
1327	itr.point.Tags = itr.row.Series.Tags
1328
1329	if itr.driver != nil {
		if v := itr.driver.Value(&itr.row); v != nil {
1331			if v, ok := castTo{{$k.Name}}(v); ok {
1332				itr.point.Value = v
1333				itr.point.Nil = false
1334			} else {
1335				itr.point.Value = {{$k.Nil}}
1336				itr.point.Nil = true
1337			}
1338		} else {
1339			itr.point.Value = {{$k.Nil}}
1340			itr.point.Nil = true
1341		}
1342	}
1343	for i, f := range itr.fields {
1344		itr.point.Aux[i] = f.Value(&itr.row)
1345	}
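	// Note: the same point struct is reused on every call to Next.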
1346	return &itr.point, nil
1347}
1348
1349func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats {
1350	return itr.cur.Stats()
1351}
1352
1353func (itr *{{$k.name}}IteratorMapper) Close() error {
1354	return itr.cur.Close()
1355}
1356
1357type {{$k.name}}FilterIterator struct {
1358	input {{$k.Name}}Iterator
1359	cond  influxql.Expr
1360	opt   IteratorOptions
1361	m     map[string]interface{}
1362}
1363
1364func new{{$k.Name}}FilterIterator(input {{$k.Name}}Iterator, cond influxql.Expr, opt IteratorOptions) {{$k.Name}}Iterator {
1365	// Strip out time conditions from the WHERE clause.
1366	// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
1367	n := influxql.RewriteFunc(influxql.CloneExpr(cond), func(n influxql.Node) influxql.Node {
1368		switch n := n.(type) {
1369		case *influxql.BinaryExpr:
1370			if n.LHS.String() == "time" {
1371				return &influxql.BooleanLiteral{Val: true}
1372			}
1373		}
1374		return n
1375	})
1376
1377	cond, _ = n.(influxql.Expr)
1378	if cond == nil {
1379		return input
1380	} else if n, ok := cond.(*influxql.BooleanLiteral); ok && n.Val {
1381		return input
1382	}
1383
1384	return &{{$k.name}}FilterIterator{
1385		input: input,
1386		cond:  cond,
1387		opt:   opt,
1388		m:     make(map[string]interface{}),
1389	}
1390}
1391
1392func (itr *{{$k.name}}FilterIterator) Stats() IteratorStats { return itr.input.Stats() }
1393func (itr *{{$k.name}}FilterIterator) Close() error { return itr.input.Close() }
1394
1395func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) {
1396	for {
1397		p, err := itr.input.Next()
1398		if err != nil || p == nil {
1399			return nil, err
1400		}
1401
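		// Build the evaluation context for the condition from this point's
		// auxiliary fields and tag values.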
1402		for i, ref := range itr.opt.Aux {
1403			itr.m[ref.Val] = p.Aux[i]
1404		}
1405		for k, v := range p.Tags.KeyValues() {
1406			itr.m[k] = v
1407		}
1408
1409		if !influxql.EvalBool(itr.cond, itr.m) {
1410			continue
1411		}
1412		return p, nil
1413	}
1414}
1415
1416type {{$k.name}}TagSubsetIterator struct {
1417	input      {{$k.Name}}Iterator
1418	point      {{$k.Name}}Point
1419	lastTags   Tags
1420	dimensions []string
1421}
1422
1423func new{{$k.Name}}TagSubsetIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}TagSubsetIterator {
1424	return &{{$k.name}}TagSubsetIterator{
1425		input:      input,
1426		dimensions: opt.GetDimensions(),
1427	}
1428}
1429
1430func (itr *{{$k.name}}TagSubsetIterator) Next() (*{{$k.Name}}Point, error) {
1431	p, err := itr.input.Next()
1432	if err != nil {
1433		return nil, err
1434	} else if p == nil {
1435		return nil, nil
1436	}
1437
1438	itr.point.Name = p.Name
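	// Only recompute the tag subset when the tags change between points.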
1439	if !p.Tags.Equal(itr.lastTags) {
1440		itr.point.Tags = p.Tags.Subset(itr.dimensions)
1441		itr.lastTags = p.Tags
1442	}
1443	itr.point.Time = p.Time
1444	itr.point.Value = p.Value
1445	itr.point.Aux = p.Aux
1446	itr.point.Aggregated = p.Aggregated
1447	itr.point.Nil = p.Nil
1448	return &itr.point, nil
1449}
1450
1451func (itr *{{$k.name}}TagSubsetIterator) Stats() IteratorStats {
1452	return itr.input.Stats()
1453}
1454
1455func (itr *{{$k.name}}TagSubsetIterator) Close() error {
1456	return itr.input.Close()
1457}
1458
1459// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
1460func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
1461	return &{{$k.name}}DedupeIterator{
1462		input: input,
1463		m:     make(map[string]struct{}),
1464	}
1465}
1466
1467// Stats returns stats from the input iterator.
1468func (itr *{{$k.name}}DedupeIterator) Stats() IteratorStats { return itr.input.Stats() }
1469
1470// Close closes the iterator and all child iterators.
1471func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() }
1472
1473// Next returns the next unique point from the input iterator.
1474func (itr *{{$k.name}}DedupeIterator) Next() (*{{$k.Name}}Point, error) {
1475	for {
1476		// Read next point.
1477		p, err := itr.input.Next()
1478		if p == nil || err != nil {
1479			return nil, err
1480		}
1481
1482		// Serialize to bytes to store in lookup.
1483		buf, err := proto.Marshal(encode{{$k.Name}}Point(p))
1484		if err != nil {
1485			return nil, err
1486		}
1487
1488		// If the point has already been output then move to the next point.
1489		if _, ok := itr.m[string(buf)]; ok {
1490			continue
1491		}
1492
1493		// Otherwise mark it as emitted and return point.
1494		itr.m[string(buf)] = struct{}{}
1495		return p, nil
1496	}
1497}
1498
1499// {{$k.name}}ReaderIterator represents an iterator that streams from a reader.
1500type {{$k.name}}ReaderIterator struct {
	r   io.Reader
	dec *{{$k.Name}}PointDecoder
1503}
1504
1505// new{{$k.Name}}ReaderIterator returns a new instance of {{$k.name}}ReaderIterator.
1506func new{{$k.Name}}ReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *{{$k.name}}ReaderIterator {
1507	dec := New{{$k.Name}}PointDecoder(ctx, r)
1508	dec.stats = stats
1509
1510	return &{{$k.name}}ReaderIterator{
		r:   r,
		dec: dec,
1513	}
1514}
1515
1516// Stats returns stats about points processed.
1517func (itr *{{$k.name}}ReaderIterator) Stats() IteratorStats { return itr.dec.stats }
1518
1519// Close closes the underlying reader, if applicable.
1520func (itr *{{$k.name}}ReaderIterator) Close() error {
1521	if r, ok := itr.r.(io.ReadCloser); ok {
1522		return r.Close()
1523	}
1524	return nil
1525}
1526
1527// Next returns the next point from the iterator.
1528func (itr *{{$k.name}}ReaderIterator) Next() (*{{$k.Name}}Point, error) {
1529	// OPTIMIZE(benbjohnson): Reuse point on iterator.
1530
1531	// Unmarshal next point.
1532	p := &{{$k.Name}}Point{}
1533	if err := itr.dec.Decode{{$k.Name}}Point(p); err == io.EOF {
1534		return nil, nil
1535	} else if err != nil {
1536		return nil, err
1537	}
1538	return p, nil
1539}
1540{{end}}
1541
1542{{range .}}
1543// encode{{.Name}}Iterator encodes all points from itr to the underlying writer.
1544func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
1545	ticker := time.NewTicker(enc.StatsInterval)
1546	defer ticker.Stop()
1547
1548	// Emit initial stats.
1549	if err := enc.encodeStats(itr.Stats()); err != nil {
1550		return err
1551	}
1552
1553	// Continually stream points from the iterator into the encoder.
1554	penc := New{{.Name}}PointEncoder(enc.w)
1555	for {
1556		// Emit stats periodically.
1557		select {
1558		case <-ticker.C:
1559			if err := enc.encodeStats(itr.Stats()); err != nil {
1560				return err
1561			}
1562		default:
1563		}
1564
1565		// Retrieve the next point from the iterator.
1566		p, err := itr.Next()
1567		if err != nil {
1568			return err
1569		} else if p == nil {
1570			break
1571		}
1572
1573		// Write the point to the point encoder.
1574		if err := penc.Encode{{.Name}}Point(p); err != nil {
1575			return err
1576		}
1577	}
1578
1579	// Emit final stats.
1580	if err := enc.encodeStats(itr.Stats()); err != nil {
1581		return err
1582	}
1583	return nil
1584}
1585
1586{{end}}
1587
1588{{end}}
1589