package ingester

import (
	"context"
	"flag"
	"fmt"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/tenant"
	"github.com/cortexproject/cortex/pkg/util"
	util_log "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/go-kit/kit/log/level"
	"github.com/grafana/dskit/services"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"google.golang.org/grpc/health/grpc_health_v1"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/ingester/client"
	"github.com/grafana/loki/pkg/ingester/index"
	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
	"github.com/grafana/loki/pkg/runtime"
	"github.com/grafana/loki/pkg/storage"
	"github.com/grafana/loki/pkg/storage/chunk"
	"github.com/grafana/loki/pkg/storage/stores/shipper"
	errUtil "github.com/grafana/loki/pkg/util"
	listutil "github.com/grafana/loki/pkg/util"
	"github.com/grafana/loki/pkg/validation"
)

// ErrReadOnly is returned when the ingester is shutting down and a push was
// attempted.
var ErrReadOnly = errors.New("Ingester is shutting down")

var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "cortex_ingester_flush_queue_length",
	Help: "The total number of series pending in the flush queue.",
})

// Config for an ingester.
type Config struct {
	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

	// Config for transferring chunks.
	MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`

	ConcurrentFlushes   int               `yaml:"concurrent_flushes"`
	FlushCheckPeriod    time.Duration     `yaml:"flush_check_period"`
	FlushOpTimeout      time.Duration     `yaml:"flush_op_timeout"`
	RetainPeriod        time.Duration     `yaml:"chunk_retain_period"`
	MaxChunkIdle        time.Duration     `yaml:"chunk_idle_period"`
	BlockSize           int               `yaml:"chunk_block_size"`
	TargetChunkSize     int               `yaml:"chunk_target_size"`
	ChunkEncoding       string            `yaml:"chunk_encoding"`
	parsedEncoding      chunkenc.Encoding `yaml:"-"` // placeholder for validated encoding
	MaxChunkAge         time.Duration     `yaml:"max_chunk_age"`
	AutoForgetUnhealthy bool              `yaml:"autoforget_unhealthy"`

	// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
	SyncPeriod         time.Duration `yaml:"sync_period"`
	SyncMinUtilization float64       `yaml:"sync_min_utilization"`

	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

	// For testing, you can override the address and ID of this ingester.
	ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)

	QueryStore                  bool          `yaml:"-"`
	QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`

	WAL WALConfig `yaml:"wal,omitempty"`

	ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`

	IndexShards int `yaml:"index_shards"`
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.LifecyclerConfig.RegisterFlags(f)
	cfg.WAL.RegisterFlags(f)

	f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. If set to 0 or a negative value, transfers are disabled.")
	f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 16, "")
	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "")
	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Second, "")
	f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 15*time.Minute, "")
	f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "")
	f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "")
	f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 1572864, "") // 1.5 MB
	f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunks. (%s)", chunkenc.SupportedEncoding()))
	f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 0, "How often to cut chunks to synchronize ingesters.")
	f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
	f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
	f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
	f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
	f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`.")
	f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
}

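// Validate parses and stores the configured chunk encoding, validates the WAL config,
// rejects enabling chunk transfers together with the WAL, and requires a positive index shard factor.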
func (cfg *Config) Validate() error {
	enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
	if err != nil {
		return err
	}
	cfg.parsedEncoding = enc

	if err = cfg.WAL.Validate(); err != nil {
		return err
	}

	if cfg.MaxTransferRetries > 0 && cfg.WAL.Enabled {
		return errors.New("the use of the write ahead log (WAL) is incompatible with chunk transfers. It's suggested to use the WAL. Please try setting ingester.max-transfer-retries to 0 to disable transfers")
	}

	if cfg.IndexShards <= 0 {
		return fmt.Errorf("Invalid ingester index shard factor: %d", cfg.IndexShards)
	}

	return nil
}

// Ingester builds chunks for incoming log streams.
type Ingester struct {
	services.Service

	cfg           Config
	clientConfig  client.Config
	tenantConfigs *runtime.TenantConfigs

	shutdownMtx  sync.Mutex // Allows processes to grab a lock and prevent a shutdown
	instancesMtx sync.RWMutex
	instances    map[string]*instance
	readonly     bool

	lifecycler        *ring.Lifecycler
	lifecyclerWatcher *services.FailureWatcher

	store           ChunkStore
	periodicConfigs []chunk.PeriodConfig

	loopDone    sync.WaitGroup
	loopQuit    chan struct{}
	tailersQuit chan struct{}

	// One queue per flush thread. Fingerprint is used to
	// pick a queue.
	flushQueues     []*util.PriorityQueue
	flushQueuesDone sync.WaitGroup

	limiter *Limiter

	// Denotes whether the ingester should flush on shutdown.
	// Currently only used by the WAL to signal when the disk is full.
	flushOnShutdownSwitch *OnceSwitch

	// Only used by WAL & flusher to coordinate backpressure during replay.
	replayController *replayController

	metrics *ingesterMetrics

	wal WAL

	chunkFilter storage.RequestChunkFilterer
}

// ChunkStore is the interface we need to store chunks.
type ChunkStore interface {
	Put(ctx context.Context, chunks []chunk.Chunk) error
	SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
	SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error)
	GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error)
	GetSchemaConfigs() []chunk.PeriodConfig
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) {
	if cfg.ingesterClientFactory == nil {
		cfg.ingesterClientFactory = client.New
	}

	metrics := newIngesterMetrics(registerer)

	i := &Ingester{
		cfg:                   cfg,
		clientConfig:          clientConfig,
		tenantConfigs:         configs,
		instances:             map[string]*instance{},
		store:                 store,
		periodicConfigs:       store.GetSchemaConfigs(),
		loopQuit:              make(chan struct{}),
		flushQueues:           make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
		tailersQuit:           make(chan struct{}),
		metrics:               metrics,
		flushOnShutdownSwitch: &OnceSwitch{},
	}
	i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})

	if cfg.WAL.Enabled {
		if err := os.MkdirAll(cfg.WAL.Dir, os.ModePerm); err != nil {
			return nil, err
		}
	}

	wal, err := newWAL(cfg.WAL, registerer, metrics, newIngesterSeriesIter(i))
	if err != nil {
		return nil, err
	}
	i.wal = wal

	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WAL.Enabled || cfg.WAL.FlushOnShutdown, registerer)
	if err != nil {
		return nil, err
	}

	i.lifecyclerWatcher = services.NewFailureWatcher()
	i.lifecyclerWatcher.WatchService(i.lifecycler)

	// Now that the lifecycler has been created, we can create the limiter
	// which depends on it.
	i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)

	i.setupAutoForget()

	if i.cfg.ChunkFilterer != nil {
		i.SetChunkFilterer(i.cfg.ChunkFilterer)
	}

	return i, nil
}

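// SetChunkFilterer sets the RequestChunkFilterer that is passed to newly created instances
// so that chunks can be filtered out when serving requests.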
func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) {
	i.chunkFilter = chunkFilter
}

// setupAutoForget watches the ring when `AutoForgetUnhealthy` is enabled.
// Unhealthy ingesters whose last heartbeat is older than `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`.
func (i *Ingester) setupAutoForget() {
	if !i.cfg.AutoForgetUnhealthy {
		return
	}

	go func() {
		ctx := context.Background()
		err := i.Service.AwaitRunning(ctx)
		if err != nil {
			level.Error(util_log.Logger).Log("msg", fmt.Sprintf("autoforget received error %s, autoforget is disabled", err.Error()))
			return
		}

		level.Info(util_log.Logger).Log("msg", fmt.Sprintf("autoforget is enabled and will remove unhealthy instances from the ring after %v with no heartbeat", i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout))

		ticker := time.NewTicker(i.cfg.LifecyclerConfig.HeartbeatPeriod)
		defer ticker.Stop()

		var forgetList []string
		for range ticker.C {
			err := i.lifecycler.KVStore.CAS(ctx, ring.IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
				forgetList = forgetList[:0]
				if in == nil {
					return nil, false, nil
				}

				ringDesc, ok := in.(*ring.Desc)
				if !ok {
					level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("autoforget saw a KV store value that was not `ring.Desc`, got `%T`", in))
					return nil, false, nil
				}

				for id, ingester := range ringDesc.Ingesters {
					if !ingester.IsHealthy(ring.Reporting, i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout, time.Now()) {
						if i.lifecycler.ID == id {
							level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("autoforget has seen our ID `%s` as unhealthy in the ring, network may be partitioned, skip forgetting ingesters this round", id))
							return nil, false, nil
						}
						forgetList = append(forgetList, id)
					}
				}

				if len(forgetList) == len(ringDesc.Ingesters)-1 {
					level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("autoforget has seen %d unhealthy ingesters out of %d, network may be partitioned, skip forgetting ingesters this round", len(forgetList), len(ringDesc.Ingesters)))
					forgetList = forgetList[:0]
					return nil, false, nil
				}

				if len(forgetList) > 0 {
					for _, id := range forgetList {
						ringDesc.RemoveIngester(id)
					}
					return ringDesc, true, nil
				}
				return nil, false, nil
			})
			if err != nil {
				level.Warn(util_log.Logger).Log("msg", err)
				continue
			}

			for _, id := range forgetList {
				level.Info(util_log.Logger).Log("msg", fmt.Sprintf("autoforget removed ingester %v from the ring because it was not healthy after %v", id, i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout))
			}
			i.metrics.autoForgetUnhealthyIngestersTotal.Add(float64(len(forgetList)))
		}
	}()
}

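// starting implements the services.Service start phase: it replays the WAL (when enabled),
// initializes the flush queues, starts the lifecycler so the ingester joins the ring,
// and then starts the main loop.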
func (i *Ingester) starting(ctx context.Context) error {
	if i.cfg.WAL.Enabled {
		start := time.Now()

		// Ignore retain period during wal replay.
		oldRetain := i.cfg.RetainPeriod
		i.cfg.RetainPeriod = 0

		// Disable the in process stream limit checks while replaying the WAL.
		// They are re-enabled in the recoverer's Close() method.
		i.limiter.DisableForWALReplay()

		recoverer := newIngesterRecoverer(i)

		i.metrics.walReplayActive.Set(1)

		endReplay := func() func() {
			var once sync.Once
			return func() {
				once.Do(func() {
					level.Info(util_log.Logger).Log("msg", "closing recoverer")
					recoverer.Close()

					elapsed := time.Since(start)

					i.metrics.walReplayActive.Set(0)
					i.metrics.walReplayDuration.Set(elapsed.Seconds())
					i.cfg.RetainPeriod = oldRetain
					level.Info(util_log.Logger).Log("msg", "WAL recovery finished", "time", elapsed.String())
				})
			}
		}()
		defer endReplay()

		level.Info(util_log.Logger).Log("msg", "recovering from checkpoint")
		checkpointReader, checkpointCloser, err := newCheckpointReader(i.cfg.WAL.Dir)
		if err != nil {
			return err
		}
		defer checkpointCloser.Close()

		checkpointRecoveryErr := RecoverCheckpoint(checkpointReader, recoverer)
		if checkpointRecoveryErr != nil {
			i.metrics.walCorruptionsTotal.WithLabelValues(walTypeCheckpoint).Inc()
			level.Error(util_log.Logger).Log(
				"msg",
				`Recovered from checkpoint with errors. Some streams were likely not recovered due to WAL checkpoint file corruptions (or WAL file deletions while Loki is running). No administrator action is needed and data loss is only a possibility if more than (replication factor / 2 + 1) ingesters suffer from this.`,
				"elapsed", time.Since(start).String(),
			)
		}
		level.Info(util_log.Logger).Log(
			"msg", "WAL checkpoint recovery finished",
			"elapsed", time.Since(start).String(),
			"errors", checkpointRecoveryErr != nil,
		)

		level.Info(util_log.Logger).Log("msg", "recovering from WAL")
		segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1)
		if err != nil {
			return err
		}
		defer segmentCloser.Close()

		segmentRecoveryErr := RecoverWAL(segmentReader, recoverer)
		if segmentRecoveryErr != nil {
			i.metrics.walCorruptionsTotal.WithLabelValues(walTypeSegment).Inc()
			level.Error(util_log.Logger).Log(
				"msg",
				"Recovered from WAL segments with errors. Some streams and/or entries were likely not recovered due to WAL segment file corruptions (or WAL file deletions while Loki is running). No administrator action is needed and data loss is only a possibility if more than (replication factor / 2 + 1) ingesters suffer from this.",
				"elapsed", time.Since(start).String(),
			)
		}
		level.Info(util_log.Logger).Log(
			"msg", "WAL segment recovery finished",
			"elapsed", time.Since(start).String(),
			"errors", segmentRecoveryErr != nil,
		)

		endReplay()

		i.wal.Start()
	}

	i.InitFlushQueues()

	// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
	err := i.lifecycler.StartAsync(context.Background())
	if err != nil {
		return err
	}

	err = i.lifecycler.AwaitRunning(ctx)
	if err != nil {
		return err
	}

	// start our loop
	i.loopDone.Add(1)
	go i.loop()
	return nil
}

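// running waits until the service is asked to stop or the lifecycler fails,
// then closes all tailers and shuts down the main loop.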
func (i *Ingester) running(ctx context.Context) error {
	var serviceError error
	select {
	// wait until service is asked to stop
	case <-ctx.Done():
		// stop
	case err := <-i.lifecyclerWatcher.Chan():
		serviceError = fmt.Errorf("lifecycler failed: %w", err)
	}

	// close tailers before stopping our loop
	close(i.tailersQuit)
	for _, instance := range i.getInstances() {
		instance.closeTailers()
	}

	close(i.loopQuit)
	i.loopDone.Wait()
	return serviceError
}

// Called after running exits, when Ingester transitions to Stopping state.
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
	i.stopIncomingRequests()
	var errs errUtil.MultiError
	errs.Add(i.wal.Stop())

	if i.flushOnShutdownSwitch.Get() {
		i.lifecycler.SetFlushOnShutdown(true)
	}
	errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))

	// Normally, flushers are stopped via lifecycler (in transferOut), but if the lifecycler fails,
	// we'd better stop them.
	for _, flushQueue := range i.flushQueues {
		flushQueue.Close()
	}
	i.flushQueuesDone.Wait()

	return errs.Err()
}

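// loop periodically triggers sweepUsers on the flush check period until loopQuit is closed.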
func (i *Ingester) loop() {
	defer i.loopDone.Done()

	flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
	defer flushTicker.Stop()

	for {
		select {
		case <-flushTicker.C:
			i.sweepUsers(false, true)

		case <-i.loopQuit:
			return
		}
	}
}

// ShutdownHandler triggers the following set of operations in order:
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
	originalState := i.lifecycler.FlushOnShutdown()
	// We want to flush the chunks if transfer fails irrespective of original flag.
	i.lifecycler.SetFlushOnShutdown(true)
	_ = services.StopAndAwaitTerminated(context.Background(), i)
	i.lifecycler.SetFlushOnShutdown(originalState)
	w.WriteHeader(http.StatusNoContent)
}

// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	} else if i.readonly {
		return nil, ErrReadOnly
	}

	instance := i.getOrCreateInstance(instanceID)
	err = instance.Push(ctx, req)
	return &logproto.PushResponse{}, err
}

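// getOrCreateInstance returns the per-tenant instance, creating it if it does not exist yet.
// The existence check is repeated under the write lock to avoid racing a concurrent creator.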
func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
	inst, ok := i.getInstanceByID(instanceID)
	if ok {
		return inst
	}

	i.instancesMtx.Lock()
	defer i.instancesMtx.Unlock()
	inst, ok = i.instances[instanceID]
	if !ok {
		inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
		i.instances[instanceID] = inst
	}
	return inst
}

// Query the ingesters for log streams matching a set of matchers.
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
	// initialize stats collection for ingester queries and set grpc trailer with stats.
	ctx := stats.NewContext(queryServer.Context())
	defer stats.SendAsTrailer(ctx, queryServer)

	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return err
	}

	instance := i.getOrCreateInstance(instanceID)
	itrs, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req})
	if err != nil {
		return err
	}

	if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
		storeReq := logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{
			Selector:  req.Selector,
			Direction: req.Direction,
			Start:     start,
			End:       end,
			Limit:     req.Limit,
			Shards:    req.Shards,
		}}
		storeItr, err := i.store.SelectLogs(ctx, storeReq)
		if err != nil {
			return err
		}

		itrs = append(itrs, storeItr)
	}

	heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction)

	defer listutil.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)

	return sendBatches(ctx, heapItr, queryServer, req.Limit)
}

// QuerySample queries the ingesters for samples from logs matching a set of matchers.
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error {
	// initialize stats collection for ingester queries and set grpc trailer with stats.
	ctx := stats.NewContext(queryServer.Context())
	defer stats.SendAsTrailer(ctx, queryServer)

	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return err
	}

	instance := i.getOrCreateInstance(instanceID)
	itrs, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req})
	if err != nil {
		return err
	}

	if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
		storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{
			Start:    start,
			End:      end,
			Selector: req.Selector,
			Shards:   req.Shards,
		}}
		storeItr, err := i.store.SelectSamples(ctx, storeReq)
		if err != nil {
			return err
		}

		itrs = append(itrs, storeItr)
	}

	heapItr := iter.NewHeapSampleIterator(ctx, itrs)

	defer listutil.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)

	return sendSampleBatches(ctx, heapItr, queryServer)
}

// boltdbShipperMaxLookBack returns a max look back period only if the active index type is boltdb-shipper.
// The max look back is limited to the `from` time of the active boltdb-shipper config.
// It considers the previous periodic config's `from` time if that config also has its index type set to boltdb-shipper.
func (i *Ingester) boltdbShipperMaxLookBack() time.Duration {
	activePeriodicConfigIndex := storage.ActivePeriodConfig(i.periodicConfigs)
	activePeriodicConfig := i.periodicConfigs[activePeriodicConfigIndex]
	if activePeriodicConfig.IndexType != shipper.BoltDBShipperType {
		return 0
	}

	startTime := activePeriodicConfig.From
	if activePeriodicConfigIndex != 0 && i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == shipper.BoltDBShipperType {
		startTime = i.periodicConfigs[activePeriodicConfigIndex-1].From
	}

	maxLookBack := time.Since(startTime.Time.Time())
	return maxLookBack
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
	orgID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
	if boltdbShipperMaxLookBack == 0 {
		return &logproto.GetChunkIDsResponse{}, nil
	}

	reqStart := req.Start
	reqStart = adjustQueryStartTime(boltdbShipperMaxLookBack, reqStart, time.Now())

	// parse the request
	start, end := listutil.RoundToMilliseconds(reqStart, req.End)
	matchers, err := logql.ParseMatchers(req.Matchers)
	if err != nil {
		return nil, err
	}

	// get chunk references
	chunksGroups, _, err := i.store.GetChunkRefs(ctx, orgID, start, end, matchers...)
	if err != nil {
		return nil, err
	}

	// build the response
	resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}}
	for _, chunks := range chunksGroups {
		for _, chk := range chunks {
			resp.ChunkIDs = append(resp.ChunkIDs, chk.ExternalKey())
		}
	}

	return &resp, nil
}

// Label returns the set of labels for the streams this ingester knows about.
func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	instance := i.getOrCreateInstance(userID)
	resp, err := instance.Label(ctx, req)
	if err != nil {
		return nil, err
	}

	// Only continue if the active index type is boltdb-shipper or the QueryStore flag is true.
	boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
	if boltdbShipperMaxLookBack == 0 && !i.cfg.QueryStore {
		return resp, nil
	}

	// Only continue if the store is a chunk.Store
	var cs chunk.Store
	var ok bool
	if cs, ok = i.store.(chunk.Store); !ok {
		return resp, nil
	}

	maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod
	if boltdbShipperMaxLookBack != 0 {
		maxLookBackPeriod = boltdbShipperMaxLookBack
	}
	// Adjust the start time based on QueryStoreMaxLookBackPeriod.
	start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now())
	if start.After(*req.End) {
		// The request is older than we are allowed to query the store, just return what we have.
		return resp, nil
	}
	from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
	var storeValues []string
	if req.Values {
		storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
		if err != nil {
			return nil, err
		}
	} else {
		storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs")
		if err != nil {
			return nil, err
		}
	}

	return &logproto.LabelResponse{
		Values: listutil.MergeStringLists(resp.Values, storeValues),
	}, nil
}

// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers.
func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	instance := i.getOrCreateInstance(instanceID)
	return instance.Series(ctx, req)
}

// Check implements grpc_health_v1.HealthCheck.
func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
	return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

// Watch implements grpc_health_v1.HealthCheck.
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
	return nil
}

// CheckReady is used by the readiness handler to indicate to k8s when the ingesters are ready for
// the addition or removal of another ingester. The readiness endpoint returns 204 when the ingester
// is ready, 500 otherwise.
func (i *Ingester) CheckReady(ctx context.Context) error {
	if s := i.State(); s != services.Running && s != services.Stopping {
		return fmt.Errorf("ingester not ready: %v", s)
	}
	return i.lifecycler.CheckReady(ctx)
}

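// getInstanceByID returns the instance for the given tenant ID, if one exists; it never creates one.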
func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
	i.instancesMtx.RLock()
	defer i.instancesMtx.RUnlock()

	inst, ok := i.instances[id]
	return inst, ok
}

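// getInstances returns a snapshot of all per-tenant instances, taken under the read lock.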
func (i *Ingester) getInstances() []*instance {
	i.instancesMtx.RLock()
	defer i.instancesMtx.RUnlock()

	instances := make([]*instance, 0, len(i.instances))
	for _, instance := range i.instances {
		instances = append(instances, instance)
	}
	return instances
}

// Tail logs matching a given query.
func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error {
	select {
	case <-i.tailersQuit:
		return errors.New("Ingester is stopping")
	default:
	}

	instanceID, err := tenant.TenantID(queryServer.Context())
	if err != nil {
		return err
	}

	instance := i.getOrCreateInstance(instanceID)
	tailer, err := newTailer(instanceID, req.Query, queryServer)
	if err != nil {
		return err
	}

	if err := instance.addNewTailer(queryServer.Context(), tailer); err != nil {
		return err
	}
	tailer.loop()
	return nil
}

// TailersCount returns the count of active tail requests from a user.
func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) {
	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	resp := logproto.TailersCountResponse{}

	instance, ok := i.getInstanceByID(instanceID)
	if ok {
		resp.Count = instance.openTailersCount()
	}

	return &resp, nil
}

// buildStoreRequest returns a store request from an ingester request; the returned boolean is false
// if QueryStore is disabled in the configuration.
// The request may be truncated due to QueryStoreMaxLookBackPeriod, which limits the requested range to make sure
// we query enough to not miss any data without adding too many duplicates by covering the whole time range of the query.
func buildStoreRequest(cfg Config, start, end, now time.Time) (time.Time, time.Time, bool) {
	if !cfg.QueryStore {
		return time.Time{}, time.Time{}, false
	}
	start = adjustQueryStartTime(cfg.QueryStoreMaxLookBackPeriod, start, now)

	if start.After(end) {
		return time.Time{}, time.Time{}, false
	}
	return start, end, true
}

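// adjustQueryStartTime clamps start so it is never older than now minus maxLookBackPeriod;
// a zero or negative period leaves start unchanged. For example, with a look back of 1h and
// now at 12:00, a start of 09:00 is moved forward to 11:00.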
func adjustQueryStartTime(maxLookBackPeriod time.Duration, start, now time.Time) time.Time {
	if maxLookBackPeriod > 0 {
		oldestStartTime := now.Add(-maxLookBackPeriod)
		if oldestStartTime.After(start) {
			return oldestStartTime
		}
	}
	return start
}