package ingester

import (
	"context"
	"flag"
	"fmt"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/tenant"
	"github.com/cortexproject/cortex/pkg/util"
	util_log "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/go-kit/kit/log/level"
	"github.com/grafana/dskit/services"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"google.golang.org/grpc/health/grpc_health_v1"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/ingester/client"
	"github.com/grafana/loki/pkg/ingester/index"
	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
	"github.com/grafana/loki/pkg/runtime"
	"github.com/grafana/loki/pkg/storage"
	"github.com/grafana/loki/pkg/storage/chunk"
	"github.com/grafana/loki/pkg/storage/stores/shipper"
	errUtil "github.com/grafana/loki/pkg/util"
	listutil "github.com/grafana/loki/pkg/util"
	"github.com/grafana/loki/pkg/validation"
)

// ErrReadOnly is returned when the ingester is shutting down and a push was
// attempted.
var ErrReadOnly = errors.New("Ingester is shutting down")

var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "cortex_ingester_flush_queue_length",
	Help: "The total number of series pending in the flush queue.",
})

// Config for an ingester.
type Config struct {
	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

	// Config for transferring chunks.
	MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`

	ConcurrentFlushes   int               `yaml:"concurrent_flushes"`
	FlushCheckPeriod    time.Duration     `yaml:"flush_check_period"`
	FlushOpTimeout      time.Duration     `yaml:"flush_op_timeout"`
	RetainPeriod        time.Duration     `yaml:"chunk_retain_period"`
	MaxChunkIdle        time.Duration     `yaml:"chunk_idle_period"`
	BlockSize           int               `yaml:"chunk_block_size"`
	TargetChunkSize     int               `yaml:"chunk_target_size"`
	ChunkEncoding       string            `yaml:"chunk_encoding"`
	parsedEncoding      chunkenc.Encoding `yaml:"-"` // placeholder for validated encoding
	MaxChunkAge         time.Duration     `yaml:"max_chunk_age"`
	AutoForgetUnhealthy bool              `yaml:"autoforget_unhealthy"`

	// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
	SyncPeriod         time.Duration `yaml:"sync_period"`
	SyncMinUtilization float64       `yaml:"sync_min_utilization"`

	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

	// For testing: allows overriding the factory used to create clients for other ingesters.
	ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)

	QueryStore                  bool          `yaml:"-"`
	QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`

	WAL WALConfig `yaml:"wal,omitempty"`

	ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`

	IndexShards int `yaml:"index_shards"`
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.LifecyclerConfig.RegisterFlags(f)
	cfg.WAL.RegisterFlags(f)

	f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. If set to 0 or negative value, transfers are disabled.")
	f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 16, "")
	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "")
	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Second, "")
	f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 15*time.Minute, "")
	f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "")
	f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "")
	f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 1572864, "") //1.5 MB
	f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunks. (%s)", chunkenc.SupportedEncoding()))
	f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 0, "How often to cut chunks to synchronize ingesters.")
	f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
	f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
	f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
	f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
	f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`")
	f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
}

func (cfg *Config) Validate() error {
	enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
	if err != nil {
		return err
	}
	cfg.parsedEncoding = enc

	if err = cfg.WAL.Validate(); err != nil {
		return err
	}

	if cfg.MaxTransferRetries > 0 && cfg.WAL.Enabled {
		return errors.New("the write ahead log (WAL) is incompatible with chunk transfers; using the WAL is recommended, so please set ingester.max-transfer-retries to 0 to disable transfers")
	}
	if cfg.IndexShards <= 0 {
		return fmt.Errorf("Invalid ingester index shard factor: %d", cfg.IndexShards)
	}

	return nil
}

// Ingester builds chunks for incoming log streams.
type Ingester struct {
	services.Service

	cfg           Config
	clientConfig  client.Config
	tenantConfigs *runtime.TenantConfigs

	shutdownMtx  sync.Mutex // Allows processes to grab a lock and prevent a shutdown
	instancesMtx sync.RWMutex
	instances    map[string]*instance
	readonly     bool

	lifecycler        *ring.Lifecycler
	lifecyclerWatcher *services.FailureWatcher

	store           ChunkStore
	periodicConfigs []chunk.PeriodConfig

	loopDone    sync.WaitGroup
	loopQuit    chan struct{}
	tailersQuit chan struct{}

	// One queue per flush thread.  Fingerprint is used to
	// pick a queue.
	flushQueues     []*util.PriorityQueue
	flushQueuesDone sync.WaitGroup

	limiter *Limiter

	// Denotes whether the ingester should flush on shutdown.
	// Currently only used by the WAL to signal when the disk is full.
	flushOnShutdownSwitch *OnceSwitch

	// Only used by WAL & flusher to coordinate backpressure during replay.
	replayController *replayController

	metrics *ingesterMetrics

	wal WAL

	chunkFilter storage.RequestChunkFilterer
}

// ChunkStore is the interface we need to store chunks.
type ChunkStore interface {
	Put(ctx context.Context, chunks []chunk.Chunk) error
	SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
	SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error)
	GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error)
	GetSchemaConfigs() []chunk.PeriodConfig
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) {
	if cfg.ingesterClientFactory == nil {
		cfg.ingesterClientFactory = client.New
	}

	metrics := newIngesterMetrics(registerer)

	i := &Ingester{
		cfg:                   cfg,
		clientConfig:          clientConfig,
		tenantConfigs:         configs,
		instances:             map[string]*instance{},
		store:                 store,
		periodicConfigs:       store.GetSchemaConfigs(),
		loopQuit:              make(chan struct{}),
		flushQueues:           make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
		tailersQuit:           make(chan struct{}),
		metrics:               metrics,
		flushOnShutdownSwitch: &OnceSwitch{},
	}
	i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})

	if cfg.WAL.Enabled {
		if err := os.MkdirAll(cfg.WAL.Dir, os.ModePerm); err != nil {
			return nil, err
		}
	}

	wal, err := newWAL(cfg.WAL, registerer, metrics, newIngesterSeriesIter(i))
	if err != nil {
		return nil, err
	}
	i.wal = wal

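	// Flush chunks on shutdown if the WAL is disabled (there is no recovery path) or if the
	// WAL is explicitly configured to flush on shutdown.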
	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WAL.Enabled || cfg.WAL.FlushOnShutdown, registerer)
	if err != nil {
		return nil, err
	}

	i.lifecyclerWatcher = services.NewFailureWatcher()
	i.lifecyclerWatcher.WatchService(i.lifecycler)

	// Now that the lifecycler has been created, we can create the limiter
	// which depends on it.
	i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)

	i.setupAutoForget()

	if i.cfg.ChunkFilterer != nil {
		i.SetChunkFilterer(i.cfg.ChunkFilterer)
	}

	return i, nil
}

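// SetChunkFilterer sets the filterer used to filter out chunks at query time.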
func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) {
	i.chunkFilter = chunkFilter
}

// setupAutoForget starts a background watcher on the ring when `AutoForgetUnhealthy` is enabled.
// Every `HeartbeatPeriod` it removes unhealthy ingesters whose last heartbeat is older than `ring.kvstore.heartbeat_timeout` from the ring.
func (i *Ingester) setupAutoForget() {
	if !i.cfg.AutoForgetUnhealthy {
		return
	}

	go func() {
		ctx := context.Background()
		err := i.Service.AwaitRunning(ctx)
		if err != nil {
			level.Error(util_log.Logger).Log("msg", fmt.Sprintf("autoforget received error %s, autoforget is disabled", err.Error()))
			return
		}

		level.Info(util_log.Logger).Log("msg", fmt.Sprintf("autoforget is enabled and will remove unhealthy instances from the ring after %v with no heartbeat", i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout))

		ticker := time.NewTicker(i.cfg.LifecyclerConfig.HeartbeatPeriod)
		defer ticker.Stop()

		var forgetList []string
		for range ticker.C {
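			// Compare-and-swap the ring: collect unhealthy ingesters and remove them, unless our own
			// entry looks unhealthy or almost every ingester does (likely a network partition).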
			err := i.lifecycler.KVStore.CAS(ctx, ring.IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
				forgetList = forgetList[:0]
				if in == nil {
					return nil, false, nil
				}

				ringDesc, ok := in.(*ring.Desc)
				if !ok {
					level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("autoforget saw a KV store value that was not `ring.Desc`, got `%T`", in))
					return nil, false, nil
				}

				for id, ingester := range ringDesc.Ingesters {
					if !ingester.IsHealthy(ring.Reporting, i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout, time.Now()) {
						if i.lifecycler.ID == id {
							level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("autoforget has seen our ID `%s` as unhealthy in the ring, network may be partitioned, skip forgetting ingesters this round", id))
							return nil, false, nil
						}
						forgetList = append(forgetList, id)
					}
				}

				if len(forgetList) == len(ringDesc.Ingesters)-1 {
					level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("autoforget has seen %d unhealthy ingesters out of %d, network may be partitioned, skip forgetting ingesters this round", len(forgetList), len(ringDesc.Ingesters)))
					forgetList = forgetList[:0]
					return nil, false, nil
				}

				if len(forgetList) > 0 {
					for _, id := range forgetList {
						ringDesc.RemoveIngester(id)
					}
					return ringDesc, true, nil
				}
				return nil, false, nil
			})
			if err != nil {
				level.Warn(util_log.Logger).Log("msg", err)
				continue
			}

			for _, id := range forgetList {
				level.Info(util_log.Logger).Log("msg", fmt.Sprintf("autoforget removed ingester %v from the ring because it was not healthy after %v", id, i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout))
			}
			i.metrics.autoForgetUnhealthyIngestersTotal.Add(float64(len(forgetList)))
		}
	}()
}

func (i *Ingester) starting(ctx context.Context) error {
	if i.cfg.WAL.Enabled {
		start := time.Now()

		// Ignore retain period during wal replay.
		oldRetain := i.cfg.RetainPeriod
		i.cfg.RetainPeriod = 0

		// Disable the in process stream limit checks while replaying the WAL.
		// It is re-enabled in the recoverer's Close() method.
		i.limiter.DisableForWALReplay()

		recoverer := newIngesterRecoverer(i)

		i.metrics.walReplayActive.Set(1)

		endReplay := func() func() {
			var once sync.Once
			return func() {
				once.Do(func() {
					level.Info(util_log.Logger).Log("msg", "closing recoverer")
					recoverer.Close()

					elapsed := time.Since(start)

					i.metrics.walReplayActive.Set(0)
					i.metrics.walReplayDuration.Set(elapsed.Seconds())
					i.cfg.RetainPeriod = oldRetain
					level.Info(util_log.Logger).Log("msg", "WAL recovery finished", "time", elapsed.String())
				})
			}
		}()
		defer endReplay()

		level.Info(util_log.Logger).Log("msg", "recovering from checkpoint")
		checkpointReader, checkpointCloser, err := newCheckpointReader(i.cfg.WAL.Dir)
		if err != nil {
			return err
		}
		defer checkpointCloser.Close()

		checkpointRecoveryErr := RecoverCheckpoint(checkpointReader, recoverer)
		if checkpointRecoveryErr != nil {
			i.metrics.walCorruptionsTotal.WithLabelValues(walTypeCheckpoint).Inc()
			level.Error(util_log.Logger).Log(
				"msg",
				`Recovered from checkpoint with errors. Some streams were likely not recovered due to WAL checkpoint file corruptions (or WAL file deletions while Loki is running). No administrator action is needed and data loss is only a possibility if more than (replication factor / 2 + 1) ingesters suffer from this.`,
				"elapsed", time.Since(start).String(),
			)
		}
		level.Info(util_log.Logger).Log(
			"msg", "WAL checkpoint recovery finished",
			"elapsed", time.Since(start).String(),
			"errors", checkpointRecoveryErr != nil,
		)

		level.Info(util_log.Logger).Log("msg", "recovering from WAL")
		segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1)
		if err != nil {
			return err
		}
		defer segmentCloser.Close()

		segmentRecoveryErr := RecoverWAL(segmentReader, recoverer)
		if segmentRecoveryErr != nil {
			i.metrics.walCorruptionsTotal.WithLabelValues(walTypeSegment).Inc()
			level.Error(util_log.Logger).Log(
				"msg",
				"Recovered from WAL segments with errors. Some streams and/or entries were likely not recovered due to WAL segment file corruptions (or WAL file deletions while Loki is running). No administrator action is needed and data loss is only a possibility if more than (replication factor / 2 + 1) ingesters suffer from this.",
				"elapsed", time.Since(start).String(),
			)
		}
		level.Info(util_log.Logger).Log(
			"msg", "WAL segment recovery finished",
			"elapsed", time.Since(start).String(),
			"errors", segmentRecoveryErr != nil,
		)

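		// Close the recoverer and restore the original retain period before starting the WAL;
		// the sync.Once above makes the deferred endReplay call a no-op.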
		endReplay()

		i.wal.Start()
	}

	i.InitFlushQueues()

	// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
	err := i.lifecycler.StartAsync(context.Background())
	if err != nil {
		return err
	}

	err = i.lifecycler.AwaitRunning(ctx)
	if err != nil {
		return err
	}

	// start our loop
	i.loopDone.Add(1)
	go i.loop()
	return nil
}

func (i *Ingester) running(ctx context.Context) error {
	var serviceError error
	select {
	// wait until service is asked to stop
	case <-ctx.Done():
	// stop
	case err := <-i.lifecyclerWatcher.Chan():
		serviceError = fmt.Errorf("lifecycler failed: %w", err)
	}

	// close tailers before stopping our loop
	close(i.tailersQuit)
	for _, instance := range i.getInstances() {
		instance.closeTailers()
	}

	close(i.loopQuit)
	i.loopDone.Wait()
	return serviceError
}

// Called after running exits, when Ingester transitions to Stopping state.
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
	i.stopIncomingRequests()
	var errs errUtil.MultiError
	errs.Add(i.wal.Stop())

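	// Honor a flush-on-shutdown request raised at runtime (e.g. by the WAL when the disk is full).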
	if i.flushOnShutdownSwitch.Get() {
		i.lifecycler.SetFlushOnShutdown(true)
	}
	errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))

	// Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails,
	// we better stop them.
	for _, flushQueue := range i.flushQueues {
		flushQueue.Close()
	}
	i.flushQueuesDone.Wait()

	return errs.Err()
}

func (i *Ingester) loop() {
	defer i.loopDone.Done()

	flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
	defer flushTicker.Stop()

	for {
		select {
		case <-flushTicker.C:
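			// Sweep all tenants, enqueueing any chunks that are ready to be flushed.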
			i.sweepUsers(false, true)

		case <-i.loopQuit:
			return
		}
	}
}

// ShutdownHandler triggers the following set of operations in order:
//     * Change the state of the ring to stop accepting writes.
//     * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
	originalState := i.lifecycler.FlushOnShutdown()
	// We want to flush the chunks if the transfer fails, irrespective of the original flag.
	i.lifecycler.SetFlushOnShutdown(true)
	_ = services.StopAndAwaitTerminated(context.Background(), i)
	i.lifecycler.SetFlushOnShutdown(originalState)
	w.WriteHeader(http.StatusNoContent)
}

// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	} else if i.readonly {
		return nil, ErrReadOnly
	}

	instance := i.getOrCreateInstance(instanceID)
	err = instance.Push(ctx, req)
	return &logproto.PushResponse{}, err
}

func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
	inst, ok := i.getInstanceByID(instanceID)
	if ok {
		return inst
	}

	i.instancesMtx.Lock()
	defer i.instancesMtx.Unlock()
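	// Re-check under the write lock: another goroutine may have created the instance in the meantime.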
	inst, ok = i.instances[instanceID]
	if !ok {
		inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
		i.instances[instanceID] = inst
	}
	return inst
}

// Query the ingester for log streams matching a set of matchers.
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
	// initialize stats collection for ingester queries and set grpc trailer with stats.
	ctx := stats.NewContext(queryServer.Context())
	defer stats.SendAsTrailer(ctx, queryServer)

	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return err
	}

	instance := i.getOrCreateInstance(instanceID)
	itrs, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req})
	if err != nil {
		return err
	}

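	// When configured to also query the store, add a store iterator for the part of the
	// request that falls within the allowed look-back window; the results are merged below.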
	if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
		storeReq := logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{
			Selector:  req.Selector,
			Direction: req.Direction,
			Start:     start,
			End:       end,
			Limit:     req.Limit,
			Shards:    req.Shards,
		}}
		storeItr, err := i.store.SelectLogs(ctx, storeReq)
		if err != nil {
			return err
		}

		itrs = append(itrs, storeItr)
	}

	heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction)

	defer listutil.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)

	return sendBatches(ctx, heapItr, queryServer, req.Limit)
}

// QuerySample queries the ingester for samples from logs matching a set of matchers.
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error {
	// initialize stats collection for ingester queries and set grpc trailer with stats.
	ctx := stats.NewContext(queryServer.Context())
	defer stats.SendAsTrailer(ctx, queryServer)

	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return err
	}

	instance := i.getOrCreateInstance(instanceID)
	itrs, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req})
	if err != nil {
		return err
	}

	if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
		storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{
			Start:    start,
			End:      end,
			Selector: req.Selector,
			Shards:   req.Shards,
		}}
		storeItr, err := i.store.SelectSamples(ctx, storeReq)
		if err != nil {
			return err
		}

		itrs = append(itrs, storeItr)
	}

	heapItr := iter.NewHeapSampleIterator(ctx, itrs)

	defer listutil.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)

	return sendSampleBatches(ctx, heapItr, queryServer)
}

// boltdbShipperMaxLookBack returns a max look back period only if the active index type is boltdb-shipper.
// The max look back is limited to the `from` time of the active boltdb-shipper period config.
// It extends to the previous period config's `from` time if that period also uses the boltdb-shipper index type.
func (i *Ingester) boltdbShipperMaxLookBack() time.Duration {
	activePeriodicConfigIndex := storage.ActivePeriodConfig(i.periodicConfigs)
	activePeriodicConfig := i.periodicConfigs[activePeriodicConfigIndex]
	if activePeriodicConfig.IndexType != shipper.BoltDBShipperType {
		return 0
	}

	startTime := activePeriodicConfig.From
	if activePeriodicConfigIndex != 0 && i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == shipper.BoltDBShipperType {
		startTime = i.periodicConfigs[activePeriodicConfigIndex-1].From
	}

	maxLookBack := time.Since(startTime.Time.Time())
	return maxLookBack
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
	orgID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
	if boltdbShipperMaxLookBack == 0 {
		return &logproto.GetChunkIDsResponse{}, nil
	}

	reqStart := req.Start
	reqStart = adjustQueryStartTime(boltdbShipperMaxLookBack, reqStart, time.Now())

	// parse the request
	start, end := listutil.RoundToMilliseconds(reqStart, req.End)
	matchers, err := logql.ParseMatchers(req.Matchers)
	if err != nil {
		return nil, err
	}

	// get chunk references
	chunksGroups, _, err := i.store.GetChunkRefs(ctx, orgID, start, end, matchers...)
	if err != nil {
		return nil, err
	}

	// build the response
	resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}}
	for _, chunks := range chunksGroups {
		for _, chk := range chunks {
			resp.ChunkIDs = append(resp.ChunkIDs, chk.ExternalKey())
		}
	}

	return &resp, nil
}

// Label returns the set of labels for the streams this ingester knows about.
func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	instance := i.getOrCreateInstance(userID)
	resp, err := instance.Label(ctx, req)
	if err != nil {
		return nil, err
	}

	// Only continue if the active index type is boltdb-shipper or QueryStore flag is true.
	boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
	if boltdbShipperMaxLookBack == 0 && !i.cfg.QueryStore {
		return resp, nil
	}

	// Only continue if the store is a chunk.Store
	var cs chunk.Store
	var ok bool
	if cs, ok = i.store.(chunk.Store); !ok {
		return resp, nil
	}

	maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod
	if boltdbShipperMaxLookBack != 0 {
		maxLookBackPeriod = boltdbShipperMaxLookBack
	}
	// Adjust the start time based on QueryStoreMaxLookBackPeriod.
	start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now())
	if start.After(*req.End) {
		// The request is older than we are allowed to query the store, just return what we have.
		return resp, nil
	}
	from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
	var storeValues []string
	if req.Values {
		storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
		if err != nil {
			return nil, err
		}
	} else {
		storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs")
		if err != nil {
			return nil, err
		}
	}

	return &logproto.LabelResponse{
		Values: listutil.MergeStringLists(resp.Values, storeValues),
	}, nil
}

// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers
func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	instance := i.getOrCreateInstance(instanceID)
	return instance.Series(ctx, req)
}

// Check implements grpc_health_v1.HealthCheck.
func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
	return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

// Watch implements grpc_health_v1.HealthCheck.
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
	return nil
}

// CheckReady is used to indicate to k8s when the ingesters are ready for
// the addition or removal of another ingester. Returns 204 when the ingester is
// ready, 500 otherwise.
func (i *Ingester) CheckReady(ctx context.Context) error {
	if s := i.State(); s != services.Running && s != services.Stopping {
		return fmt.Errorf("ingester not ready: %v", s)
	}
	return i.lifecycler.CheckReady(ctx)
}

func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
	i.instancesMtx.RLock()
	defer i.instancesMtx.RUnlock()

	inst, ok := i.instances[id]
	return inst, ok
}

func (i *Ingester) getInstances() []*instance {
	i.instancesMtx.RLock()
	defer i.instancesMtx.RUnlock()

	instances := make([]*instance, 0, len(i.instances))
	for _, instance := range i.instances {
		instances = append(instances, instance)
	}
	return instances
}

// Tail logs matching the given query
func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error {
	select {
	case <-i.tailersQuit:
		return errors.New("Ingester is stopping")
	default:
	}

	instanceID, err := tenant.TenantID(queryServer.Context())
	if err != nil {
		return err
	}

	instance := i.getOrCreateInstance(instanceID)
	tailer, err := newTailer(instanceID, req.Query, queryServer)
	if err != nil {
		return err
	}

	if err := instance.addNewTailer(queryServer.Context(), tailer); err != nil {
		return err
	}
	tailer.loop()
	return nil
}

// TailersCount returns count of active tail requests from a user
func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) {
	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	resp := logproto.TailersCountResponse{}

	instance, ok := i.getInstanceByID(instanceID)
	if ok {
		resp.Count = instance.openTailersCount()
	}

	return &resp, nil
}

// buildStoreRequest returns a store request from an ingester request; the returned bool is false if QueryStore is set to false in the configuration.
// The request may be truncated due to QueryStoreMaxLookBackPeriod, which limits the range of the request to make sure
// we query enough to not miss any data while not adding too many duplicates by covering the whole time range in the query.
func buildStoreRequest(cfg Config, start, end, now time.Time) (time.Time, time.Time, bool) {
	if !cfg.QueryStore {
		return time.Time{}, time.Time{}, false
	}
	start = adjustQueryStartTime(cfg.QueryStoreMaxLookBackPeriod, start, now)

	if start.After(end) {
		return time.Time{}, time.Time{}, false
	}
	return start, end, true
}

func adjustQueryStartTime(maxLookBackPeriod time.Duration, start, now time.Time) time.Time {
	if maxLookBackPeriod > 0 {
		oldestStartTime := now.Add(-maxLookBackPeriod)
		if oldestStartTime.After(start) {
			return oldestStartTime
		}
	}
	return start
}