1package tsm1
2
3import (
4	"context"
5	"fmt"
6
7	"github.com/influxdata/influxdb/pkg/metrics"
8	"github.com/influxdata/influxdb/pkg/tracing"
9	"github.com/influxdata/influxdb/query"
10	"github.com/influxdata/influxdb/tsdb"
11	"go.uber.org/zap"
12)
13
14func newLimitIterator(input query.Iterator, opt query.IteratorOptions) query.Iterator {
15	switch input := input.(type) {
16	case query.FloatIterator:
17		return newFloatLimitIterator(input, opt)
18	case query.IntegerIterator:
19		return newIntegerLimitIterator(input, opt)
20	case query.UnsignedIterator:
21		return newUnsignedLimitIterator(input, opt)
22	case query.StringIterator:
23		return newStringLimitIterator(input, opt)
24	case query.BooleanIterator:
25		return newBooleanLimitIterator(input, opt)
26	default:
27		panic(fmt.Sprintf("unsupported limit iterator type: %T", input))
28	}
29}
30
31type floatCastIntegerCursor struct {
32	cursor integerCursor
33}
34
35func (c *floatCastIntegerCursor) close() error { return c.cursor.close() }
36
37func (c *floatCastIntegerCursor) next() (t int64, v interface{}) { return c.nextFloat() }
38
39func (c *floatCastIntegerCursor) nextFloat() (int64, float64) {
40	t, v := c.cursor.nextInteger()
41	return t, float64(v)
42}
43
44type floatCastUnsignedCursor struct {
45	cursor unsignedCursor
46}
47
48func (c *floatCastUnsignedCursor) close() error { return c.cursor.close() }
49
50func (c *floatCastUnsignedCursor) next() (t int64, v interface{}) { return c.nextFloat() }
51
52func (c *floatCastUnsignedCursor) nextFloat() (int64, float64) {
53	t, v := c.cursor.nextUnsigned()
54	return t, float64(v)
55}
56
57type integerCastFloatCursor struct {
58	cursor floatCursor
59}
60
61func (c *integerCastFloatCursor) close() error { return c.cursor.close() }
62
63func (c *integerCastFloatCursor) next() (t int64, v interface{}) { return c.nextInteger() }
64
65func (c *integerCastFloatCursor) nextInteger() (int64, int64) {
66	t, v := c.cursor.nextFloat()
67	return t, int64(v)
68}
69
70type integerCastUnsignedCursor struct {
71	cursor unsignedCursor
72}
73
74func (c *integerCastUnsignedCursor) close() error { return c.cursor.close() }
75
76func (c *integerCastUnsignedCursor) next() (t int64, v interface{}) { return c.nextInteger() }
77
78func (c *integerCastUnsignedCursor) nextInteger() (int64, int64) {
79	t, v := c.cursor.nextUnsigned()
80	return t, int64(v)
81}
82
83type unsignedCastFloatCursor struct {
84	cursor floatCursor
85}
86
87func (c *unsignedCastFloatCursor) close() error { return c.cursor.close() }
88
89func (c *unsignedCastFloatCursor) next() (t int64, v interface{}) { return c.nextUnsigned() }
90
91func (c *unsignedCastFloatCursor) nextUnsigned() (int64, uint64) {
92	t, v := c.cursor.nextFloat()
93	return t, uint64(v)
94}
95
96type unsignedCastIntegerCursor struct {
97	cursor integerCursor
98}
99
100func (c *unsignedCastIntegerCursor) close() error { return c.cursor.close() }
101
102func (c *unsignedCastIntegerCursor) next() (t int64, v interface{}) { return c.nextUnsigned() }
103
104func (c *unsignedCastIntegerCursor) nextUnsigned() (int64, uint64) {
105	t, v := c.cursor.nextInteger()
106	return t, uint64(v)
107}
108
109// literalValueCursor represents a cursor that always returns a single value.
110// It doesn't not have a time value so it can only be used with nextAt().
111type literalValueCursor struct {
112	value interface{}
113}
114
115func (c *literalValueCursor) close() error                   { return nil }
116func (c *literalValueCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
117func (c *literalValueCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
118func (c *literalValueCursor) nextAt(seek int64) interface{}  { return c.value }
119
120// preallocate and cast to cursorAt to avoid allocations
121var (
122	nilFloatLiteralValueCursor    cursorAt = &literalValueCursor{value: (*float64)(nil)}
123	nilIntegerLiteralValueCursor  cursorAt = &literalValueCursor{value: (*int64)(nil)}
124	nilUnsignedLiteralValueCursor cursorAt = &literalValueCursor{value: (*uint64)(nil)}
125	nilStringLiteralValueCursor   cursorAt = &literalValueCursor{value: (*string)(nil)}
126	nilBooleanLiteralValueCursor  cursorAt = &literalValueCursor{value: (*bool)(nil)}
127)
128
129// stringSliceCursor is a cursor that outputs a slice of string values.
130type stringSliceCursor struct {
131	values []string
132}
133
134func (c *stringSliceCursor) close() error { return nil }
135
136func (c *stringSliceCursor) next() (int64, interface{}) { return c.nextString() }
137
138func (c *stringSliceCursor) nextString() (int64, string) {
139	if len(c.values) == 0 {
140		return tsdb.EOF, ""
141	}
142
143	value := c.values[0]
144	c.values = c.values[1:]
145	return 0, value
146}
147
148type cursorsAt []cursorAt
149
150func (c cursorsAt) close() {
151	for _, cur := range c {
152		cur.close()
153	}
154}
155
156// newMergeFinalizerIterator creates a new Merge iterator from the inputs. If the call to Merge succeeds,
157// the resulting Iterator will be wrapped in a finalizer iterator.
158// If Merge returns an error, the inputs will be closed.
159func newMergeFinalizerIterator(ctx context.Context, inputs []query.Iterator, opt query.IteratorOptions, log *zap.Logger) (query.Iterator, error) {
160	itr, err := query.Iterators(inputs).Merge(opt)
161	if err != nil {
162		query.Iterators(inputs).Close()
163		return nil, err
164	}
165	return newInstrumentedIterator(ctx, newFinalizerIterator(itr, log)), nil
166}
167
168// newFinalizerIterator creates a new iterator that installs a runtime finalizer
169// to ensure close is eventually called if the iterator is garbage collected.
170// This additional guard attempts to protect against clients of CreateIterator not
171// correctly closing them and leaking cursors.
172func newFinalizerIterator(itr query.Iterator, log *zap.Logger) query.Iterator {
173	if itr == nil {
174		return nil
175	}
176
177	switch inner := itr.(type) {
178	case query.FloatIterator:
179		return newFloatFinalizerIterator(inner, log)
180	case query.IntegerIterator:
181		return newIntegerFinalizerIterator(inner, log)
182	case query.UnsignedIterator:
183		return newUnsignedFinalizerIterator(inner, log)
184	case query.StringIterator:
185		return newStringFinalizerIterator(inner, log)
186	case query.BooleanIterator:
187		return newBooleanFinalizerIterator(inner, log)
188	default:
189		panic(fmt.Sprintf("unsupported finalizer iterator type: %T", itr))
190	}
191}
192
193func newInstrumentedIterator(ctx context.Context, itr query.Iterator) query.Iterator {
194	if itr == nil {
195		return nil
196	}
197
198	span := tracing.SpanFromContext(ctx)
199	grp := metrics.GroupFromContext(ctx)
200	if span == nil || grp == nil {
201		return itr
202	}
203
204	switch inner := itr.(type) {
205	case query.FloatIterator:
206		return newFloatInstrumentedIterator(inner, span, grp)
207	case query.IntegerIterator:
208		return newIntegerInstrumentedIterator(inner, span, grp)
209	case query.UnsignedIterator:
210		return newUnsignedInstrumentedIterator(inner, span, grp)
211	case query.StringIterator:
212		return newStringInstrumentedIterator(inner, span, grp)
213	case query.BooleanIterator:
214		return newBooleanInstrumentedIterator(inner, span, grp)
215	default:
216		panic(fmt.Sprintf("unsupported instrumented iterator type: %T", itr))
217	}
218}
219