1package query 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "math" 9 "sort" 10 11 "github.com/gogo/protobuf/proto" 12 internal "github.com/influxdata/influxdb/query/internal" 13 "github.com/influxdata/influxql" 14) 15 16// ZeroTime is the Unix nanosecond timestamp for no time. 17// This time is not used by the query engine or the storage engine as a valid time. 18const ZeroTime = int64(math.MinInt64) 19 20// Point represents a value in a series that occurred at a given time. 21type Point interface { 22 // Name and tags uniquely identify the series the value belongs to. 23 name() string 24 tags() Tags 25 26 // The time that the value occurred at. 27 time() int64 28 29 // The value at the given time. 30 value() interface{} 31 32 // Auxillary values passed along with the value. 33 aux() []interface{} 34} 35 36// Points represents a list of points. 37type Points []Point 38 39// Clone returns a deep copy of a. 40func (a Points) Clone() []Point { 41 other := make([]Point, len(a)) 42 for i, p := range a { 43 if p == nil { 44 other[i] = nil 45 continue 46 } 47 48 switch p := p.(type) { 49 case *FloatPoint: 50 other[i] = p.Clone() 51 case *IntegerPoint: 52 other[i] = p.Clone() 53 case *UnsignedPoint: 54 other[i] = p.Clone() 55 case *StringPoint: 56 other[i] = p.Clone() 57 case *BooleanPoint: 58 other[i] = p.Clone() 59 default: 60 panic(fmt.Sprintf("unable to clone point: %T", p)) 61 } 62 } 63 return other 64} 65 66// Tags represent a map of keys and values. 67// It memoizes its key so it can be used efficiently during query execution. 68type Tags struct { 69 id string 70 m map[string]string 71} 72 73// NewTags returns a new instance of Tags. 74func NewTags(m map[string]string) Tags { 75 if len(m) == 0 { 76 return Tags{} 77 } 78 return Tags{ 79 id: string(encodeTags(m)), 80 m: m, 81 } 82} 83 84// newTagsID returns a new instance of Tags by parsing the given tag ID. 85func newTagsID(id string) Tags { 86 m := decodeTags([]byte(id)) 87 if len(m) == 0 { 88 return Tags{} 89 } 90 return Tags{id: id, m: m} 91} 92 93// Equal compares if the Tags are equal to each other. 94func (t Tags) Equal(other Tags) bool { 95 return t.ID() == other.ID() 96} 97 98// ID returns the string identifier for the tags. 99func (t Tags) ID() string { return t.id } 100 101// KeyValues returns the underlying map for the tags. 102func (t Tags) KeyValues() map[string]string { return t.m } 103 104// Keys returns a sorted list of all keys on the tag. 105func (t *Tags) Keys() []string { 106 if t == nil { 107 return nil 108 } 109 110 var a []string 111 for k := range t.m { 112 a = append(a, k) 113 } 114 sort.Strings(a) 115 return a 116} 117 118// Values returns a sorted list of all values on the tag. 119func (t *Tags) Values() []string { 120 if t == nil { 121 return nil 122 } 123 124 a := make([]string, 0, len(t.m)) 125 for _, v := range t.m { 126 a = append(a, v) 127 } 128 sort.Strings(a) 129 return a 130} 131 132// Value returns the value for a given key. 133func (t *Tags) Value(k string) string { 134 if t == nil { 135 return "" 136 } 137 return t.m[k] 138} 139 140// Subset returns a new tags object with a subset of the keys. 141func (t *Tags) Subset(keys []string) Tags { 142 if len(keys) == 0 { 143 return Tags{} 144 } 145 146 // If keys match existing keys, simply return this tagset. 147 if keysMatch(t.m, keys) { 148 return *t 149 } 150 151 // Otherwise create new tag set. 152 m := make(map[string]string, len(keys)) 153 for _, k := range keys { 154 m[k] = t.m[k] 155 } 156 return NewTags(m) 157} 158 159// Equals returns true if t equals other. 160func (t *Tags) Equals(other *Tags) bool { 161 if t == nil && other == nil { 162 return true 163 } else if t == nil || other == nil { 164 return false 165 } 166 return t.id == other.id 167} 168 169// keysMatch returns true if m has exactly the same keys as listed in keys. 170func keysMatch(m map[string]string, keys []string) bool { 171 if len(keys) != len(m) { 172 return false 173 } 174 175 for _, k := range keys { 176 if _, ok := m[k]; !ok { 177 return false 178 } 179 } 180 181 return true 182} 183 184// encodeTags converts a map of strings to an identifier. 185func encodeTags(m map[string]string) []byte { 186 // Empty maps marshal to empty bytes. 187 if len(m) == 0 { 188 return nil 189 } 190 191 // Extract keys and determine final size. 192 sz := (len(m) * 2) - 1 // separators 193 keys := make([]string, 0, len(m)) 194 for k, v := range m { 195 keys = append(keys, k) 196 sz += len(k) + len(v) 197 } 198 sort.Strings(keys) 199 200 // Generate marshaled bytes. 201 b := make([]byte, sz) 202 buf := b 203 for _, k := range keys { 204 copy(buf, k) 205 buf[len(k)] = '\x00' 206 buf = buf[len(k)+1:] 207 } 208 for i, k := range keys { 209 v := m[k] 210 copy(buf, v) 211 if i < len(keys)-1 { 212 buf[len(v)] = '\x00' 213 buf = buf[len(v)+1:] 214 } 215 } 216 return b 217} 218 219// decodeTags parses an identifier into a map of tags. 220func decodeTags(id []byte) map[string]string { 221 a := bytes.Split(id, []byte{'\x00'}) 222 223 // There must be an even number of segments. 224 if len(a) > 0 && len(a)%2 == 1 { 225 a = a[:len(a)-1] 226 } 227 228 // Return nil if there are no segments. 229 if len(a) == 0 { 230 return nil 231 } 232 mid := len(a) / 2 233 234 // Decode key/value tags. 235 m := make(map[string]string) 236 for i := 0; i < mid; i++ { 237 m[string(a[i])] = string(a[i+mid]) 238 } 239 return m 240} 241 242func encodeAux(aux []interface{}) []*internal.Aux { 243 pb := make([]*internal.Aux, len(aux)) 244 for i := range aux { 245 switch v := aux[i].(type) { 246 case float64: 247 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Float)), FloatValue: proto.Float64(v)} 248 case *float64: 249 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Float))} 250 case int64: 251 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Integer)), IntegerValue: proto.Int64(v)} 252 case *int64: 253 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Integer))} 254 case uint64: 255 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Unsigned)), UnsignedValue: proto.Uint64(v)} 256 case *uint64: 257 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Unsigned))} 258 case string: 259 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.String)), StringValue: proto.String(v)} 260 case *string: 261 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.String))} 262 case bool: 263 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Boolean)), BooleanValue: proto.Bool(v)} 264 case *bool: 265 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Boolean))} 266 default: 267 pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Unknown))} 268 } 269 } 270 return pb 271} 272 273func decodeAux(pb []*internal.Aux) []interface{} { 274 if len(pb) == 0 { 275 return nil 276 } 277 278 aux := make([]interface{}, len(pb)) 279 for i := range pb { 280 switch influxql.DataType(pb[i].GetDataType()) { 281 case influxql.Float: 282 if pb[i].FloatValue != nil { 283 aux[i] = *pb[i].FloatValue 284 } else { 285 aux[i] = (*float64)(nil) 286 } 287 case influxql.Integer: 288 if pb[i].IntegerValue != nil { 289 aux[i] = *pb[i].IntegerValue 290 } else { 291 aux[i] = (*int64)(nil) 292 } 293 case influxql.Unsigned: 294 if pb[i].UnsignedValue != nil { 295 aux[i] = *pb[i].UnsignedValue 296 } else { 297 aux[i] = (*uint64)(nil) 298 } 299 case influxql.String: 300 if pb[i].StringValue != nil { 301 aux[i] = *pb[i].StringValue 302 } else { 303 aux[i] = (*string)(nil) 304 } 305 case influxql.Boolean: 306 if pb[i].BooleanValue != nil { 307 aux[i] = *pb[i].BooleanValue 308 } else { 309 aux[i] = (*bool)(nil) 310 } 311 default: 312 aux[i] = nil 313 } 314 } 315 return aux 316} 317 318func cloneAux(src []interface{}) []interface{} { 319 if src == nil { 320 return src 321 } 322 dest := make([]interface{}, len(src)) 323 copy(dest, src) 324 return dest 325} 326 327// PointDecoder decodes generic points from a reader. 328type PointDecoder struct { 329 r io.Reader 330 stats IteratorStats 331} 332 333// NewPointDecoder returns a new instance of PointDecoder that reads from r. 334func NewPointDecoder(r io.Reader) *PointDecoder { 335 return &PointDecoder{r: r} 336} 337 338// Stats returns iterator stats embedded within the stream. 339func (dec *PointDecoder) Stats() IteratorStats { return dec.stats } 340 341// DecodePoint reads from the underlying reader and unmarshals into p. 342func (dec *PointDecoder) DecodePoint(p *Point) error { 343 for { 344 // Read length. 345 var sz uint32 346 if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil { 347 return err 348 } 349 350 // Read point data. 351 buf := make([]byte, sz) 352 if _, err := io.ReadFull(dec.r, buf); err != nil { 353 return err 354 } 355 356 // Unmarshal into point. 357 var pb internal.Point 358 if err := proto.Unmarshal(buf, &pb); err != nil { 359 return err 360 } 361 362 // If the point contains stats then read stats and retry. 363 if pb.Stats != nil { 364 dec.stats = decodeIteratorStats(pb.Stats) 365 continue 366 } 367 368 if pb.IntegerValue != nil { 369 *p = decodeIntegerPoint(&pb) 370 } else if pb.UnsignedValue != nil { 371 *p = decodeUnsignedPoint(&pb) 372 } else if pb.StringValue != nil { 373 *p = decodeStringPoint(&pb) 374 } else if pb.BooleanValue != nil { 375 *p = decodeBooleanPoint(&pb) 376 } else { 377 *p = decodeFloatPoint(&pb) 378 } 379 380 return nil 381 } 382} 383