package distributor

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gogo/protobuf/proto"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/kv/codec"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/pkg/timestamp"

	"github.com/cortexproject/cortex/pkg/cortexpb"
	"github.com/cortexproject/cortex/pkg/util"
)

var (
	errNegativeUpdateTimeoutJitterMax = errors.New("HA tracker max update timeout jitter shouldn't be negative")
	errInvalidFailoverTimeout         = "HA tracker failover timeout (%v) must be at least 1s greater than update timeout + max jitter (%v)"
)

type haTrackerLimits interface {
	// MaxHAClusters returns the max number of clusters that the HA tracker should track for a user.
	// Samples from additional clusters are rejected.
	MaxHAClusters(user string) int
}

// ProtoReplicaDescFactory makes new ReplicaDescs.
func ProtoReplicaDescFactory() proto.Message {
	return NewReplicaDesc()
}

// NewReplicaDesc returns an empty *distributor.ReplicaDesc.
func NewReplicaDesc() *ReplicaDesc {
	return &ReplicaDesc{}
}

// HATrackerConfig contains the configuration required to
// create an HA tracker.
type HATrackerConfig struct {
	EnableHATracker bool `yaml:"enable_ha_tracker"`
	// We should only update the timestamp if the difference
	// between the stored timestamp and the time we received a sample at
	// is more than this duration.
	UpdateTimeout          time.Duration `yaml:"ha_tracker_update_timeout"`
	UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
	// We should only failover to accepting samples from a replica
	// other than the replica written in the KVStore if the difference
	// between the stored timestamp and the time we received a sample is
	// more than this duration.
	FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`

	KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the HA tracker. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."`
}

// RegisterFlags adds the flags required to configure this to the given FlagSet.
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.EnableHATracker, "distributor.ha-tracker.enable", false, "Enable the distributor's HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
	f.DurationVar(&cfg.UpdateTimeout, "distributor.ha-tracker.update-timeout", 15*time.Second, "Update the timestamp in the KV store for a given cluster/replica only after this amount of time has passed since the current stored timestamp.")
	f.DurationVar(&cfg.UpdateTimeoutJitterMax, "distributor.ha-tracker.update-timeout-jitter-max", 5*time.Second, "Maximum jitter applied to the update timeout, in order to spread the HA heartbeats over time.")
	f.DurationVar(&cfg.FailoverTimeout, "distributor.ha-tracker.failover-timeout", 30*time.Second, "If we don't receive any samples from the accepted replica for a cluster in this amount of time we will failover to the next replica we receive a sample from. This value must be greater than the update timeout.")

	// We want the ability to use different Consul instances for the ring and
	// for HA cluster tracking. We also customize the default keys prefix, in
	// order to not clash with the ring key if they both share the same KVStore
	// backend (i.e. run on the same Consul cluster).
	cfg.KVStore.RegisterFlagsWithPrefix("distributor.ha-tracker.", "ha-tracker/", f)
}

// Validate validates the config and returns an error on failure.
func (cfg *HATrackerConfig) Validate() error {
	if cfg.UpdateTimeoutJitterMax < 0 {
		return errNegativeUpdateTimeoutJitterMax
	}

	minFailureTimeout := cfg.UpdateTimeout + cfg.UpdateTimeoutJitterMax + time.Second
	if cfg.FailoverTimeout < minFailureTimeout {
		return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout)
	}

	return nil
}
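
// Illustrative sketch (not part of the upstream file): with the default flag
// values, the minimum acceptable failover timeout is 15s + 5s + 1s = 21s, so
// a config like the following fails validation:
//
//	cfg := HATrackerConfig{
//		EnableHATracker:        true,
//		UpdateTimeout:          15 * time.Second,
//		UpdateTimeoutJitterMax: 5 * time.Second,
//		FailoverTimeout:        20 * time.Second, // < 21s, so Validate() returns an error
//	}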

func GetReplicaDescCodec() codec.Proto {
	return codec.NewProtoCodec("replicaDesc", ProtoReplicaDescFactory)
}

// haTracker tracks the replica we're accepting samples from
// for each HA cluster we know about.
type haTracker struct {
	services.Service

	logger              log.Logger
	cfg                 HATrackerConfig
	client              kv.Client
	updateTimeoutJitter time.Duration
	limits              haTrackerLimits

	electedLock sync.RWMutex
	elected     map[string]ReplicaDesc         // Replicas we are accepting samples from. Key = "user/cluster".
	clusters    map[string]map[string]struct{} // Known clusters with elected replicas per user. First key = user, second key = cluster name.

	electedReplicaChanges         *prometheus.CounterVec
	electedReplicaTimestamp       *prometheus.GaugeVec
	electedReplicaPropagationTime prometheus.Histogram
	kvCASCalls                    *prometheus.CounterVec

	cleanupRuns               prometheus.Counter
	replicasMarkedForDeletion prometheus.Counter
	deletedReplicas           prometheus.Counter
	markingForDeletionsFailed prometheus.Counter
}
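
// For illustration (hypothetical users and clusters), the in-memory state for
// two users, each running one Prometheus HA cluster, could look like:
//
//	elected["user-1/prod"] = ReplicaDesc{Replica: "replica-a", ReceivedAt: ...}
//	elected["user-2/prod"] = ReplicaDesc{Replica: "replica-b", ReceivedAt: ...}
//	clusters["user-1"]     = map[string]struct{}{"prod": {}}
//	clusters["user-2"]     = map[string]struct{}{"prod": {}}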

// newHATracker returns a new HA cluster tracker using either Consul
// or an in-memory KV store. The tracker must be started via StartAsync().
func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Registerer, logger log.Logger) (*haTracker, error) {
	var jitter time.Duration
	if cfg.UpdateTimeoutJitterMax > 0 {
		jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
	}

	t := &haTracker{
		logger:              logger,
		cfg:                 cfg,
		updateTimeoutJitter: jitter,
		limits:              limits,
		elected:             map[string]ReplicaDesc{},
		clusters:            map[string]map[string]struct{}{},

		electedReplicaChanges: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_elected_replica_changes_total",
			Help: "The total number of times the elected replica has changed for a user ID/cluster.",
		}, []string{"user", "cluster"}),
		electedReplicaTimestamp: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_ha_tracker_elected_replica_timestamp_seconds",
			Help: "The timestamp stored for the currently elected replica, from the KVStore.",
		}, []string{"user", "cluster"}),
		electedReplicaPropagationTime: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name:    "cortex_ha_tracker_elected_replica_change_propagation_time_seconds",
			Help:    "The time it takes for the distributor to update the replica change.",
			Buckets: prometheus.DefBuckets,
		}),
		kvCASCalls: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_kv_store_cas_total",
			Help: "The total number of CAS calls to the KV store for a user ID/cluster.",
		}, []string{"user", "cluster"}),

		cleanupRuns: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_replicas_cleanup_started_total",
			Help: "Number of elected replicas cleanup loops started.",
		}),
		replicasMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total",
			Help: "Number of elected replicas marked for deletion.",
		}),
		deletedReplicas: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_replicas_cleanup_deleted_total",
			Help: "Number of elected replicas deleted from KV store.",
		}),
		markingForDeletionsFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ha_tracker_replicas_cleanup_delete_failed_total",
			Help: "Number of elected replicas that failed to be marked for deletion, or deleted.",
		}),
	}

	if cfg.EnableHATracker {
		client, err := kv.NewClient(
			cfg.KVStore,
			GetReplicaDescCodec(),
			kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("cortex_", reg), "distributor-hatracker"),
			logger,
		)
		if err != nil {
			return nil, err
		}
		t.client = client
	}

	t.Service = services.NewBasicService(nil, t.loop, nil)
	return t, nil
}

// Follows the pattern used by the ring for WatchKey.
func (c *haTracker) loop(ctx context.Context) error {
	if !c.cfg.EnableHATracker {
		// Don't do anything, but wait until asked to stop.
		<-ctx.Done()
		return nil
	}

	// Start the cleanup loop. It will stop when the context is done.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.cleanupOldReplicasLoop(ctx)
	}()

	// The KVStore config we gave when creating c should have contained a prefix,
	// which would have given us a prefixed KVStore client. So, we can pass an empty string here.
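	//
	// The callback contract comes from dskit's kv.Client: return true to keep
	// watching, false to stop. A minimal sketch of a watcher over the same
	// prefix (illustrative only):
	//
	//	client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
	//		desc := value.(*ReplicaDesc)
	//		fmt.Println(key, desc.Replica) // key is "user/cluster"
	//		return true                    // keep watching
	//	})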
	c.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
		replica := value.(*ReplicaDesc)
		segments := strings.SplitN(key, "/", 2)

		// A valid key looks like "user/cluster"; a key without a "/", such as `ring`, is invalid.
		if len(segments) != 2 {
			return true
		}

		user := segments[0]
		cluster := segments[1]

		c.electedLock.Lock()
		defer c.electedLock.Unlock()

		if replica.DeletedAt > 0 {
			delete(c.elected, key)
			c.electedReplicaChanges.DeleteLabelValues(user, cluster)
			c.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

			userClusters := c.clusters[user]
			if userClusters != nil {
				delete(userClusters, cluster)
				if len(userClusters) == 0 {
					delete(c.clusters, user)
				}
			}
			return true
		}

		elected, exists := c.elected[key]
		if replica.Replica != elected.Replica {
			c.electedReplicaChanges.WithLabelValues(user, cluster).Inc()
		}
		if !exists {
			if c.clusters[user] == nil {
				c.clusters[user] = map[string]struct{}{}
			}
			c.clusters[user][cluster] = struct{}{}
		}
		c.elected[key] = *replica
		c.electedReplicaTimestamp.WithLabelValues(user, cluster).Set(float64(replica.ReceivedAt / 1000))
		c.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())
		return true
	})

	wg.Wait()
	return nil
}

const (
	cleanupCyclePeriod         = 30 * time.Minute
	cleanupCycleJitterVariance = 0.2 // For 30 minutes, this is ±6 min.

	// If we received the last sample for a given cluster before this timeout, we mark the selected replica for deletion.
	// If the selected replica has been marked for deletion for this long, it is deleted completely.
	deletionTimeout = 30 * time.Minute
)

func (c *haTracker) cleanupOldReplicasLoop(ctx context.Context) {
	tick := time.NewTicker(util.DurationWithJitter(cleanupCyclePeriod, cleanupCycleJitterVariance))
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case t := <-tick.C:
			c.cleanupRuns.Inc()
			c.cleanupOldReplicas(ctx, t.Add(-deletionTimeout))
		}
	}
}
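
// Deletion is two-phase, so an idle replica survives at least one extra cleanup
// cycle before it disappears. An illustrative timeline (hypothetical times,
// with deletionTimeout = 30m):
//
//	t+0m    last sample received; ReceivedAt is updated
//	t+30m+  a cleanup run finds ReceivedAt before its deadline -> sets DeletedAt
//	t+60m+  a cleanup run finds DeletedAt before its deadline  -> deletes the key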

// Replicas marked for deletion before the deadline are deleted.
// Replicas with a last-received timestamp before the deadline are marked for deletion.
func (c *haTracker) cleanupOldReplicas(ctx context.Context, deadline time.Time) {
	keys, err := c.client.List(ctx, "")
	if err != nil {
		level.Warn(c.logger).Log("msg", "cleanup: failed to list replica keys", "err", err)
		return
	}

	for _, key := range keys {
		if ctx.Err() != nil {
			return
		}

		val, err := c.client.Get(ctx, key)
		if err != nil {
			level.Warn(c.logger).Log("msg", "cleanup: failed to get replica value", "key", key, "err", err)
			continue
		}

		desc, ok := val.(*ReplicaDesc)
		if !ok {
			level.Error(c.logger).Log("msg", "cleanup: got invalid replica descriptor", "key", key)
			continue
		}

		if desc.DeletedAt > 0 {
			if timestamp.Time(desc.DeletedAt).After(deadline) {
				continue
			}

			// We're blindly deleting a key here. The value may have been updated since we read it a few lines
			// above, in which case distributors will have the updated value in memory, but Delete will remove it
			// from the KV store anyway. That's not great, but it should not be a problem. If the KV store sends a
			// Watch notification for the Delete, distributors will delete it from memory and recreate it on the
			// next sample with a matching replica.
			//
			// If the KV store doesn't send a Watch notification for the Delete, distributors *with* the replica in
			// memory will keep using it, while distributors *without* the replica in memory will try to write it
			// to the KV store -- which will update *all* watching distributors.
			err = c.client.Delete(ctx, key)
			if err != nil {
				level.Error(c.logger).Log("msg", "cleanup: failed to delete old replica", "key", key, "err", err)
				c.markingForDeletionsFailed.Inc()
			} else {
				level.Info(c.logger).Log("msg", "cleanup: deleted old replica", "key", key)
				c.deletedReplicas.Inc()
			}
			continue
		}

		// Not marked as deleted yet.
		if desc.DeletedAt == 0 && timestamp.Time(desc.ReceivedAt).Before(deadline) {
			err := c.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
				// Re-check against the fresh value passed to the CAS callback, since it
				// may have been updated since the Get above.
				d, ok := in.(*ReplicaDesc)
				if !ok || d == nil || d.DeletedAt > 0 || !timestamp.Time(d.ReceivedAt).Before(deadline) {
					return nil, false, nil
				}

				d.DeletedAt = timestamp.FromTime(time.Now())
				return d, true, nil
			})

			if err != nil {
				c.markingForDeletionsFailed.Inc()
				level.Error(c.logger).Log("msg", "cleanup: failed to mark replica as deleted", "key", key, "err", err)
			} else {
				c.replicasMarkedForDeletion.Inc()
				level.Info(c.logger).Log("msg", "cleanup: marked replica as deleted", "key", key)
			}
		}
	}
}

// checkReplica checks the cluster and replica against the backing KVStore and the local cache in
// tracker c to see if we should accept the incoming sample. It returns an error if the sample
// should not be accepted. Note that internally this function checks against the stored values
// and may modify the stored data, for example to failover between replicas after a certain period of time.
// replicasNotMatchError is returned (from checkKVStore) if we shouldn't store this sample but are
// accepting samples from another replica for the cluster, so that a flood of errors isn't returned
// to customers' clients.
func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica string, now time.Time) error {
	// If HA tracking isn't enabled then accept the sample.
	if !c.cfg.EnableHATracker {
		return nil
	}
	key := fmt.Sprintf("%s/%s", userID, cluster)

	c.electedLock.RLock()
	entry, ok := c.elected[key]
	clusters := len(c.clusters[userID])
	c.electedLock.RUnlock()

	if ok && now.Sub(timestamp.Time(entry.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
		if entry.Replica != replica {
			return replicasNotMatchError{replica: replica, elected: entry.Replica}
		}
		return nil
	}

	if !ok {
		// If we don't know about this cluster yet and we have reached the limit for the number of clusters, error out now.
		if limit := c.limits.MaxHAClusters(userID); limit > 0 && clusters+1 > limit {
			return tooManyClustersError{limit: limit}
		}
	}

	err := c.checkKVStore(ctx, key, replica, now)
	c.kvCASCalls.WithLabelValues(userID, cluster).Inc()
	if err != nil {
		// The callback within checkKVStore will return a replicasNotMatchError if the sample is being deduped,
		// otherwise there may have been an actual error CAS'ing that we should log.
		if !errors.Is(err, replicasNotMatchError{}) {
			level.Error(c.logger).Log("msg", "rejecting sample", "err", err)
		}
	}
	return err
}
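
// A caller-side sketch (illustrative; the real caller lives elsewhere in the
// distributor): a replicasNotMatchError means "dedupe this sample", not a failure.
//
//	err := tracker.checkReplica(ctx, userID, cluster, replica, time.Now())
//	if errors.Is(err, replicasNotMatchError{}) {
//		// Sample came from a non-elected replica: drop it silently.
//	} else if err != nil {
//		// Genuine failure (e.g. KV store error, too many clusters).
//	}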

func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now time.Time) error {
	return c.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
		if desc, ok := in.(*ReplicaDesc); ok && desc.DeletedAt == 0 {
			// We don't need to CAS and update the timestamp in the KV store if the timestamp we've received
			// this sample at is less than updateTimeout amount of time since the timestamp in the KV store.
			if desc.Replica == replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
				return nil, false, nil
			}

			// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
			// is less than failover timeout amount of time since the timestamp in the KV store.
			if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
				return nil, false, replicasNotMatchError{replica: replica, elected: desc.Replica}
			}
		}

		// There was either invalid or no data for the key, so we now accept samples
		// from this replica. Invalid could mean that the timestamp in the KV store was
		// out of date based on the update and failover timeouts when compared to now.
		return &ReplicaDesc{
			Replica:    replica,
			ReceivedAt: timestamp.FromTime(now),
			DeletedAt:  0,
		}, true, nil
	})
}
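
// For reference, the CAS callback above reduces to this decision table (sketch):
//
//	stored replica == ours, within update timeout  -> no-op, keep stored value
//	stored replica != ours, within failover window -> reject with replicasNotMatchError
//	anything else (missing, deleted, or stale)     -> write ours as the elected replica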

type replicasNotMatchError struct {
	replica, elected string
}

func (e replicasNotMatchError) Error() string {
	return fmt.Sprintf("replicas did not match, rejecting sample: replica=%s, elected=%s", e.replica, e.elected)
}

// Is is needed for errors.Is to work properly.
func (e replicasNotMatchError) Is(err error) bool {
	_, ok1 := err.(replicasNotMatchError)
	_, ok2 := err.(*replicasNotMatchError)
	return ok1 || ok2
}

// IsOperationAborted returns whether the error has been caused by an operation intentionally aborted.
func (e replicasNotMatchError) IsOperationAborted() bool {
	return true
}

type tooManyClustersError struct {
	limit int
}

func (e tooManyClustersError) Error() string {
	return fmt.Sprintf("too many HA clusters (limit: %d)", e.limit)
}

// Is is needed for errors.Is to work properly.
func (e tooManyClustersError) Is(err error) bool {
	_, ok1 := err.(tooManyClustersError)
	_, ok2 := err.(*tooManyClustersError)
	return ok1 || ok2
}

func findHALabels(replicaLabel, clusterLabel string, labels []cortexpb.LabelAdapter) (string, string) {
	var cluster, replica string
	var pair cortexpb.LabelAdapter

	for _, pair = range labels {
		if pair.Name == replicaLabel {
			replica = pair.Value
		}
		if pair.Name == clusterLabel {
			cluster = pair.Value
		}
	}

	return cluster, replica
}

func (c *haTracker) cleanupHATrackerMetricsForUser(userID string) {
	filter := map[string]string{"user": userID}

	if err := util.DeleteMatchingLabels(c.electedReplicaChanges, filter); err != nil {
		level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_elected_replica_changes_total metric for user", "user", userID, "err", err)
	}
	if err := util.DeleteMatchingLabels(c.electedReplicaTimestamp, filter); err != nil {
		level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_elected_replica_timestamp_seconds metric for user", "user", userID, "err", err)
	}
	if err := util.DeleteMatchingLabels(c.kvCASCalls, filter); err != nil {
		level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_kv_store_cas_total metric for user", "user", userID, "err", err)
	}
}
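
// A usage sketch for findHALabels above (illustrative label names; the actual
// replica and cluster label names are per-tenant settings, typically
// "__replica__" and "cluster"):
//
//	labels := []cortexpb.LabelAdapter{
//		{Name: "__replica__", Value: "replica-a"},
//		{Name: "cluster", Value: "prod"},
//		{Name: "job", Value: "node_exporter"},
//	}
//	cluster, replica := findHALabels("__replica__", "cluster", labels)
//	// cluster == "prod", replica == "replica-a"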