1package coordinator
2
3import (
4	"context"
5	"io"
6	"time"
7
8	"github.com/influxdata/influxdb/query"
9	"github.com/influxdata/influxdb/services/meta"
10	"github.com/influxdata/influxdb/tsdb"
11	"github.com/influxdata/influxql"
12)
13
// IteratorCreator is an interface that combines mapping fields and creating
// iterators. It is the union of the three embedded interfaces below, so an
// implementation must satisfy all of them.
type IteratorCreator interface {
	// query.IteratorCreator supplies the iterator-creation methods.
	query.IteratorCreator
	// influxql.FieldMapper supplies the field-mapping methods.
	influxql.FieldMapper
	// io.Closer supplies Close.
	io.Closer
}
20
// LocalShardMapper implements a ShardMapper for local shards.
//
// Both fields are used without a nil check while mapping shards, so they must
// be populated before MapShards is called with any measurement source.
type LocalShardMapper struct {
	// MetaClient is queried for the shard groups of a database and retention
	// policy, passing the minimum and maximum time of the query.
	MetaClient interface {
		ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
	}

	// TSDBStore converts the collected shard IDs into a tsdb.ShardGroup.
	TSDBStore interface {
		ShardGroup(ids []uint64) tsdb.ShardGroup
	}
}
31
32// MapShards maps the sources to the appropriate shards into an IteratorCreator.
33func (e *LocalShardMapper) MapShards(sources influxql.Sources, t influxql.TimeRange, opt query.SelectOptions) (query.ShardGroup, error) {
34	a := &LocalShardMapping{
35		ShardMap: make(map[Source]tsdb.ShardGroup),
36	}
37
38	tmin := time.Unix(0, t.MinTimeNano())
39	tmax := time.Unix(0, t.MaxTimeNano())
40	if err := e.mapShards(a, sources, tmin, tmax); err != nil {
41		return nil, err
42	}
43	a.MinTime, a.MaxTime = tmin, tmax
44	return a, nil
45}
46
47func (e *LocalShardMapper) mapShards(a *LocalShardMapping, sources influxql.Sources, tmin, tmax time.Time) error {
48	for _, s := range sources {
49		switch s := s.(type) {
50		case *influxql.Measurement:
51			source := Source{
52				Database:        s.Database,
53				RetentionPolicy: s.RetentionPolicy,
54			}
55			// Retrieve the list of shards for this database. This list of
56			// shards is always the same regardless of which measurement we are
57			// using.
58			if _, ok := a.ShardMap[source]; !ok {
59				groups, err := e.MetaClient.ShardGroupsByTimeRange(s.Database, s.RetentionPolicy, tmin, tmax)
60				if err != nil {
61					return err
62				}
63
64				if len(groups) == 0 {
65					a.ShardMap[source] = nil
66					continue
67				}
68
69				shardIDs := make([]uint64, 0, len(groups[0].Shards)*len(groups))
70				for _, g := range groups {
71					for _, si := range g.Shards {
72						shardIDs = append(shardIDs, si.ID)
73					}
74				}
75				a.ShardMap[source] = e.TSDBStore.ShardGroup(shardIDs)
76			}
77		case *influxql.SubQuery:
78			if err := e.mapShards(a, s.Statement.Sources, tmin, tmax); err != nil {
79				return err
80			}
81		}
82	}
83	return nil
84}
85
// LocalShardMapping maps data sources to a list of shard information.
type LocalShardMapping struct {
	// ShardMap holds the shard group for each database/retention policy pair.
	// A nil value means no shard group is available for that pair.
	ShardMap map[Source]tsdb.ShardGroup

	// MinTime is the minimum time that this shard mapper will allow.
	// Any attempt to use a time before this one will automatically result in using
	// this time instead. A zero MinTime disables this lower bound.
	MinTime time.Time

	// MaxTime is the maximum time that this shard mapper will allow.
	// Any attempt to use a time after this one will automatically result in using
	// this time instead. A zero MaxTime disables this upper bound.
	MaxTime time.Time
}
100
101func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
102	source := Source{
103		Database:        m.Database,
104		RetentionPolicy: m.RetentionPolicy,
105	}
106
107	sg := a.ShardMap[source]
108	if sg == nil {
109		return
110	}
111
112	fields = make(map[string]influxql.DataType)
113	dimensions = make(map[string]struct{})
114
115	var measurements []string
116	if m.Regex != nil {
117		measurements = sg.MeasurementsByRegex(m.Regex.Val)
118	} else {
119		measurements = []string{m.Name}
120	}
121
122	f, d, err := sg.FieldDimensions(measurements)
123	if err != nil {
124		return nil, nil, err
125	}
126	for k, typ := range f {
127		fields[k] = typ
128	}
129	for k := range d {
130		dimensions[k] = struct{}{}
131	}
132	return
133}
134
135func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType {
136	source := Source{
137		Database:        m.Database,
138		RetentionPolicy: m.RetentionPolicy,
139	}
140
141	sg := a.ShardMap[source]
142	if sg == nil {
143		return influxql.Unknown
144	}
145
146	var names []string
147	if m.Regex != nil {
148		names = sg.MeasurementsByRegex(m.Regex.Val)
149	} else {
150		names = []string{m.Name}
151	}
152
153	var typ influxql.DataType
154	for _, name := range names {
155		if m.SystemIterator != "" {
156			name = m.SystemIterator
157		}
158		t := sg.MapType(name, field)
159		if typ.LessThan(t) {
160			typ = t
161		}
162	}
163	return typ
164}
165
166func (a *LocalShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
167	source := Source{
168		Database:        m.Database,
169		RetentionPolicy: m.RetentionPolicy,
170	}
171
172	sg := a.ShardMap[source]
173	if sg == nil {
174		return nil, nil
175	}
176
177	// Override the time constraints if they don't match each other.
178	if !a.MinTime.IsZero() && opt.StartTime < a.MinTime.UnixNano() {
179		opt.StartTime = a.MinTime.UnixNano()
180	}
181	if !a.MaxTime.IsZero() && opt.EndTime > a.MaxTime.UnixNano() {
182		opt.EndTime = a.MaxTime.UnixNano()
183	}
184
185	if m.Regex != nil {
186		measurements := sg.MeasurementsByRegex(m.Regex.Val)
187		inputs := make([]query.Iterator, 0, len(measurements))
188		if err := func() error {
189			// Create a Measurement for each returned matching measurement value
190			// from the regex.
191			for _, measurement := range measurements {
192				mm := m.Clone()
193				mm.Name = measurement // Set the name to this matching regex value.
194				input, err := sg.CreateIterator(ctx, mm, opt)
195				if err != nil {
196					return err
197				}
198				inputs = append(inputs, input)
199			}
200			return nil
201		}(); err != nil {
202			query.Iterators(inputs).Close()
203			return nil, err
204		}
205
206		return query.Iterators(inputs).Merge(opt)
207	}
208	return sg.CreateIterator(ctx, m, opt)
209}
210
211func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
212	source := Source{
213		Database:        m.Database,
214		RetentionPolicy: m.RetentionPolicy,
215	}
216
217	sg := a.ShardMap[source]
218	if sg == nil {
219		return query.IteratorCost{}, nil
220	}
221
222	// Override the time constraints if they don't match each other.
223	if !a.MinTime.IsZero() && opt.StartTime < a.MinTime.UnixNano() {
224		opt.StartTime = a.MinTime.UnixNano()
225	}
226	if !a.MaxTime.IsZero() && opt.EndTime > a.MaxTime.UnixNano() {
227		opt.EndTime = a.MaxTime.UnixNano()
228	}
229
230	if m.Regex != nil {
231		var costs query.IteratorCost
232		measurements := sg.MeasurementsByRegex(m.Regex.Val)
233		for _, measurement := range measurements {
234			cost, err := sg.IteratorCost(measurement, opt)
235			if err != nil {
236				return query.IteratorCost{}, err
237			}
238			costs = costs.Combine(cost)
239		}
240		return costs, nil
241	}
242	return sg.IteratorCost(m.Name, opt)
243}
244
// Close clears out the list of mapped shards by setting ShardMap to nil.
// It always returns nil. After Close, lookups in ShardMap find no shard group.
func (a *LocalShardMapping) Close() error {
	a.ShardMap = nil
	return nil
}
250
// Source contains the database and retention policy source for data.
// It is used as the key of LocalShardMapping.ShardMap.
type Source struct {
	// Database is the name of the database.
	Database        string
	// RetentionPolicy is the name of the retention policy.
	RetentionPolicy string
}
256