package distributor

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gogo/protobuf/proto"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/kv/codec"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/pkg/timestamp"

	"github.com/cortexproject/cortex/pkg/cortexpb"
	"github.com/cortexproject/cortex/pkg/util"
)

var (
	errNegativeUpdateTimeoutJitterMax = errors.New("HA tracker max update timeout jitter shouldn't be negative")
	errInvalidFailoverTimeout         = "HA Tracker failover timeout (%v) must be at least 1s greater than update timeout - max jitter (%v)"
)

type haTrackerLimits interface {
	// MaxHAClusters returns max number of clusters that HA tracker should track for a user.
	// Samples from additional clusters are rejected.
	MaxHAClusters(user string) int
}
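
// A trivial, fixed-limit implementation of haTrackerLimits (e.g. for tests) could look
// like the sketch below; fixedHALimits is a hypothetical name, not part of this package:
//
//	type fixedHALimits struct{ limit int }
//
//	func (l fixedHALimits) MaxHAClusters(string) int { return l.limit }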

// ProtoReplicaDescFactory makes new ReplicaDescs
func ProtoReplicaDescFactory() proto.Message {
	return NewReplicaDesc()
}

// NewReplicaDesc returns an empty *distributor.ReplicaDesc.
func NewReplicaDesc() *ReplicaDesc {
	return &ReplicaDesc{}
}

// HATrackerConfig contains the configuration required to
// create an HA Tracker.
type HATrackerConfig struct {
	EnableHATracker bool `yaml:"enable_ha_tracker"`
	// We should only update the timestamp if the difference
	// between the stored timestamp and the time we received a sample at
	// is more than this duration.
	UpdateTimeout          time.Duration `yaml:"ha_tracker_update_timeout"`
	UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
	// We should only failover to accepting samples from a replica
	// other than the replica written in the KVStore if the difference
	// between the stored timestamp and the time we received a sample is
	// more than this duration.
	FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`

	KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."`
}
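
// An illustrative YAML sketch of this block, nested wherever the distributor embeds
// HATrackerConfig. The kvstore sub-fields come from kv.Config and are an assumption
// here, not an exhaustive reference:
//
//	enable_ha_tracker: true
//	ha_tracker_update_timeout: 15s
//	ha_tracker_update_timeout_jitter_max: 5s
//	ha_tracker_failover_timeout: 30s
//	kvstore:
//	  store: consul
//	  prefix: ha-tracker/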

// RegisterFlags adds the flags required to configure this to the given FlagSet.
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.EnableHATracker, "distributor.ha-tracker.enable", false, "Enable the distributor's HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
	f.DurationVar(&cfg.UpdateTimeout, "distributor.ha-tracker.update-timeout", 15*time.Second, "Update the timestamp in the KV store for a given cluster/replica only after this amount of time has passed since the current stored timestamp.")
	f.DurationVar(&cfg.UpdateTimeoutJitterMax, "distributor.ha-tracker.update-timeout-jitter-max", 5*time.Second, "Maximum jitter applied to the update timeout, in order to spread the HA heartbeats over time.")
	f.DurationVar(&cfg.FailoverTimeout, "distributor.ha-tracker.failover-timeout", 30*time.Second, "If we don't receive any samples from the accepted replica for a cluster in this amount of time we will failover to the next replica we receive a sample from. This value must be greater than the update timeout.")

	// We want the ability to use different Consul instances for the ring and
	// for HA cluster tracking. We also customize the default keys prefix, in
	// order to not clash with the ring key if they both share the same KVStore
	// backend (ie. run on the same consul cluster).
	cfg.KVStore.RegisterFlagsWithPrefix("distributor.ha-tracker.", "ha-tracker/", f)
}

// Validate validates the config and returns an error on failure.
func (cfg *HATrackerConfig) Validate() error {
	if cfg.UpdateTimeoutJitterMax < 0 {
		return errNegativeUpdateTimeoutJitterMax
	}

	minFailureTimeout := cfg.UpdateTimeout + cfg.UpdateTimeoutJitterMax + time.Second
	if cfg.FailoverTimeout < minFailureTimeout {
		return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout)
	}

	return nil
}
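
// With the default flag values registered above, the constraint is
// FailoverTimeout >= UpdateTimeout + UpdateTimeoutJitterMax + 1s,
// i.e. 30s >= 15s + 5s + 1s = 21s, so the defaults validate. Lowering
// FailoverTimeout below 21s (with the default timeouts) would fail here.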

func GetReplicaDescCodec() codec.Proto {
	return codec.NewProtoCodec("replicaDesc", ProtoReplicaDescFactory)
}

// Track the replica we're accepting samples from
// for each HA cluster we know about.
type haTracker struct {
	services.Service

	logger              log.Logger
	cfg                 HATrackerConfig
	client              kv.Client
	updateTimeoutJitter time.Duration
	limits              haTrackerLimits

	electedLock sync.RWMutex
	elected     map[string]ReplicaDesc         // Replicas we are accepting samples from. Key = "user/cluster".
	clusters    map[string]map[string]struct{} // Known clusters with elected replicas per user. First key = user, second key = cluster name.

	electedReplicaChanges         *prometheus.CounterVec
	electedReplicaTimestamp       *prometheus.GaugeVec
	electedReplicaPropagationTime prometheus.Histogram
	kvCASCalls                    *prometheus.CounterVec

	cleanupRuns               prometheus.Counter
	replicasMarkedForDeletion prometheus.Counter
	deletedReplicas           prometheus.Counter
	markingForDeletionsFailed prometheus.Counter
}

// newHATracker returns a new HA cluster tracker using either Consul
// or in-memory KV store. Tracker must be started via StartAsync().
func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Registerer, logger log.Logger) (*haTracker, error) {
	var jitter time.Duration
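	// The jitter is drawn uniformly from [-UpdateTimeoutJitterMax, +UpdateTimeoutJitterMax)
	// and is fixed for the lifetime of this tracker, so different distributors refresh the
	// KV store entry at slightly different intervals rather than all at once.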
	if cfg.UpdateTimeoutJitterMax > 0 {
		jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
	}

	t := &haTracker{
		logger:              logger,
		cfg:                 cfg,
		updateTimeoutJitter: jitter,
		limits:              limits,
		elected:             map[string]ReplicaDesc{},
		clusters:            map[string]map[string]struct{}{},

		electedReplicaChanges: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_elected_replica_changes_total",
			Help: "The total number of times the elected replica has changed for a user ID/cluster.",
		}, []string{"user", "cluster"}),
		electedReplicaTimestamp: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_ha_tracker_elected_replica_timestamp_seconds",
			Help: "The timestamp stored for the currently elected replica, from the KVStore.",
		}, []string{"user", "cluster"}),
		electedReplicaPropagationTime: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name:    "cortex_ha_tracker_elected_replica_change_propagation_time_seconds",
			Help:    "The time it takes for the distributor to update the replica change.",
			Buckets: prometheus.DefBuckets,
		}),
		kvCASCalls: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_kv_store_cas_total",
			Help: "The total number of CAS calls to the KV store for a user ID/cluster.",
		}, []string{"user", "cluster"}),

		cleanupRuns: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_replicas_cleanup_started_total",
			Help: "Number of elected replicas cleanup loops started.",
		}),
		replicasMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total",
			Help: "Number of elected replicas marked for deletion.",
		}),
		deletedReplicas: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_replicas_cleanup_deleted_total",
			Help: "Number of elected replicas deleted from KV store.",
		}),
		markingForDeletionsFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_replicas_cleanup_delete_failed_total",
			Help: "Number of elected replicas that failed to be marked for deletion, or deleted.",
		}),
	}

	if cfg.EnableHATracker {
		client, err := kv.NewClient(
			cfg.KVStore,
			GetReplicaDescCodec(),
			kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("cortex_", reg), "distributor-hatracker"),
			logger,
		)
		if err != nil {
			return nil, err
		}
		t.client = client
	}

	t.Service = services.NewBasicService(nil, t.loop, nil)
	return t, nil
}
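
// A minimal usage sketch (assuming the tracker is run standalone rather than under a
// services.Manager; error handling abbreviated):
//
//	tracker, err := newHATracker(cfg, limits, prometheus.DefaultRegisterer, logger)
//	if err != nil {
//		return err
//	}
//	if err := services.StartAndAwaitRunning(ctx, tracker); err != nil {
//		return err
//	}
//	defer services.StopAndAwaitTerminated(context.Background(), tracker) //nolint:errcheck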

// Follows pattern used by ring for WatchKey.
func (c *haTracker) loop(ctx context.Context) error {
	if !c.cfg.EnableHATracker {
		// don't do anything, but wait until asked to stop.
		<-ctx.Done()
		return nil
	}

	// Start cleanup loop. It will stop when context is done.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.cleanupOldReplicasLoop(ctx)
	}()

	// The KVStore config we gave when creating c should have contained a prefix,
	// which would have given us a prefixed KVStore client. So, we can pass empty string here.
	c.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
		replica := value.(*ReplicaDesc)
		segments := strings.SplitN(key, "/", 2)

		// A valid key looks like "user/cluster"; a key without a "/", such as `ring`, is invalid.
		if len(segments) != 2 {
			return true
		}

		user := segments[0]
		cluster := segments[1]

		c.electedLock.Lock()
		defer c.electedLock.Unlock()

		if replica.DeletedAt > 0 {
			delete(c.elected, key)
			c.electedReplicaChanges.DeleteLabelValues(user, cluster)
			c.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

			userClusters := c.clusters[user]
			if userClusters != nil {
				delete(userClusters, cluster)
				if len(userClusters) == 0 {
					delete(c.clusters, user)
				}
			}
			return true
		}

		elected, exists := c.elected[key]
		if replica.Replica != elected.Replica {
			c.electedReplicaChanges.WithLabelValues(user, cluster).Inc()
		}
		if !exists {
			if c.clusters[user] == nil {
				c.clusters[user] = map[string]struct{}{}
			}
			c.clusters[user][cluster] = struct{}{}
		}
		c.elected[key] = *replica
		c.electedReplicaTimestamp.WithLabelValues(user, cluster).Set(float64(replica.ReceivedAt / 1000))
		c.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())
		return true
	})

	wg.Wait()
	return nil
}

const (
	cleanupCyclePeriod         = 30 * time.Minute
	cleanupCycleJitterVariance = 0.2 // for 30 minutes, this is ±6 min

	// If the last sample for a given cluster was received before this timeout, the elected replica is marked for deletion.
	// If the replica has been marked for deletion for longer than this timeout, it is deleted completely.
	deletionTimeout = 30 * time.Minute
)
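
// Rough end-to-end timeline with the values above (a sketch, since each cleanup cycle is
// jittered): a cluster that stops sending samples has its elected replica marked for
// deletion after deletionTimeout plus up to one cleanup cycle (~30-60 minutes), and the
// key is removed from the KV store after a further deletionTimeout plus up to one cycle,
// so full removal typically takes one to two hours.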

func (c *haTracker) cleanupOldReplicasLoop(ctx context.Context) {
	tick := time.NewTicker(util.DurationWithJitter(cleanupCyclePeriod, cleanupCycleJitterVariance))
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case t := <-tick.C:
			c.cleanupRuns.Inc()
			c.cleanupOldReplicas(ctx, t.Add(-deletionTimeout))
		}
	}
}

// Replicas marked for deletion before deadline will be deleted.
// Replicas with last-received timestamp before deadline will be marked for deletion.
func (c *haTracker) cleanupOldReplicas(ctx context.Context, deadline time.Time) {
	keys, err := c.client.List(ctx, "")
	if err != nil {
		level.Warn(c.logger).Log("msg", "cleanup: failed to list replica keys", "err", err)
		return
	}

	for _, key := range keys {
		if ctx.Err() != nil {
			return
		}

		val, err := c.client.Get(ctx, key)
		if err != nil {
			level.Warn(c.logger).Log("msg", "cleanup: failed to get replica value", "key", key, "err", err)
			continue
		}

		desc, ok := val.(*ReplicaDesc)
		if !ok {
			level.Error(c.logger).Log("msg", "cleanup: got invalid replica descriptor", "key", key)
			continue
		}

		if desc.DeletedAt > 0 {
			if timestamp.Time(desc.DeletedAt).After(deadline) {
				continue
			}

			// We're blindly deleting a key here. It may happen that the value was updated since we read it a few lines above,
			// in which case distributors will have the updated value in memory, but Delete will remove it from the KV store anyway.
			// That's not great, but should not be a problem. If the KV store sends a Watch notification for the Delete, distributors will
			// delete it from memory, and recreate it on the next sample with a matching replica.
			//
			// If the KV store doesn't send a Watch notification for the Delete, distributors *with* the replica in memory will keep using it,
			// while distributors *without* the replica in memory will try to write it to the KV store -- which will update *all*
			// watching distributors.
			err = c.client.Delete(ctx, key)
			if err != nil {
				level.Error(c.logger).Log("msg", "cleanup: failed to delete old replica", "key", key, "err", err)
				c.markingForDeletionsFailed.Inc()
			} else {
				level.Info(c.logger).Log("msg", "cleanup: deleted old replica", "key", key)
				c.deletedReplicas.Inc()
			}
			continue
		}

		// Not marked as deleted yet.
		if desc.DeletedAt == 0 && timestamp.Time(desc.ReceivedAt).Before(deadline) {
			err := c.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
				d, ok := in.(*ReplicaDesc)
				if !ok || d == nil || d.DeletedAt > 0 || !timestamp.Time(d.ReceivedAt).Before(deadline) {
					return nil, false, nil
				}

				d.DeletedAt = timestamp.FromTime(time.Now())
				return d, true, nil
			})

			if err != nil {
				c.markingForDeletionsFailed.Inc()
				level.Error(c.logger).Log("msg", "cleanup: failed to mark replica as deleted", "key", key, "err", err)
			} else {
				c.replicasMarkedForDeletion.Inc()
				level.Info(c.logger).Log("msg", "cleanup: marked replica as deleted", "key", key)
			}
		}
	}
}

// checkReplica checks the cluster and replica against the backing KVStore and local cache in the
// tracker c to see if we should accept the incoming sample. It will return an error if the sample
// should not be accepted. Note that internally this function does checks against the stored values
// and may modify the stored data, for example to failover between replicas after a certain period of time.
// A replicasNotMatchError is returned (from checkKVStore) if we shouldn't store this sample but are
// accepting samples from another replica for the cluster, so that we don't return a lot of errors
// to customer clients.
func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica string, now time.Time) error {
	// If HA tracking isn't enabled then accept the sample.
	if !c.cfg.EnableHATracker {
		return nil
	}
	key := fmt.Sprintf("%s/%s", userID, cluster)

	c.electedLock.RLock()
	entry, ok := c.elected[key]
	clusters := len(c.clusters[userID])
	c.electedLock.RUnlock()

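	// Fast path: if we saw the elected replica for this cluster recently (within
	// UpdateTimeout + jitter), decide from the local cache without touching the KV store.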
	if ok && now.Sub(timestamp.Time(entry.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
		if entry.Replica != replica {
			return replicasNotMatchError{replica: replica, elected: entry.Replica}
		}
		return nil
	}

	if !ok {
		// If we don't know about this cluster yet and we have reached the limit for number of clusters, we error out now.
		if limit := c.limits.MaxHAClusters(userID); limit > 0 && clusters+1 > limit {
			return tooManyClustersError{limit: limit}
		}
	}

	err := c.checkKVStore(ctx, key, replica, now)
	c.kvCASCalls.WithLabelValues(userID, cluster).Inc()
	if err != nil {
		// The callback within checkKVStore will return a replicasNotMatchError if the sample is being deduped,
		// otherwise there may have been an actual error CAS'ing that we should log.
		if !errors.Is(err, replicasNotMatchError{}) {
			level.Error(c.logger).Log("msg", "rejecting sample", "err", err)
		}
	}
	return err
}

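// checkKVStore runs a CAS against the replica entry for key. As this code relies on the
// dskit kv.Client CAS contract, returning a non-nil value from the callback writes it back
// (retrying the callback on conflict when retry is true), while returning (nil, false, nil)
// leaves the stored value untouched.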
func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now time.Time) error {
	return c.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
		if desc, ok := in.(*ReplicaDesc); ok && desc.DeletedAt == 0 {
			// We don't need to CAS and update the timestamp in the KV store if the timestamp we've received
			// this sample at is less than updateTimeout amount of time since the timestamp in the KV store.
			if desc.Replica == replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
				return nil, false, nil
			}

			// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
			// is less than failover timeout amount of time since the timestamp in the KV store.
			if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
				return nil, false, replicasNotMatchError{replica: replica, elected: desc.Replica}
			}
		}

		// There was either invalid or no data for the key, so we now accept samples
		// from this replica. Invalid could mean that the timestamp in the KV store was
		// out of date based on the update and failover timeouts when compared to now.
		return &ReplicaDesc{
			Replica:    replica,
			ReceivedAt: timestamp.FromTime(now),
			DeletedAt:  0,
		}, true, nil
	})
}

type replicasNotMatchError struct {
	replica, elected string
}

func (e replicasNotMatchError) Error() string {
	return fmt.Sprintf("replicas did not match, rejecting sample: replica=%s, elected=%s", e.replica, e.elected)
}

// Needed for errors.Is to work properly.
func (e replicasNotMatchError) Is(err error) bool {
	_, ok1 := err.(replicasNotMatchError)
	_, ok2 := err.(*replicasNotMatchError)
	return ok1 || ok2
}

// IsOperationAborted returns whether the error has been caused by an operation intentionally aborted.
func (e replicasNotMatchError) IsOperationAborted() bool {
	return true
}

type tooManyClustersError struct {
	limit int
}

func (e tooManyClustersError) Error() string {
	return fmt.Sprintf("too many HA clusters (limit: %d)", e.limit)
}

// Needed for errors.Is to work properly.
func (e tooManyClustersError) Is(err error) bool {
	_, ok1 := err.(tooManyClustersError)
	_, ok2 := err.(*tooManyClustersError)
	return ok1 || ok2
}

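// For example (assuming the commonly used defaults of clusterLabel "cluster" and
// replicaLabel "__replica__"), a series labelled {cluster="prod", __replica__="replica-1"}
// yields ("prod", "replica-1").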
func findHALabels(replicaLabel, clusterLabel string, labels []cortexpb.LabelAdapter) (string, string) {
	var cluster, replica string
	var pair cortexpb.LabelAdapter

	for _, pair = range labels {
		if pair.Name == replicaLabel {
			replica = pair.Value
		}
		if pair.Name == clusterLabel {
			cluster = pair.Value
		}
	}

	return cluster, replica
}

func (c *haTracker) cleanupHATrackerMetricsForUser(userID string) {
	filter := map[string]string{"user": userID}

	if err := util.DeleteMatchingLabels(c.electedReplicaChanges, filter); err != nil {
		level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_elected_replica_changes_total metric for user", "user", userID, "err", err)
	}
	if err := util.DeleteMatchingLabels(c.electedReplicaTimestamp, filter); err != nil {
		level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_elected_replica_timestamp_seconds metric for user", "user", userID, "err", err)
	}
	if err := util.DeleteMatchingLabels(c.kvCASCalls, filter); err != nil {
		level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_kv_store_cas_total metric for user", "user", userID, "err", err)
	}
}