1package coordinator 2 3import ( 4 "context" 5 "io" 6 "time" 7 8 "github.com/influxdata/influxdb/query" 9 "github.com/influxdata/influxdb/services/meta" 10 "github.com/influxdata/influxdb/tsdb" 11 "github.com/influxdata/influxql" 12) 13 14// IteratorCreator is an interface that combines mapping fields and creating iterators. 15type IteratorCreator interface { 16 query.IteratorCreator 17 influxql.FieldMapper 18 io.Closer 19} 20 21// LocalShardMapper implements a ShardMapper for local shards. 22type LocalShardMapper struct { 23 MetaClient interface { 24 ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) 25 } 26 27 TSDBStore interface { 28 ShardGroup(ids []uint64) tsdb.ShardGroup 29 } 30} 31 32// MapShards maps the sources to the appropriate shards into an IteratorCreator. 33func (e *LocalShardMapper) MapShards(sources influxql.Sources, t influxql.TimeRange, opt query.SelectOptions) (query.ShardGroup, error) { 34 a := &LocalShardMapping{ 35 ShardMap: make(map[Source]tsdb.ShardGroup), 36 } 37 38 tmin := time.Unix(0, t.MinTimeNano()) 39 tmax := time.Unix(0, t.MaxTimeNano()) 40 if err := e.mapShards(a, sources, tmin, tmax); err != nil { 41 return nil, err 42 } 43 a.MinTime, a.MaxTime = tmin, tmax 44 return a, nil 45} 46 47func (e *LocalShardMapper) mapShards(a *LocalShardMapping, sources influxql.Sources, tmin, tmax time.Time) error { 48 for _, s := range sources { 49 switch s := s.(type) { 50 case *influxql.Measurement: 51 source := Source{ 52 Database: s.Database, 53 RetentionPolicy: s.RetentionPolicy, 54 } 55 // Retrieve the list of shards for this database. This list of 56 // shards is always the same regardless of which measurement we are 57 // using. 58 if _, ok := a.ShardMap[source]; !ok { 59 groups, err := e.MetaClient.ShardGroupsByTimeRange(s.Database, s.RetentionPolicy, tmin, tmax) 60 if err != nil { 61 return err 62 } 63 64 if len(groups) == 0 { 65 a.ShardMap[source] = nil 66 continue 67 } 68 69 shardIDs := make([]uint64, 0, len(groups[0].Shards)*len(groups)) 70 for _, g := range groups { 71 for _, si := range g.Shards { 72 shardIDs = append(shardIDs, si.ID) 73 } 74 } 75 a.ShardMap[source] = e.TSDBStore.ShardGroup(shardIDs) 76 } 77 case *influxql.SubQuery: 78 if err := e.mapShards(a, s.Statement.Sources, tmin, tmax); err != nil { 79 return err 80 } 81 } 82 } 83 return nil 84} 85 86// ShardMapper maps data sources to a list of shard information. 87type LocalShardMapping struct { 88 ShardMap map[Source]tsdb.ShardGroup 89 90 // MinTime is the minimum time that this shard mapper will allow. 91 // Any attempt to use a time before this one will automatically result in using 92 // this time instead. 93 MinTime time.Time 94 95 // MaxTime is the maximum time that this shard mapper will allow. 96 // Any attempt to use a time after this one will automatically result in using 97 // this time instead. 98 MaxTime time.Time 99} 100 101func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { 102 source := Source{ 103 Database: m.Database, 104 RetentionPolicy: m.RetentionPolicy, 105 } 106 107 sg := a.ShardMap[source] 108 if sg == nil { 109 return 110 } 111 112 fields = make(map[string]influxql.DataType) 113 dimensions = make(map[string]struct{}) 114 115 var measurements []string 116 if m.Regex != nil { 117 measurements = sg.MeasurementsByRegex(m.Regex.Val) 118 } else { 119 measurements = []string{m.Name} 120 } 121 122 f, d, err := sg.FieldDimensions(measurements) 123 if err != nil { 124 return nil, nil, err 125 } 126 for k, typ := range f { 127 fields[k] = typ 128 } 129 for k := range d { 130 dimensions[k] = struct{}{} 131 } 132 return 133} 134 135func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType { 136 source := Source{ 137 Database: m.Database, 138 RetentionPolicy: m.RetentionPolicy, 139 } 140 141 sg := a.ShardMap[source] 142 if sg == nil { 143 return influxql.Unknown 144 } 145 146 var names []string 147 if m.Regex != nil { 148 names = sg.MeasurementsByRegex(m.Regex.Val) 149 } else { 150 names = []string{m.Name} 151 } 152 153 var typ influxql.DataType 154 for _, name := range names { 155 if m.SystemIterator != "" { 156 name = m.SystemIterator 157 } 158 t := sg.MapType(name, field) 159 if typ.LessThan(t) { 160 typ = t 161 } 162 } 163 return typ 164} 165 166func (a *LocalShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { 167 source := Source{ 168 Database: m.Database, 169 RetentionPolicy: m.RetentionPolicy, 170 } 171 172 sg := a.ShardMap[source] 173 if sg == nil { 174 return nil, nil 175 } 176 177 // Override the time constraints if they don't match each other. 178 if !a.MinTime.IsZero() && opt.StartTime < a.MinTime.UnixNano() { 179 opt.StartTime = a.MinTime.UnixNano() 180 } 181 if !a.MaxTime.IsZero() && opt.EndTime > a.MaxTime.UnixNano() { 182 opt.EndTime = a.MaxTime.UnixNano() 183 } 184 185 if m.Regex != nil { 186 measurements := sg.MeasurementsByRegex(m.Regex.Val) 187 inputs := make([]query.Iterator, 0, len(measurements)) 188 if err := func() error { 189 // Create a Measurement for each returned matching measurement value 190 // from the regex. 191 for _, measurement := range measurements { 192 mm := m.Clone() 193 mm.Name = measurement // Set the name to this matching regex value. 194 input, err := sg.CreateIterator(ctx, mm, opt) 195 if err != nil { 196 return err 197 } 198 inputs = append(inputs, input) 199 } 200 return nil 201 }(); err != nil { 202 query.Iterators(inputs).Close() 203 return nil, err 204 } 205 206 return query.Iterators(inputs).Merge(opt) 207 } 208 return sg.CreateIterator(ctx, m, opt) 209} 210 211func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) { 212 source := Source{ 213 Database: m.Database, 214 RetentionPolicy: m.RetentionPolicy, 215 } 216 217 sg := a.ShardMap[source] 218 if sg == nil { 219 return query.IteratorCost{}, nil 220 } 221 222 // Override the time constraints if they don't match each other. 223 if !a.MinTime.IsZero() && opt.StartTime < a.MinTime.UnixNano() { 224 opt.StartTime = a.MinTime.UnixNano() 225 } 226 if !a.MaxTime.IsZero() && opt.EndTime > a.MaxTime.UnixNano() { 227 opt.EndTime = a.MaxTime.UnixNano() 228 } 229 230 if m.Regex != nil { 231 var costs query.IteratorCost 232 measurements := sg.MeasurementsByRegex(m.Regex.Val) 233 for _, measurement := range measurements { 234 cost, err := sg.IteratorCost(measurement, opt) 235 if err != nil { 236 return query.IteratorCost{}, err 237 } 238 costs = costs.Combine(cost) 239 } 240 return costs, nil 241 } 242 return sg.IteratorCost(m.Name, opt) 243} 244 245// Close clears out the list of mapped shards. 246func (a *LocalShardMapping) Close() error { 247 a.ShardMap = nil 248 return nil 249} 250 251// Source contains the database and retention policy source for data. 252type Source struct { 253 Database string 254 RetentionPolicy string 255} 256