package coordinator

import (
	"errors"
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/tsdb"
	"go.uber.org/zap"
)

// The keys for statistics generated by the "write" module.
const (
	statWriteReq           = "req"
	statPointWriteReq      = "pointReq"
	statPointWriteReqLocal = "pointReqLocal"
	statWriteOK            = "writeOk"
	statWriteDrop          = "writeDrop"
	statWriteTimeout       = "writeTimeout"
	statWriteErr           = "writeError"
	statSubWriteOK         = "subWriteOk"
	statSubWriteDrop       = "subWriteDrop"
)

var (
	// ErrTimeout is returned when a write times out.
	ErrTimeout = errors.New("timeout")

	// ErrPartialWrite is returned when a write partially succeeds but does
	// not meet the requested consistency level.
	ErrPartialWrite = errors.New("partial write")

	// ErrWriteFailed is returned when no writes succeeded.
	ErrWriteFailed = errors.New("write failed")
)

// PointsWriter handles writes across multiple local and remote data nodes.
type PointsWriter struct {
	mu           sync.RWMutex
	closing      chan struct{}
	WriteTimeout time.Duration
	Logger       *zap.Logger

	Node *influxdb.Node

	MetaClient interface {
		Database(name string) (di *meta.DatabaseInfo)
		RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
		CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
	}

	TSDBStore interface {
		CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
		WriteToShard(shardID uint64, points []models.Point) error
	}

	subPoints []chan<- *WritePointsRequest

	stats *WriteStatistics
}

// WritePointsRequest represents a request to write point data to the cluster.
type WritePointsRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point
}

// AddPoint adds a point to the WritePointsRequest with field key 'value'.
func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
	pt, err := models.NewPoint(
		name, models.NewTags(tags), map[string]interface{}{"value": value}, timestamp,
	)
	if err != nil {
		return
	}
	w.Points = append(w.Points, pt)
}
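
// A minimal usage sketch (illustrative only; the measurement, value, and tag
// names here are hypothetical):
//
//	req := &WritePointsRequest{Database: "db0", RetentionPolicy: "rp0"}
//	req.AddPoint("cpu", 0.64, time.Now(), map[string]string{"host": "server01"})
//
// Note that AddPoint silently skips a point whose construction fails (for
// example, an unsupported field value type).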

// NewPointsWriter returns a new instance of PointsWriter for a node.
func NewPointsWriter() *PointsWriter {
	return &PointsWriter{
		closing:      make(chan struct{}),
		WriteTimeout: DefaultWriteTimeout,
		Logger:       zap.NewNop(),
		stats:        &WriteStatistics{},
	}
}
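
// A sketch of typical wiring (assuming metaClient and tsdbStore satisfy the
// MetaClient and TSDBStore interfaces above; all names are hypothetical):
//
//	w := NewPointsWriter()
//	w.MetaClient = metaClient
//	w.TSDBStore = tsdbStore
//	w.WithLogger(logger)
//	if err := w.Open(); err != nil {
//		// handle error
//	}
//	defer w.Close()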

// ShardMapping contains a mapping of shards to points.
type ShardMapping struct {
	n       int
	Points  map[uint64][]models.Point  // The points associated with a shard ID
	Shards  map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
	Dropped []models.Point             // Points that were dropped
}

// NewShardMapping creates an empty ShardMapping.
func NewShardMapping(n int) *ShardMapping {
	return &ShardMapping{
		n:      n,
		Points: map[uint64][]models.Point{},
		Shards: map[uint64]*meta.ShardInfo{},
	}
}

// MapPoint adds the point to the ShardMapping, associated with the given shardInfo.
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
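	// Preallocate the slice for this shard to hold every point in the
	// request; the capacity check only fails before the first append, so
	// this allocation happens at most once per shard ID.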
	if cap(s.Points[shardInfo.ID]) < s.n {
		s.Points[shardInfo.ID] = make([]models.Point, 0, s.n)
	}
	s.Points[shardInfo.ID] = append(s.Points[shardInfo.ID], p)
	s.Shards[shardInfo.ID] = shardInfo
}

// Open opens the communication channel with the point writer.
func (w *PointsWriter) Open() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.closing = make(chan struct{})
	return nil
}

// Close closes the communication channel with the point writer.
func (w *PointsWriter) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closing != nil {
		close(w.closing)
	}
	if w.subPoints != nil {
		// Dropping the subscriber list means the send loop in
		// WritePointsPrivileged ranges over nothing, so any
		// in-flight writes are no longer forwarded to subscribers.
		w.subPoints = nil
	}
	return nil
}

// AddWriteSubscriber registers a channel that will receive a copy of every
// incoming write request. Sends that would block are dropped rather than
// delaying the write path.
func (w *PointsWriter) AddWriteSubscriber(c chan<- *WritePointsRequest) {
	w.subPoints = append(w.subPoints, c)
}

// WithLogger sets the Logger on w.
func (w *PointsWriter) WithLogger(log *zap.Logger) {
	w.Logger = log.With(zap.String("service", "write"))
}

// WriteStatistics keeps statistics related to the PointsWriter.
type WriteStatistics struct {
	WriteReq           int64
	PointWriteReq      int64
	PointWriteReqLocal int64
	WriteOK            int64
	WriteDropped       int64
	WriteTimeout       int64
	WriteErr           int64
	SubWriteOK         int64
	SubWriteDrop       int64
}

// Statistics returns statistics for periodic monitoring.
func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic {
	return []models.Statistic{{
		Name: "write",
		Tags: tags,
		Values: map[string]interface{}{
			statWriteReq:           atomic.LoadInt64(&w.stats.WriteReq),
			statPointWriteReq:      atomic.LoadInt64(&w.stats.PointWriteReq),
			statPointWriteReqLocal: atomic.LoadInt64(&w.stats.PointWriteReqLocal),
			statWriteOK:            atomic.LoadInt64(&w.stats.WriteOK),
			statWriteDrop:          atomic.LoadInt64(&w.stats.WriteDropped),
			statWriteTimeout:       atomic.LoadInt64(&w.stats.WriteTimeout),
			statWriteErr:           atomic.LoadInt64(&w.stats.WriteErr),
			statSubWriteOK:         atomic.LoadInt64(&w.stats.SubWriteOK),
			statSubWriteDrop:       atomic.LoadInt64(&w.stats.SubWriteDrop),
		},
	}}
}
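
// A sketch of pulling a single counter out of the returned statistics
// (illustrative only):
//
//	stat := w.Statistics(nil)[0]
//	writeReqs := stat.Values[statWriteReq].(int64)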

// MapShards maps the points contained in wp to a ShardMapping.  If a point
// maps to a shard group or shard that does not currently exist, it will be
// created before returning the mapping.
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
	rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
	if err != nil {
		return nil, err
	} else if rp == nil {
		return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
	}

	// Holds all the shard groups and shards that are required for writes.
	list := make(sgList, 0, 8)
	min := time.Unix(0, models.MinNanoTime)
	if rp.Duration > 0 {
		min = time.Now().Add(-rp.Duration)
	}

	for _, p := range wp.Points {
		// Either the point is outside the scope of the RP, or we already have
		// a suitable shard group for the point.
		if p.Time().Before(min) || list.Covers(p.Time()) {
			continue
		}

		// No shard groups overlap with the point's time, so we will create
		// a new shard group for this point.
		sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, p.Time())
		if err != nil {
			return nil, err
		}

		if sg == nil {
			return nil, errors.New("nil shard group")
		}
		list = list.Append(*sg)
	}

	mapping := NewShardMapping(len(wp.Points))
	for _, p := range wp.Points {
		sg := list.ShardGroupAt(p.Time())
		if sg == nil {
			// We didn't create a shard group because the point was outside the
			// scope of the RP.
			mapping.Dropped = append(mapping.Dropped, p)
			atomic.AddInt64(&w.stats.WriteDropped, 1)
			continue
		}

		sh := sg.ShardFor(p.HashID())
		mapping.MapPoint(&sh, p)
	}
	return mapping, nil
}
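
// A minimal sketch of mapping and then writing locally (error handling
// elided; "db0", "rp0", and points are hypothetical):
//
//	mapping, _ := w.MapShards(&WritePointsRequest{
//		Database:        "db0",
//		RetentionPolicy: "rp0",
//		Points:          points,
//	})
//	for shardID, pts := range mapping.Points {
//		_ = w.TSDBStore.WriteToShard(shardID, pts)
//	}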

// sgList is a wrapper around a meta.ShardGroupInfos where we can also check
// if a given time is covered by any of the shard groups in the list.
type sgList meta.ShardGroupInfos

// Covers returns true if t falls within at least one shard group in the list.
func (l sgList) Covers(t time.Time) bool {
	if len(l) == 0 {
		return false
	}
	return l.ShardGroupAt(t) != nil
}

// ShardGroupAt attempts to find a shard group that could contain a point
// at the given time.
//
// Shard groups are sorted first according to end time, and then according
// to start time. Therefore, if there are multiple shard groups that match
// this point's time they will be preferred in this order:
//
//  - a shard group with the earliest end time;
//  - (assuming identical end times) the shard group with the earliest start time.
func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo {
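	// sort.Search returns the index of the first group whose end time is
	// strictly after t; because the list is kept sorted by end time, every
	// earlier group has already ended by t.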
	idx := sort.Search(len(l), func(i int) bool { return l[i].EndTime.After(t) })

	// We couldn't find a shard group the point falls into.
	if idx == len(l) || t.Before(l[idx].StartTime) {
		return nil
	}
	return &l[idx]
}

// Append appends a shard group to the list, and returns a sorted list.
func (l sgList) Append(sgi meta.ShardGroupInfo) sgList {
	next := append(l, sgi)
	sort.Sort(meta.ShardGroupInfos(next))
	return next
}
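
// Note that Append re-sorts the whole list on every call. A single write
// rarely touches more than a handful of shard groups, so the cost is
// negligible in practice.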

// WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of
// a cluster structure for information. This is to avoid a circular dependency.
func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error {
	return w.WritePointsPrivileged(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points)
}

// WritePoints writes the data to the underlying storage. consistencyLevel and user are only used for clustered scenarios.
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
	return w.WritePointsPrivileged(database, retentionPolicy, consistencyLevel, points)
}

// WritePointsPrivileged writes the data to the underlying storage; consistencyLevel is only used for clustered scenarios.
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
	atomic.AddInt64(&w.stats.WriteReq, 1)
	atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points)))

	if retentionPolicy == "" {
		db := w.MetaClient.Database(database)
		if db == nil {
			return influxdb.ErrDatabaseNotFound(database)
		}
		retentionPolicy = db.DefaultRetentionPolicy
	}

	shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
	if err != nil {
		return err
	}

	// Write each shard in its own goroutine and return as soon as one fails.
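	// The channel is buffered to the number of mapped shards so each
	// goroutine can report its result without blocking, even if we return
	// early below.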
	ch := make(chan error, len(shardMappings.Points))
	for shardID, points := range shardMappings.Points {
		go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
			err := w.writeToShard(shard, database, retentionPolicy, points)
			if err == tsdb.ErrShardDeletion {
				err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}
			}
			ch <- err
		}(shardMappings.Shards[shardID], database, retentionPolicy, points)
	}

	// Send points to subscriptions if possible.
	var ok, dropped int64
	pts := &WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points}
	// Hold the read lock in case Close is concurrently nil'ing the subscriber list.
	w.mu.RLock()
	for _, ch := range w.subPoints {
		select {
		case ch <- pts:
			ok++
		default:
			dropped++
		}
	}
	w.mu.RUnlock()

	if ok > 0 {
		atomic.AddInt64(&w.stats.SubWriteOK, ok)
	}

	if dropped > 0 {
		atomic.AddInt64(&w.stats.SubWriteDrop, dropped)
	}

	if err == nil && len(shardMappings.Dropped) > 0 {
		err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)}
	}

	timeout := time.NewTimer(w.WriteTimeout)
	defer timeout.Stop()
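	// Collect one response per mapped shard. The single timer bounds the
	// total time spent waiting across all shards, not the wait per shard.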
	for range shardMappings.Points {
		select {
		case <-w.closing:
			return ErrWriteFailed
		case <-timeout.C:
			atomic.AddInt64(&w.stats.WriteTimeout, 1)
			// return timeout error to caller
			return ErrTimeout
		case err := <-ch:
			if err != nil {
				return err
			}
		}
	}
	return err
}

// writeToShard writes points to the given shard, creating the shard in the
// local store if it does not exist yet.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
	atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

	err := w.TSDBStore.WriteToShard(shard.ID, points)
	if err == nil {
		atomic.AddInt64(&w.stats.WriteOK, 1)
		return nil
	}

	// If this is a partial write error, that is also ok.
	if _, ok := err.(tsdb.PartialWriteError); ok {
		atomic.AddInt64(&w.stats.WriteErr, 1)
		return err
	}

	// If we've written to a shard that should exist on the current node but
	// the store has not actually created it, tell the store to create the
	// shard and retry the write.
	if err == tsdb.ErrShardNotFound {
		err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true)
		if err != nil {
			w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))

			atomic.AddInt64(&w.stats.WriteErr, 1)
			return err
		}
	}
	err = w.TSDBStore.WriteToShard(shard.ID, points)
	if err != nil {
		w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
		atomic.AddInt64(&w.stats.WriteErr, 1)
		return err
	}

	atomic.AddInt64(&w.stats.WriteOK, 1)
	return nil
}