1package query
2
3import (
4	"bytes"
5	"encoding/binary"
6	"fmt"
7	"io"
8	"math"
9	"sort"
10
11	"github.com/gogo/protobuf/proto"
12	internal "github.com/influxdata/influxdb/query/internal"
13	"github.com/influxdata/influxql"
14)
15
// ZeroTime is the sentinel Unix nanosecond timestamp meaning "no time".
// It is the smallest representable int64 and, per the contract of this
// package, is never treated as a valid time by the query or storage engine.
const ZeroTime int64 = math.MinInt64
19
// Point represents a value in a series that occurred at a given time.
//
// Every method is unexported, so the interface cannot be implemented directly
// by types declared outside this package.
type Point interface {
	// name and tags together uniquely identify the series the value belongs to.
	name() string
	tags() Tags

	// time returns the time that the value occurred at.
	time() int64

	// value returns the value at the given time.
	value() interface{}

	// aux returns the auxiliary values passed along with the value.
	aux() []interface{}
}
35
36// Points represents a list of points.
37type Points []Point
38
39// Clone returns a deep copy of a.
40func (a Points) Clone() []Point {
41	other := make([]Point, len(a))
42	for i, p := range a {
43		if p == nil {
44			other[i] = nil
45			continue
46		}
47
48		switch p := p.(type) {
49		case *FloatPoint:
50			other[i] = p.Clone()
51		case *IntegerPoint:
52			other[i] = p.Clone()
53		case *UnsignedPoint:
54			other[i] = p.Clone()
55		case *StringPoint:
56			other[i] = p.Clone()
57		case *BooleanPoint:
58			other[i] = p.Clone()
59		default:
60			panic(fmt.Sprintf("unable to clone point: %T", p))
61		}
62	}
63	return other
64}
65
// Tags represent a map of keys and values.
// It memoizes its key so it can be used efficiently during query execution.
type Tags struct {
	// id is the memoized identifier of the tag set; NewTags fills it with the
	// output of encodeTags(m). It is what Equal and Equals compare.
	id string
	// m holds the tag key/value pairs. It is nil for the zero (empty) Tags.
	m  map[string]string
}
72
73// NewTags returns a new instance of Tags.
74func NewTags(m map[string]string) Tags {
75	if len(m) == 0 {
76		return Tags{}
77	}
78	return Tags{
79		id: string(encodeTags(m)),
80		m:  m,
81	}
82}
83
84// newTagsID returns a new instance of Tags by parsing the given tag ID.
85func newTagsID(id string) Tags {
86	m := decodeTags([]byte(id))
87	if len(m) == 0 {
88		return Tags{}
89	}
90	return Tags{id: id, m: m}
91}
92
93// Equal compares if the Tags are equal to each other.
94func (t Tags) Equal(other Tags) bool {
95	return t.ID() == other.ID()
96}
97
98// ID returns the string identifier for the tags.
99func (t Tags) ID() string { return t.id }
100
101// KeyValues returns the underlying map for the tags.
102func (t Tags) KeyValues() map[string]string { return t.m }
103
104// Keys returns a sorted list of all keys on the tag.
105func (t *Tags) Keys() []string {
106	if t == nil {
107		return nil
108	}
109
110	var a []string
111	for k := range t.m {
112		a = append(a, k)
113	}
114	sort.Strings(a)
115	return a
116}
117
118// Values returns a sorted list of all values on the tag.
119func (t *Tags) Values() []string {
120	if t == nil {
121		return nil
122	}
123
124	a := make([]string, 0, len(t.m))
125	for _, v := range t.m {
126		a = append(a, v)
127	}
128	sort.Strings(a)
129	return a
130}
131
132// Value returns the value for a given key.
133func (t *Tags) Value(k string) string {
134	if t == nil {
135		return ""
136	}
137	return t.m[k]
138}
139
140// Subset returns a new tags object with a subset of the keys.
141func (t *Tags) Subset(keys []string) Tags {
142	if len(keys) == 0 {
143		return Tags{}
144	}
145
146	// If keys match existing keys, simply return this tagset.
147	if keysMatch(t.m, keys) {
148		return *t
149	}
150
151	// Otherwise create new tag set.
152	m := make(map[string]string, len(keys))
153	for _, k := range keys {
154		m[k] = t.m[k]
155	}
156	return NewTags(m)
157}
158
159// Equals returns true if t equals other.
160func (t *Tags) Equals(other *Tags) bool {
161	if t == nil && other == nil {
162		return true
163	} else if t == nil || other == nil {
164		return false
165	}
166	return t.id == other.id
167}
168
// keysMatch returns true if m has exactly the same keys as listed in keys.
//
// keys must name every key of m exactly once. A list containing duplicates
// never matches: with equal lengths, a repeated entry means some key of m is
// missing from keys, so reporting a match would be wrong.
func keysMatch(m map[string]string, keys []string) bool {
	if len(keys) != len(m) {
		return false
	}

	for i, k := range keys {
		if _, ok := m[k]; !ok {
			return false
		}
		// Reject duplicates. The pairwise scan is allocation-free; key lists
		// are presumably short (tag dimensions), so the quadratic cost is
		// expected to be negligible.
		for _, prev := range keys[:i] {
			if prev == k {
				return false
			}
		}
	}

	return true
}
183
184// encodeTags converts a map of strings to an identifier.
185func encodeTags(m map[string]string) []byte {
186	// Empty maps marshal to empty bytes.
187	if len(m) == 0 {
188		return nil
189	}
190
191	// Extract keys and determine final size.
192	sz := (len(m) * 2) - 1 // separators
193	keys := make([]string, 0, len(m))
194	for k, v := range m {
195		keys = append(keys, k)
196		sz += len(k) + len(v)
197	}
198	sort.Strings(keys)
199
200	// Generate marshaled bytes.
201	b := make([]byte, sz)
202	buf := b
203	for _, k := range keys {
204		copy(buf, k)
205		buf[len(k)] = '\x00'
206		buf = buf[len(k)+1:]
207	}
208	for i, k := range keys {
209		v := m[k]
210		copy(buf, v)
211		if i < len(keys)-1 {
212			buf[len(v)] = '\x00'
213			buf = buf[len(v)+1:]
214		}
215	}
216	return b
217}
218
// decodeTags parses an identifier into a map of tags.
//
// The identifier is split on NUL bytes; the first half of the segments are
// the keys and the second half the values, paired by position. If the number
// of segments is odd, the trailing segment is dropped. It returns nil when no
// key/value pair remains (including for an empty id).
func decodeTags(id []byte) map[string]string {
	a := bytes.Split(id, []byte{'\x00'})

	// There must be an even number of segments.
	if len(a)%2 == 1 {
		a = a[:len(a)-1]
	}

	// Return nil if there are no segments.
	if len(a) == 0 {
		return nil
	}
	mid := len(a) / 2

	// Decode key/value tags. The map is sized up front since the number of
	// pairs is known.
	m := make(map[string]string, mid)
	for i, k := range a[:mid] {
		m[string(k)] = string(a[mid+i])
	}
	return m
}
241
242func encodeAux(aux []interface{}) []*internal.Aux {
243	pb := make([]*internal.Aux, len(aux))
244	for i := range aux {
245		switch v := aux[i].(type) {
246		case float64:
247			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Float)), FloatValue: proto.Float64(v)}
248		case *float64:
249			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Float))}
250		case int64:
251			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Integer)), IntegerValue: proto.Int64(v)}
252		case *int64:
253			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Integer))}
254		case uint64:
255			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Unsigned)), UnsignedValue: proto.Uint64(v)}
256		case *uint64:
257			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Unsigned))}
258		case string:
259			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.String)), StringValue: proto.String(v)}
260		case *string:
261			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.String))}
262		case bool:
263			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Boolean)), BooleanValue: proto.Bool(v)}
264		case *bool:
265			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Boolean))}
266		default:
267			pb[i] = &internal.Aux{DataType: proto.Int32(int32(influxql.Unknown))}
268		}
269	}
270	return pb
271}
272
273func decodeAux(pb []*internal.Aux) []interface{} {
274	if len(pb) == 0 {
275		return nil
276	}
277
278	aux := make([]interface{}, len(pb))
279	for i := range pb {
280		switch influxql.DataType(pb[i].GetDataType()) {
281		case influxql.Float:
282			if pb[i].FloatValue != nil {
283				aux[i] = *pb[i].FloatValue
284			} else {
285				aux[i] = (*float64)(nil)
286			}
287		case influxql.Integer:
288			if pb[i].IntegerValue != nil {
289				aux[i] = *pb[i].IntegerValue
290			} else {
291				aux[i] = (*int64)(nil)
292			}
293		case influxql.Unsigned:
294			if pb[i].UnsignedValue != nil {
295				aux[i] = *pb[i].UnsignedValue
296			} else {
297				aux[i] = (*uint64)(nil)
298			}
299		case influxql.String:
300			if pb[i].StringValue != nil {
301				aux[i] = *pb[i].StringValue
302			} else {
303				aux[i] = (*string)(nil)
304			}
305		case influxql.Boolean:
306			if pb[i].BooleanValue != nil {
307				aux[i] = *pb[i].BooleanValue
308			} else {
309				aux[i] = (*bool)(nil)
310			}
311		default:
312			aux[i] = nil
313		}
314	}
315	return aux
316}
317
// cloneAux returns a copy of src backed by a new array, so that assigning to
// elements of the result does not affect src. The elements themselves are
// copied as interface values (a shallow copy). A nil src yields nil; an empty
// non-nil src yields an empty non-nil slice.
func cloneAux(src []interface{}) []interface{} {
	if src == nil {
		return nil
	}
	return append(make([]interface{}, 0, len(src)), src...)
}
326
// PointDecoder decodes generic points from a reader.
type PointDecoder struct {
	// r is the stream that DecodePoint reads length-prefixed messages from.
	r     io.Reader
	// stats holds the stats from the most recent stats message that
	// DecodePoint encountered in the stream; it is returned by Stats.
	stats IteratorStats
}
332
333// NewPointDecoder returns a new instance of PointDecoder that reads from r.
334func NewPointDecoder(r io.Reader) *PointDecoder {
335	return &PointDecoder{r: r}
336}
337
338// Stats returns iterator stats embedded within the stream.
339func (dec *PointDecoder) Stats() IteratorStats { return dec.stats }
340
341// DecodePoint reads from the underlying reader and unmarshals into p.
342func (dec *PointDecoder) DecodePoint(p *Point) error {
343	for {
344		// Read length.
345		var sz uint32
346		if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
347			return err
348		}
349
350		// Read point data.
351		buf := make([]byte, sz)
352		if _, err := io.ReadFull(dec.r, buf); err != nil {
353			return err
354		}
355
356		// Unmarshal into point.
357		var pb internal.Point
358		if err := proto.Unmarshal(buf, &pb); err != nil {
359			return err
360		}
361
362		// If the point contains stats then read stats and retry.
363		if pb.Stats != nil {
364			dec.stats = decodeIteratorStats(pb.Stats)
365			continue
366		}
367
368		if pb.IntegerValue != nil {
369			*p = decodeIntegerPoint(&pb)
370		} else if pb.UnsignedValue != nil {
371			*p = decodeUnsignedPoint(&pb)
372		} else if pb.StringValue != nil {
373			*p = decodeStringPoint(&pb)
374		} else if pb.BooleanValue != nil {
375			*p = decodeBooleanPoint(&pb)
376		} else {
377			*p = decodeFloatPoint(&pb)
378		}
379
380		return nil
381	}
382}
383