1package tsm1 2 3import ( 4 "context" 5 "fmt" 6 7 "github.com/influxdata/influxdb/pkg/metrics" 8 "github.com/influxdata/influxdb/pkg/tracing" 9 "github.com/influxdata/influxdb/query" 10 "github.com/influxdata/influxdb/tsdb" 11 "go.uber.org/zap" 12) 13 14func newLimitIterator(input query.Iterator, opt query.IteratorOptions) query.Iterator { 15 switch input := input.(type) { 16 case query.FloatIterator: 17 return newFloatLimitIterator(input, opt) 18 case query.IntegerIterator: 19 return newIntegerLimitIterator(input, opt) 20 case query.UnsignedIterator: 21 return newUnsignedLimitIterator(input, opt) 22 case query.StringIterator: 23 return newStringLimitIterator(input, opt) 24 case query.BooleanIterator: 25 return newBooleanLimitIterator(input, opt) 26 default: 27 panic(fmt.Sprintf("unsupported limit iterator type: %T", input)) 28 } 29} 30 31type floatCastIntegerCursor struct { 32 cursor integerCursor 33} 34 35func (c *floatCastIntegerCursor) close() error { return c.cursor.close() } 36 37func (c *floatCastIntegerCursor) next() (t int64, v interface{}) { return c.nextFloat() } 38 39func (c *floatCastIntegerCursor) nextFloat() (int64, float64) { 40 t, v := c.cursor.nextInteger() 41 return t, float64(v) 42} 43 44type floatCastUnsignedCursor struct { 45 cursor unsignedCursor 46} 47 48func (c *floatCastUnsignedCursor) close() error { return c.cursor.close() } 49 50func (c *floatCastUnsignedCursor) next() (t int64, v interface{}) { return c.nextFloat() } 51 52func (c *floatCastUnsignedCursor) nextFloat() (int64, float64) { 53 t, v := c.cursor.nextUnsigned() 54 return t, float64(v) 55} 56 57type integerCastFloatCursor struct { 58 cursor floatCursor 59} 60 61func (c *integerCastFloatCursor) close() error { return c.cursor.close() } 62 63func (c *integerCastFloatCursor) next() (t int64, v interface{}) { return c.nextInteger() } 64 65func (c *integerCastFloatCursor) nextInteger() (int64, int64) { 66 t, v := c.cursor.nextFloat() 67 return t, int64(v) 68} 69 70type integerCastUnsignedCursor struct { 71 cursor unsignedCursor 72} 73 74func (c *integerCastUnsignedCursor) close() error { return c.cursor.close() } 75 76func (c *integerCastUnsignedCursor) next() (t int64, v interface{}) { return c.nextInteger() } 77 78func (c *integerCastUnsignedCursor) nextInteger() (int64, int64) { 79 t, v := c.cursor.nextUnsigned() 80 return t, int64(v) 81} 82 83type unsignedCastFloatCursor struct { 84 cursor floatCursor 85} 86 87func (c *unsignedCastFloatCursor) close() error { return c.cursor.close() } 88 89func (c *unsignedCastFloatCursor) next() (t int64, v interface{}) { return c.nextUnsigned() } 90 91func (c *unsignedCastFloatCursor) nextUnsigned() (int64, uint64) { 92 t, v := c.cursor.nextFloat() 93 return t, uint64(v) 94} 95 96type unsignedCastIntegerCursor struct { 97 cursor integerCursor 98} 99 100func (c *unsignedCastIntegerCursor) close() error { return c.cursor.close() } 101 102func (c *unsignedCastIntegerCursor) next() (t int64, v interface{}) { return c.nextUnsigned() } 103 104func (c *unsignedCastIntegerCursor) nextUnsigned() (int64, uint64) { 105 t, v := c.cursor.nextInteger() 106 return t, uint64(v) 107} 108 109// literalValueCursor represents a cursor that always returns a single value. 110// It doesn't not have a time value so it can only be used with nextAt(). 111type literalValueCursor struct { 112 value interface{} 113} 114 115func (c *literalValueCursor) close() error { return nil } 116func (c *literalValueCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value } 117func (c *literalValueCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value } 118func (c *literalValueCursor) nextAt(seek int64) interface{} { return c.value } 119 120// preallocate and cast to cursorAt to avoid allocations 121var ( 122 nilFloatLiteralValueCursor cursorAt = &literalValueCursor{value: (*float64)(nil)} 123 nilIntegerLiteralValueCursor cursorAt = &literalValueCursor{value: (*int64)(nil)} 124 nilUnsignedLiteralValueCursor cursorAt = &literalValueCursor{value: (*uint64)(nil)} 125 nilStringLiteralValueCursor cursorAt = &literalValueCursor{value: (*string)(nil)} 126 nilBooleanLiteralValueCursor cursorAt = &literalValueCursor{value: (*bool)(nil)} 127) 128 129// stringSliceCursor is a cursor that outputs a slice of string values. 130type stringSliceCursor struct { 131 values []string 132} 133 134func (c *stringSliceCursor) close() error { return nil } 135 136func (c *stringSliceCursor) next() (int64, interface{}) { return c.nextString() } 137 138func (c *stringSliceCursor) nextString() (int64, string) { 139 if len(c.values) == 0 { 140 return tsdb.EOF, "" 141 } 142 143 value := c.values[0] 144 c.values = c.values[1:] 145 return 0, value 146} 147 148type cursorsAt []cursorAt 149 150func (c cursorsAt) close() { 151 for _, cur := range c { 152 cur.close() 153 } 154} 155 156// newMergeFinalizerIterator creates a new Merge iterator from the inputs. If the call to Merge succeeds, 157// the resulting Iterator will be wrapped in a finalizer iterator. 158// If Merge returns an error, the inputs will be closed. 159func newMergeFinalizerIterator(ctx context.Context, inputs []query.Iterator, opt query.IteratorOptions, log *zap.Logger) (query.Iterator, error) { 160 itr, err := query.Iterators(inputs).Merge(opt) 161 if err != nil { 162 query.Iterators(inputs).Close() 163 return nil, err 164 } 165 return newInstrumentedIterator(ctx, newFinalizerIterator(itr, log)), nil 166} 167 168// newFinalizerIterator creates a new iterator that installs a runtime finalizer 169// to ensure close is eventually called if the iterator is garbage collected. 170// This additional guard attempts to protect against clients of CreateIterator not 171// correctly closing them and leaking cursors. 172func newFinalizerIterator(itr query.Iterator, log *zap.Logger) query.Iterator { 173 if itr == nil { 174 return nil 175 } 176 177 switch inner := itr.(type) { 178 case query.FloatIterator: 179 return newFloatFinalizerIterator(inner, log) 180 case query.IntegerIterator: 181 return newIntegerFinalizerIterator(inner, log) 182 case query.UnsignedIterator: 183 return newUnsignedFinalizerIterator(inner, log) 184 case query.StringIterator: 185 return newStringFinalizerIterator(inner, log) 186 case query.BooleanIterator: 187 return newBooleanFinalizerIterator(inner, log) 188 default: 189 panic(fmt.Sprintf("unsupported finalizer iterator type: %T", itr)) 190 } 191} 192 193func newInstrumentedIterator(ctx context.Context, itr query.Iterator) query.Iterator { 194 if itr == nil { 195 return nil 196 } 197 198 span := tracing.SpanFromContext(ctx) 199 grp := metrics.GroupFromContext(ctx) 200 if span == nil || grp == nil { 201 return itr 202 } 203 204 switch inner := itr.(type) { 205 case query.FloatIterator: 206 return newFloatInstrumentedIterator(inner, span, grp) 207 case query.IntegerIterator: 208 return newIntegerInstrumentedIterator(inner, span, grp) 209 case query.UnsignedIterator: 210 return newUnsignedInstrumentedIterator(inner, span, grp) 211 case query.StringIterator: 212 return newStringInstrumentedIterator(inner, span, grp) 213 case query.BooleanIterator: 214 return newBooleanInstrumentedIterator(inner, span, grp) 215 default: 216 panic(fmt.Sprintf("unsupported instrumented iterator type: %T", itr)) 217 } 218} 219