package coordinator

import (
	"errors"
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/tsdb"
	"go.uber.org/zap"
)

// The keys for statistics generated by the "write" module. Each key names the
// WriteStatistics counter that PointsWriter.Statistics reports under it.
const (
	statWriteReq           = "req"           // write requests received
	statPointWriteReq      = "pointReq"      // points received across all write requests
	statPointWriteReqLocal = "pointReqLocal" // points passed on to per-shard writes
	statWriteOK            = "writeOk"       // shard writes that succeeded
	statWriteDrop          = "writeDrop"     // points dropped while mapping points to shards
	statWriteTimeout       = "writeTimeout"  // write requests that timed out
	statWriteErr           = "writeError"    // shard writes that failed
	statSubWriteOK         = "subWriteOk"    // requests accepted by a subscriber channel
	statSubWriteDrop       = "subWriteDrop"  // requests a subscriber channel could not accept
)

var (
	// ErrTimeout is returned when a write times out.
	ErrTimeout = errors.New("timeout")

	// ErrPartialWrite is returned when a write partially succeeds but does
	// not meet the requested consistency level.
	ErrPartialWrite = errors.New("partial write")

	// ErrWriteFailed is returned when no writes succeeded.
	ErrWriteFailed = errors.New("write failed")
)

// PointsWriter handles writes across multiple local and remote data nodes.
//
// MetaClient and TSDBStore must be set before points are written; the
// constructor in this file does not populate them.
type PointsWriter struct {
	mu           sync.RWMutex  // held by Open and Close; read-locked while sending to subPoints
	closing      chan struct{} // closed by Close; a write still waiting on shards then returns ErrWriteFailed
	WriteTimeout time.Duration // how long a write waits for all of its shard writes to report back
	Logger       *zap.Logger

	Node *influxdb.Node

	// MetaClient resolves databases and retention policies and creates the
	// shard groups that incoming points are mapped onto.
	MetaClient interface {
		Database(name string) (di *meta.DatabaseInfo)
		RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
		CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
	}

	// TSDBStore is the store that shard writes are sent to. CreateShard is
	// used when a write reports that the shard does not exist yet.
	TSDBStore interface {
		CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
		WriteToShard(shardID uint64, points []models.Point) error
	}

	// subPoints holds the subscriber channels that receive a copy of every
	// write request on a best-effort (non-blocking) basis.
	subPoints []chan<- *WritePointsRequest

	// stats holds the counters reported by Statistics; they are updated with
	// atomic operations.
	stats *WriteStatistics
}

