package reads

import (
	"context"
	"fmt"
	"strings"

	"github.com/gogo/protobuf/types"
	"github.com/influxdata/flux"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/memory"
	"github.com/influxdata/flux/values"
	"github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/storage/reads/datatypes"
	"github.com/influxdata/influxdb/tsdb/cursors"
)

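// storageTable is a flux.Table backed by a storage cursor. In addition to the
// flux.Table contract it can be cancelled mid-read, closed to release its
// underlying resources, and queried for cursor statistics.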
type storageTable interface {
	flux.Table
	Close()
	Cancel()
	Statistics() cursors.CursorStats
}

type storeReader struct {
	s Store
}

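// NewReader returns an influxdb.Reader that serves Flux storage reads
// (filter, group, tag keys, and tag values) from the given Store.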
func NewReader(s Store) influxdb.Reader {
	return &storeReader{s: s}
}

func (r *storeReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
	return &filterIterator{
		ctx:   ctx,
		s:     r.s,
		spec:  spec,
		cache: newTagsCache(0),
		alloc: alloc,
	}, nil
}

func (r *storeReader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
	return &groupIterator{
		ctx:   ctx,
		s:     r.s,
		spec:  spec,
		cache: newTagsCache(0),
		alloc: alloc,
	}, nil
}

func (r *storeReader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
	var predicate *datatypes.Predicate
	if spec.Predicate != nil {
		p, err := toStoragePredicate(spec.Predicate)
		if err != nil {
			return nil, err
		}
		predicate = p
	}

	return &tagKeysIterator{
		ctx:       ctx,
		bounds:    spec.Bounds,
		s:         r.s,
		readSpec:  spec,
		predicate: predicate,
		alloc:     alloc,
	}, nil
}

func (r *storeReader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
	var predicate *datatypes.Predicate
	if spec.Predicate != nil {
		p, err := toStoragePredicate(spec.Predicate)
		if err != nil {
			return nil, err
		}
		predicate = p
	}

	return &tagValuesIterator{
		ctx:       ctx,
		bounds:    spec.Bounds,
		s:         r.s,
		readSpec:  spec,
		predicate: predicate,
		alloc:     alloc,
	}, nil
}

func (r *storeReader) Close() {}

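// filterIterator is the influxdb.TableIterator returned by ReadFilter. It
// issues a ReadFilterRequest against the store and yields one flux.Table per
// series in the result set.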
type filterIterator struct {
	ctx   context.Context
	s     Store
	spec  influxdb.ReadFilterSpec
	stats cursors.CursorStats
	cache *tagsCache
	alloc *memory.Allocator
}

func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats }

func (fi *filterIterator) Do(f func(flux.Table) error) error {
	src := fi.s.GetSource(
		fi.spec.Database,
		fi.spec.RetentionPolicy,
	)

	// Set up the read request.
	any, err := types.MarshalAny(src)
	if err != nil {
		return err
	}

	var predicate *datatypes.Predicate
	if fi.spec.Predicate != nil {
		p, err := toStoragePredicate(fi.spec.Predicate)
		if err != nil {
			return err
		}
		predicate = p
	}

	var req datatypes.ReadFilterRequest
	req.ReadSource = any
	req.Predicate = predicate
	req.Range.Start = int64(fi.spec.Bounds.Start)
	req.Range.End = int64(fi.spec.Bounds.Stop)

	rs, err := fi.s.ReadFilter(fi.ctx, &req)
	if err != nil {
		return err
	}

	if rs == nil {
		return nil
	}

	return fi.handleRead(f, rs)
}

func (fi *filterIterator) handleRead(f func(flux.Table) error, rs ResultSet) error {
	// these resources must be closed if not nil on return
	var (
		cur   cursors.Cursor
		table storageTable
	)

	defer func() {
		if table != nil {
			table.Close()
		}
		if cur != nil {
			cur.Close()
		}
		rs.Close()
		fi.cache.Release()
	}()

READ:
	for rs.Next() {
		cur = rs.Cursor()
		if cur == nil {
			// no data for series key + field combination
			continue
		}

		bnds := fi.spec.Bounds
		key := defaultGroupKeyForSeries(rs.Tags(), bnds)
		done := make(chan struct{})
		switch typedCur := cur.(type) {
		case cursors.IntegerArrayCursor:
			cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
			table = newIntegerTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
		case cursors.FloatArrayCursor:
			cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
			table = newFloatTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
		case cursors.UnsignedArrayCursor:
			cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
			table = newUnsignedTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
		case cursors.BooleanArrayCursor:
			cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
			table = newBooleanTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
		case cursors.StringArrayCursor:
			cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
			table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
		default:
			panic(fmt.Sprintf("unreachable: %T", typedCur))
		}

		// table owns the cursor and is responsible for closing it
		cur = nil

		if !table.Empty() {
			if err := f(table); err != nil {
				table.Close()
				table = nil
				return err
			}
			select {
			case <-done:
			case <-fi.ctx.Done():
				table.Cancel()
				break READ
			}
		}

		stats := table.Statistics()
		fi.stats.ScannedValues += stats.ScannedValues
		fi.stats.ScannedBytes += stats.ScannedBytes
		table.Close()
		table = nil
	}
	return rs.Err()
}

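// groupIterator is the influxdb.TableIterator returned by ReadGroup. It
// issues a ReadGroupRequest against the store and yields one flux.Table per
// group cursor in the result set.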
type groupIterator struct {
	ctx   context.Context
	s     Store
	spec  influxdb.ReadGroupSpec
	stats cursors.CursorStats
	cache *tagsCache
	alloc *memory.Allocator
}

func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats }

func (gi *groupIterator) Do(f func(flux.Table) error) error {
	src := gi.s.GetSource(
		gi.spec.Database,
		gi.spec.RetentionPolicy,
	)

	// Set up the read request.
	any, err := types.MarshalAny(src)
	if err != nil {
		return err
	}

	var predicate *datatypes.Predicate
	if gi.spec.Predicate != nil {
		p, err := toStoragePredicate(gi.spec.Predicate)
		if err != nil {
			return err
		}
		predicate = p
	}

	var req datatypes.ReadGroupRequest
	req.ReadSource = any
	req.Predicate = predicate
	req.Range.Start = int64(gi.spec.Bounds.Start)
	req.Range.End = int64(gi.spec.Bounds.Stop)

	req.Group = convertGroupMode(gi.spec.GroupMode)
	req.GroupKeys = gi.spec.GroupKeys

	if agg, err := determineAggregateMethod(gi.spec.AggregateMethod); err != nil {
		return err
	} else if agg != datatypes.AggregateTypeNone {
		req.Aggregate = &datatypes.Aggregate{Type: agg}
	}

	rs, err := gi.s.ReadGroup(gi.ctx, &req)
	if err != nil {
		return err
	}

	if rs == nil {
		return nil
	}
	return gi.handleRead(f, rs)
}

func (gi *groupIterator) handleRead(f func(flux.Table) error, rs GroupResultSet) error {
	// these resources must be closed if not nil on return
	var (
		gc    GroupCursor
		cur   cursors.Cursor
		table storageTable
	)

	defer func() {
		if table != nil {
			table.Close()
		}
		if cur != nil {
			cur.Close()
		}
		if gc != nil {
			gc.Close()
		}
		rs.Close()
		gi.cache.Release()
	}()

	gc = rs.Next()
READ:
	for gc != nil {
		for gc.Next() {
			cur = gc.Cursor()
			if cur != nil {
				break
			}
		}

		if cur == nil {
			gc.Close()
			gc = rs.Next()
			continue
		}

		bnds := gi.spec.Bounds
		key := groupKeyForGroup(gc.PartitionKeyVals(), &gi.spec, bnds)
		done := make(chan struct{})
		switch typedCur := cur.(type) {
		case cursors.IntegerArrayCursor:
			cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt)
			table = newIntegerGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
		case cursors.FloatArrayCursor:
			cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat)
			table = newFloatGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
		case cursors.UnsignedArrayCursor:
			cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt)
			table = newUnsignedGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
		case cursors.BooleanArrayCursor:
			cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool)
			table = newBooleanGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
		case cursors.StringArrayCursor:
			cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
			table = newStringGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
		default:
			panic(fmt.Sprintf("unreachable: %T", typedCur))
		}

		// table owns these resources and is responsible for closing them
		cur = nil
		gc = nil

		if err := f(table); err != nil {
			table.Close()
			table = nil
			return err
		}
		select {
		case <-done:
		case <-gi.ctx.Done():
			table.Cancel()
			break READ
		}

		stats := table.Statistics()
		gi.stats.ScannedValues += stats.ScannedValues
		gi.stats.ScannedBytes += stats.ScannedBytes
		table.Close()
		table = nil

		gc = rs.Next()
	}
	return rs.Err()
}

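// determineAggregateMethod maps an aggregate method name from the read spec
// (matched case-insensitively) to the storage aggregate enum. An empty name
// means no aggregation is applied.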
func determineAggregateMethod(agg string) (datatypes.Aggregate_AggregateType, error) {
	if agg == "" {
		return datatypes.AggregateTypeNone, nil
	}

	if t, ok := datatypes.Aggregate_AggregateType_value[strings.ToUpper(agg)]; ok {
		return datatypes.Aggregate_AggregateType(t), nil
	}
	return 0, fmt.Errorf("unknown aggregate type %q", agg)
}

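// convertGroupMode translates the influxdb group mode into the storage
// protocol's group enum, panicking on an unrecognized mode.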
func convertGroupMode(m influxdb.GroupMode) datatypes.ReadGroupRequest_Group {
	switch m {
	case influxdb.GroupModeNone:
		return datatypes.GroupNone
	case influxdb.GroupModeBy:
		return datatypes.GroupBy
	}
	panic(fmt.Sprint("invalid group mode: ", m))
}

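// All storage-backed tables share the same leading columns, in this order,
// followed by one column per tag.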
const (
	startColIdx = 0
	stopColIdx  = 1
	timeColIdx  = 2
	valueColIdx = 3
)

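// determineTableColsForSeries builds the column metadata for a single series:
// the four fixed columns followed by one string column per tag, along with
// empty default values for the tag columns.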
func determineTableColsForSeries(tags models.Tags, typ flux.ColType) ([]flux.ColMeta, [][]byte) {
	cols := make([]flux.ColMeta, 4+len(tags))
	defs := make([][]byte, 4+len(tags))
	cols[startColIdx] = flux.ColMeta{
		Label: execute.DefaultStartColLabel,
		Type:  flux.TTime,
	}
	cols[stopColIdx] = flux.ColMeta{
		Label: execute.DefaultStopColLabel,
		Type:  flux.TTime,
	}
	cols[timeColIdx] = flux.ColMeta{
		Label: execute.DefaultTimeColLabel,
		Type:  flux.TTime,
	}
	cols[valueColIdx] = flux.ColMeta{
		Label: execute.DefaultValueColLabel,
		Type:  typ,
	}
	for j, tag := range tags {
		cols[4+j] = flux.ColMeta{
			Label: string(tag.Key),
			Type:  flux.TString,
		}
		defs[4+j] = []byte("")
	}
	return cols, defs
}

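// defaultGroupKeyForSeries constructs the group key for an ungrouped read:
// the table bounds (_start and _stop) plus every tag key and value of the
// series.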
func defaultGroupKeyForSeries(tags models.Tags, bnds execute.Bounds) flux.GroupKey {
	cols := make([]flux.ColMeta, 2, len(tags)+2)
	vs := make([]values.Value, 2, len(tags)+2)
	cols[0] = flux.ColMeta{
		Label: execute.DefaultStartColLabel,
		Type:  flux.TTime,
	}
	vs[0] = values.NewTime(bnds.Start)
	cols[1] = flux.ColMeta{
		Label: execute.DefaultStopColLabel,
		Type:  flux.TTime,
	}
	vs[1] = values.NewTime(bnds.Stop)
	for i := range tags {
		cols = append(cols, flux.ColMeta{
			Label: string(tags[i].Key),
			Type:  flux.TString,
		})
		vs = append(vs, values.NewString(string(tags[i].Value)))
	}
	return execute.NewGroupKey(cols, vs)
}

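// determineTableColsForGroup is the grouped-read analogue of
// determineTableColsForSeries, keyed by the group's tag keys rather than a
// single series' tags.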
func determineTableColsForGroup(tagKeys [][]byte, typ flux.ColType) ([]flux.ColMeta, [][]byte) {
	cols := make([]flux.ColMeta, 4+len(tagKeys))
	defs := make([][]byte, 4+len(tagKeys))
	cols[startColIdx] = flux.ColMeta{
		Label: execute.DefaultStartColLabel,
		Type:  flux.TTime,
	}
	cols[stopColIdx] = flux.ColMeta{
		Label: execute.DefaultStopColLabel,
		Type:  flux.TTime,
	}
	cols[timeColIdx] = flux.ColMeta{
		Label: execute.DefaultTimeColLabel,
		Type:  flux.TTime,
	}
	cols[valueColIdx] = flux.ColMeta{
		Label: execute.DefaultValueColLabel,
		Type:  typ,
	}
	for j, tag := range tagKeys {
		cols[4+j] = flux.ColMeta{
			Label: string(tag),
			Type:  flux.TString,
		}
		defs[4+j] = []byte("")
	}
	return cols, defs
}

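// groupKeyForGroup constructs the group key for a grouped read from the
// partition key values. _start and _stop always come from the bounds, so they
// are skipped if they appear in the group keys.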
func groupKeyForGroup(kv [][]byte, spec *influxdb.ReadGroupSpec, bnds execute.Bounds) flux.GroupKey {
	cols := make([]flux.ColMeta, 2, len(spec.GroupKeys)+2)
	vs := make([]values.Value, 2, len(spec.GroupKeys)+2)
	cols[0] = flux.ColMeta{
		Label: execute.DefaultStartColLabel,
		Type:  flux.TTime,
	}
	vs[0] = values.NewTime(bnds.Start)
	cols[1] = flux.ColMeta{
		Label: execute.DefaultStopColLabel,
		Type:  flux.TTime,
	}
	vs[1] = values.NewTime(bnds.Stop)
	for i := range spec.GroupKeys {
		if spec.GroupKeys[i] == execute.DefaultStartColLabel || spec.GroupKeys[i] == execute.DefaultStopColLabel {
			continue
		}
		cols = append(cols, flux.ColMeta{
			Label: spec.GroupKeys[i],
			Type:  flux.TString,
		})
		vs = append(vs, values.NewString(string(kv[i])))
	}
	return execute.NewGroupKey(cols, vs)
}

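// tagKeysIterator implements influxdb.TableIterator for ReadTagKeys. It
// returns a single table with one _value column listing the tag keys,
// including the _start and _stop columns always present in storage output.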
type tagKeysIterator struct {
	ctx       context.Context
	bounds    execute.Bounds
	s         Store
	readSpec  influxdb.ReadTagKeysSpec
	predicate *datatypes.Predicate
	alloc     *memory.Allocator
}

func (ti *tagKeysIterator) Do(f func(flux.Table) error) error {
	src := ti.s.GetSource(
		ti.readSpec.Database,
		ti.readSpec.RetentionPolicy,
	)

	var req datatypes.TagKeysRequest
	if any, err := types.MarshalAny(src); err != nil {
		return err
	} else {
		req.TagsSource = any
	}
	req.Predicate = ti.predicate
	req.Range.Start = int64(ti.bounds.Start)
	req.Range.End = int64(ti.bounds.Stop)

	rs, err := ti.s.TagKeys(ti.ctx, &req)
	if err != nil {
		return err
	}
	return ti.handleRead(f, rs)
}

func (ti *tagKeysIterator) handleRead(f func(flux.Table) error, rs cursors.StringIterator) error {
	key := execute.NewGroupKey(nil, nil)
	builder := execute.NewColListTableBuilder(key, ti.alloc)
	valueIdx, err := builder.AddCol(flux.ColMeta{
		Label: execute.DefaultValueColLabel,
		Type:  flux.TString,
	})
	if err != nil {
		return err
	}
	defer builder.ClearData()

	// Add the _start and _stop columns that come from storage.
	if err := builder.AppendString(valueIdx, "_start"); err != nil {
		return err
	}
	if err := builder.AppendString(valueIdx, "_stop"); err != nil {
		return err
	}

	for rs.Next() {
		v := rs.Value()
		switch v {
		case models.MeasurementTagKey:
			v = "_measurement"
		case models.FieldKeyTagKey:
			v = "_field"
		}

		if err := builder.AppendString(valueIdx, v); err != nil {
			return err
		}
	}

	// Construct the table and add to the reference count
	// so we can free the table later.
	tbl, err := builder.Table()
	if err != nil {
		return err
	}

	// Release the references to the arrays held by the builder.
	builder.ClearData()
	return f(tbl)
}

func (ti *tagKeysIterator) Statistics() cursors.CursorStats {
	return cursors.CursorStats{}
}

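// tagValuesIterator implements influxdb.TableIterator for ReadTagValues. It
// returns a single table with one _value column listing the values of the
// requested tag key, translating _measurement and _field to their storage
// tag key equivalents.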
type tagValuesIterator struct {
	ctx       context.Context
	bounds    execute.Bounds
	s         Store
	readSpec  influxdb.ReadTagValuesSpec
	predicate *datatypes.Predicate
	alloc     *memory.Allocator
}

func (ti *tagValuesIterator) Do(f func(flux.Table) error) error {
	src := ti.s.GetSource(
		ti.readSpec.Database,
		ti.readSpec.RetentionPolicy,
	)

	var req datatypes.TagValuesRequest
	if any, err := types.MarshalAny(src); err != nil {
		return err
	} else {
		req.TagsSource = any
	}
	switch ti.readSpec.TagKey {
	case "_measurement":
		req.TagKey = models.MeasurementTagKey
	case "_field":
		req.TagKey = models.FieldKeyTagKey
	default:
		req.TagKey = ti.readSpec.TagKey
	}
	req.Predicate = ti.predicate
	req.Range.Start = int64(ti.bounds.Start)
	req.Range.End = int64(ti.bounds.Stop)

	rs, err := ti.s.TagValues(ti.ctx, &req)
	if err != nil {
		return err
	}
	return ti.handleRead(f, rs)
}

func (ti *tagValuesIterator) handleRead(f func(flux.Table) error, rs cursors.StringIterator) error {
	key := execute.NewGroupKey(nil, nil)
	builder := execute.NewColListTableBuilder(key, ti.alloc)
	valueIdx, err := builder.AddCol(flux.ColMeta{
		Label: execute.DefaultValueColLabel,
		Type:  flux.TString,
	})
	if err != nil {
		return err
	}
	defer builder.ClearData()

	for rs.Next() {
		if err := builder.AppendString(valueIdx, rs.Value()); err != nil {
			return err
		}
	}

	// Construct the table and add to the reference count
	// so we can free the table later.
	tbl, err := builder.Table()
	if err != nil {
		return err
	}

	// Release the references to the arrays held by the builder.
	builder.ClearData()
	return f(tbl)
}

func (ti *tagValuesIterator) Statistics() cursors.CursorStats {
	return cursors.CursorStats{}
}