// WritePointsRequest represents a request to write point data to the cluster.
69type WritePointsRequest struct { 70 Database string 71 RetentionPolicy string 72 Points []models.Point 73} 74 75// AddPoint adds a point to the WritePointRequest with field key 'value' 76func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) { 77 pt, err := models.NewPoint( 78 name, models.NewTags(tags), map[string]interface{}{"value": value}, timestamp, 79 ) 80 if err != nil { 81 return 82 } 83 w.Points = append(w.Points, pt) 84} 85 86// NewPointsWriter returns a new instance of PointsWriter for a node. 87func NewPointsWriter() *PointsWriter { 88 return &PointsWriter{ 89 closing: make(chan struct{}), 90 WriteTimeout: DefaultWriteTimeout, 91 Logger: zap.NewNop(), 92 stats: &WriteStatistics{}, 93 } 94} 95 96// ShardMapping contains a mapping of shards to points. 97type ShardMapping struct { 98 n int 99 Points map[uint64][]models.Point // The points associated with a shard ID 100 Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID 101 Dropped []models.Point // Points that were dropped 102} 103 104// NewShardMapping creates an empty ShardMapping. 105func NewShardMapping(n int) *ShardMapping { 106 return &ShardMapping{ 107 n: n, 108 Points: map[uint64][]models.Point{}, 109 Shards: map[uint64]*meta.ShardInfo{}, 110 } 111} 112 113// MapPoint adds the point to the ShardMapping, associated with the given shardInfo. 114func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) { 115 if cap(s.Points[shardInfo.ID]) < s.n { 116 s.Points[shardInfo.ID] = make([]models.Point, 0, s.n) 117 } 118 s.Points[shardInfo.ID] = append(s.Points[shardInfo.ID], p) 119 s.Shards[shardInfo.ID] = shardInfo 120} 121 122// Open opens the communication channel with the point writer. 
123func (w *PointsWriter) Open() error { 124 w.mu.Lock() 125 defer w.mu.Unlock() 126 w.closing = make(chan struct{}) 127 return nil 128} 129 130// Close closes the communication channel with the point writer. 131func (w *PointsWriter) Close() error { 132 w.mu.Lock() 133 defer w.mu.Unlock() 134 if w.closing != nil { 135 close(w.closing) 136 } 137 if w.subPoints != nil { 138 // 'nil' channels always block so this makes the 139 // select statement in WritePoints hit its default case 140 // dropping any in-flight writes. 141 w.subPoints = nil 142 } 143 return nil 144} 145 146func (w *PointsWriter) AddWriteSubscriber(c chan<- *WritePointsRequest) { 147 w.subPoints = append(w.subPoints, c) 148} 149 150// WithLogger sets the Logger on w. 151func (w *PointsWriter) WithLogger(log *zap.Logger) { 152 w.Logger = log.With(zap.String("service", "write")) 153} 154 155// WriteStatistics keeps statistics related to the PointsWriter. 156type WriteStatistics struct { 157 WriteReq int64 158 PointWriteReq int64 159 PointWriteReqLocal int64 160 WriteOK int64 161 WriteDropped int64 162 WriteTimeout int64 163 WriteErr int64 164 SubWriteOK int64 165 SubWriteDrop int64 166} 167 168// Statistics returns statistics for periodic monitoring. 
169func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic { 170 return []models.Statistic{{ 171 Name: "write", 172 Tags: tags, 173 Values: map[string]interface{}{ 174 statWriteReq: atomic.LoadInt64(&w.stats.WriteReq), 175 statPointWriteReq: atomic.LoadInt64(&w.stats.PointWriteReq), 176 statPointWriteReqLocal: atomic.LoadInt64(&w.stats.PointWriteReqLocal), 177 statWriteOK: atomic.LoadInt64(&w.stats.WriteOK), 178 statWriteDrop: atomic.LoadInt64(&w.stats.WriteDropped), 179 statWriteTimeout: atomic.LoadInt64(&w.stats.WriteTimeout), 180 statWriteErr: atomic.LoadInt64(&w.stats.WriteErr), 181 statSubWriteOK: atomic.LoadInt64(&w.stats.SubWriteOK), 182 statSubWriteDrop: atomic.LoadInt64(&w.stats.SubWriteDrop), 183 }, 184 }} 185} 186 187// MapShards maps the points contained in wp to a ShardMapping. If a point 188// maps to a shard group or shard that does not currently exist, it will be 189// created before returning the mapping. 190func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) { 191 rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy) 192 if err != nil { 193 return nil, err 194 } else if rp == nil { 195 return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy) 196 } 197 198 // Holds all the shard groups and shards that are required for writes. 199 list := make(sgList, 0, 8) 200 min := time.Unix(0, models.MinNanoTime) 201 if rp.Duration > 0 { 202 min = time.Now().Add(-rp.Duration) 203 } 204 205 for _, p := range wp.Points { 206 // Either the point is outside the scope of the RP, or we already have 207 // a suitable shard group for the point. 208 if p.Time().Before(min) || list.Covers(p.Time()) { 209 continue 210 } 211 212 // No shard groups overlap with the point's time, so we will create 213 // a new shard group for this point. 
214 sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, p.Time()) 215 if err != nil { 216 return nil, err 217 } 218 219 if sg == nil { 220 return nil, errors.New("nil shard group") 221 } 222 list = list.Append(*sg) 223 } 224 225 mapping := NewShardMapping(len(wp.Points)) 226 for _, p := range wp.Points { 227 sg := list.ShardGroupAt(p.Time()) 228 if sg == nil { 229 // We didn't create a shard group because the point was outside the 230 // scope of the RP. 231 mapping.Dropped = append(mapping.Dropped, p) 232 atomic.AddInt64(&w.stats.WriteDropped, 1) 233 continue 234 } 235 236 sh := sg.ShardFor(p.HashID()) 237 mapping.MapPoint(&sh, p) 238 } 239 return mapping, nil 240} 241 242// sgList is a wrapper around a meta.ShardGroupInfos where we can also check 243// if a given time is covered by any of the shard groups in the list. 244type sgList meta.ShardGroupInfos 245 246func (l sgList) Covers(t time.Time) bool { 247 if len(l) == 0 { 248 return false 249 } 250 return l.ShardGroupAt(t) != nil 251} 252 253// ShardGroupAt attempts to find a shard group that could contain a point 254// at the given time. 255// 256// Shard groups are sorted first according to end time, and then according 257// to start time. Therefore, if there are multiple shard groups that match 258// this point's time they will be preferred in this order: 259// 260// - a shard group with the earliest end time; 261// - (assuming identical end times) the shard group with the earliest start time. 262func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo { 263 idx := sort.Search(len(l), func(i int) bool { return l[i].EndTime.After(t) }) 264 265 // We couldn't find a shard group the point falls into. 266 if idx == len(l) || t.Before(l[idx].StartTime) { 267 return nil 268 } 269 return &l[idx] 270} 271 272// Append appends a shard group to the list, and returns a sorted list. 
273func (l sgList) Append(sgi meta.ShardGroupInfo) sgList { 274 next := append(l, sgi) 275 sort.Sort(meta.ShardGroupInfos(next)) 276 return next 277} 278 279// WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of 280// a cluster structure for information. This is to avoid a circular dependency. 281func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error { 282 return w.WritePointsPrivileged(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points) 283} 284 285// WritePoints writes the data to the underlying storage. consitencyLevel and user are only used for clustered scenarios 286func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error { 287 return w.WritePointsPrivileged(database, retentionPolicy, consistencyLevel, points) 288} 289 290// WritePointsPrivileged writes the data to the underlying storage, consitencyLevel is only used for clustered scenarios 291func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { 292 atomic.AddInt64(&w.stats.WriteReq, 1) 293 atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points))) 294 295 if retentionPolicy == "" { 296 db := w.MetaClient.Database(database) 297 if db == nil { 298 return influxdb.ErrDatabaseNotFound(database) 299 } 300 retentionPolicy = db.DefaultRetentionPolicy 301 } 302 303 shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points}) 304 if err != nil { 305 return err 306 } 307 308 // Write each shard in it's own goroutine and return as soon as one fails. 
309 ch := make(chan error, len(shardMappings.Points)) 310 for shardID, points := range shardMappings.Points { 311 go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) { 312 err := w.writeToShard(shard, database, retentionPolicy, points) 313 if err == tsdb.ErrShardDeletion { 314 err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)} 315 } 316 ch <- err 317 }(shardMappings.Shards[shardID], database, retentionPolicy, points) 318 } 319 320 // Send points to subscriptions if possible. 321 var ok, dropped int64 322 pts := &WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points} 323 // We need to lock just in case the channel is about to be nil'ed 324 w.mu.RLock() 325 for _, ch := range w.subPoints { 326 select { 327 case ch <- pts: 328 ok++ 329 default: 330 dropped++ 331 } 332 } 333 w.mu.RUnlock() 334 335 if ok > 0 { 336 atomic.AddInt64(&w.stats.SubWriteOK, ok) 337 } 338 339 if dropped > 0 { 340 atomic.AddInt64(&w.stats.SubWriteDrop, dropped) 341 } 342 343 if err == nil && len(shardMappings.Dropped) > 0 { 344 err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)} 345 346 } 347 timeout := time.NewTimer(w.WriteTimeout) 348 defer timeout.Stop() 349 for range shardMappings.Points { 350 select { 351 case <-w.closing: 352 return ErrWriteFailed 353 case <-timeout.C: 354 atomic.AddInt64(&w.stats.WriteTimeout, 1) 355 // return timeout error to caller 356 return ErrTimeout 357 case err := <-ch: 358 if err != nil { 359 return err 360 } 361 } 362 } 363 return err 364} 365 366// writeToShards writes points to a shard. 
367func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error { 368 atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points))) 369 370 err := w.TSDBStore.WriteToShard(shard.ID, points) 371 if err == nil { 372 atomic.AddInt64(&w.stats.WriteOK, 1) 373 return nil 374 } 375 376 // If this is a partial write error, that is also ok. 377 if _, ok := err.(tsdb.PartialWriteError); ok { 378 atomic.AddInt64(&w.stats.WriteErr, 1) 379 return err 380 } 381 382 // If we've written to shard that should exist on the current node, but the store has 383 // not actually created this shard, tell it to create it and retry the write 384 if err == tsdb.ErrShardNotFound { 385 err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true) 386 if err != nil { 387 w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err)) 388 389 atomic.AddInt64(&w.stats.WriteErr, 1) 390 return err 391 } 392 } 393 err = w.TSDBStore.WriteToShard(shard.ID, points) 394 if err != nil { 395 w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err)) 396 atomic.AddInt64(&w.stats.WriteErr, 1) 397 return err 398 } 399 400 atomic.AddInt64(&w.stats.WriteOK, 1) 401 return nil 402} 403