1package loki
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"net/http"
8	"net/http/httputil"
9	"net/url"
10	"os"
11	"time"
12
13	"github.com/NYTimes/gziphandler"
14	"github.com/cortexproject/cortex/pkg/cortex"
15	"github.com/cortexproject/cortex/pkg/frontend"
16	"github.com/cortexproject/cortex/pkg/frontend/transport"
17	"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
18	"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
19	"github.com/cortexproject/cortex/pkg/ring"
20	cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
21	"github.com/cortexproject/cortex/pkg/scheduler"
22	"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
23	util_log "github.com/cortexproject/cortex/pkg/util/log"
24	"github.com/go-kit/kit/log/level"
25	"github.com/grafana/dskit/kv/codec"
26	"github.com/grafana/dskit/kv/memberlist"
27	"github.com/grafana/dskit/runtimeconfig"
28	"github.com/grafana/dskit/services"
29	"github.com/prometheus/client_golang/prometheus"
30	"github.com/prometheus/common/version"
31	"github.com/thanos-io/thanos/pkg/discovery/dns"
32	"github.com/weaveworks/common/middleware"
33	"github.com/weaveworks/common/server"
34	"github.com/weaveworks/common/user"
35
36	"github.com/grafana/loki/pkg/distributor"
37	"github.com/grafana/loki/pkg/ingester"
38	"github.com/grafana/loki/pkg/logproto"
39	"github.com/grafana/loki/pkg/logql"
40	"github.com/grafana/loki/pkg/querier"
41	"github.com/grafana/loki/pkg/querier/queryrange"
42	"github.com/grafana/loki/pkg/ruler"
43	"github.com/grafana/loki/pkg/runtime"
44	loki_storage "github.com/grafana/loki/pkg/storage"
45	"github.com/grafana/loki/pkg/storage/chunk"
46	"github.com/grafana/loki/pkg/storage/chunk/cache"
47	"github.com/grafana/loki/pkg/storage/chunk/storage"
48	chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
49	chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
50	"github.com/grafana/loki/pkg/storage/stores/shipper"
51	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
52	"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
53	"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
54	"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
55	serverutil "github.com/grafana/loki/pkg/util/server"
56	"github.com/grafana/loki/pkg/validation"
57)
58
// maxChunkAgeForTableManager is the max chunk age value passed to
// chunk.NewTableManager when the table manager module is initialized.
const maxChunkAgeForTableManager = 12 * time.Hour

// The various modules that make up Loki.
//
// Each constant is the name of one module. The init functions in this file
// compare against these names (for example via t.Cfg.isModuleEnabled) to
// decide how a module should be wired for the current target.
const (
	Ring                     string = "ring"
	RuntimeConfig            string = "runtime-config"
	Overrides                string = "overrides"
	TenantConfigs            string = "tenant-configs"
	Server                   string = "server"
	Distributor              string = "distributor"
	Ingester                 string = "ingester"
	Querier                  string = "querier"
	IngesterQuerier          string = "ingester-querier"
	QueryFrontend            string = "query-frontend"
	QueryFrontendTripperware string = "query-frontend-tripperware"
	RulerStorage             string = "ruler-storage"
	Ruler                    string = "ruler"
	Store                    string = "store"
	TableManager             string = "table-manager"
	MemberlistKV             string = "memberlist-kv"
	Compactor                string = "compactor"
	IndexGateway             string = "index-gateway"
	QueryScheduler           string = "query-scheduler"
	All                      string = "all"
)
84
85func (t *Loki) initServer() (services.Service, error) {
86	prometheus.MustRegister(version.NewCollector("loki"))
87
88	// Loki handles signals on its own.
89	cortex.DisableSignalHandling(&t.Cfg.Server)
90	serv, err := server.New(t.Cfg.Server)
91	if err != nil {
92		return nil, err
93	}
94
95	t.Server = serv
96
97	servicesToWaitFor := func() []services.Service {
98		svs := []services.Service(nil)
99		for m, s := range t.serviceMap {
100			// Server should not wait for itself.
101			if m != Server {
102				svs = append(svs, s)
103			}
104		}
105		return svs
106	}
107
108	s := cortex.NewServerService(t.Server, servicesToWaitFor)
109
110	// Best effort to propagate the org ID from the start.
111	t.Server.HTTPServer.Handler = func(next http.Handler) http.Handler {
112		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
113			if !t.Cfg.AuthEnabled {
114				next.ServeHTTP(w, r.WithContext(user.InjectOrgID(r.Context(), "fake")))
115				return
116			}
117			_, ctx, _ := user.ExtractOrgIDFromHTTPRequest(r)
118			next.ServeHTTP(w, r.WithContext(ctx))
119		})
120	}(t.Server.HTTPServer.Handler)
121
122	return s, nil
123}
124
125func (t *Loki) initRing() (_ services.Service, err error) {
126	t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
127	t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
128	t.ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer)
129	if err != nil {
130		return
131	}
132	prometheus.MustRegister(t.ring)
133	t.Server.HTTP.Handle("/ring", t.ring)
134	return t.ring, nil
135}
136
137func (t *Loki) initRuntimeConfig() (services.Service, error) {
138	if t.Cfg.RuntimeConfig.LoadPath == "" {
139		t.Cfg.RuntimeConfig.LoadPath = t.Cfg.LimitsConfig.PerTenantOverrideConfig
140		t.Cfg.RuntimeConfig.ReloadPeriod = time.Duration(t.Cfg.LimitsConfig.PerTenantOverridePeriod)
141	}
142
143	if t.Cfg.RuntimeConfig.LoadPath == "" {
144		// no need to initialize module if load path is empty
145		return nil, nil
146	}
147
148	t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig
149
150	// make sure to set default limits before we start loading configuration into memory
151	validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig)
152
153	var err error
154	t.runtimeConfig, err = runtimeconfig.New(t.Cfg.RuntimeConfig, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), util_log.Logger)
155	return t.runtimeConfig, err
156}
157
158func (t *Loki) initOverrides() (_ services.Service, err error) {
159	t.overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, newtenantLimitsFromRuntimeConfig(t.runtimeConfig))
160	// overrides are not a service, since they don't have any operational state.
161	return nil, err
162}
163
164func (t *Loki) initTenantConfigs() (_ services.Service, err error) {
165	t.tenantConfigs, err = runtime.NewTenantConfigs(tenantConfigFromRuntimeConfig(t.runtimeConfig))
166	// tenantConfigs are not a service, since they don't have any operational state.
167	return nil, err
168}
169
170func (t *Loki) initDistributor() (services.Service, error) {
171	t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
172	t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
173	var err error
174	t.distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer)
175	if err != nil {
176		return nil, err
177	}
178
179	// Register the distributor to receive Push requests over GRPC
180	// EXCEPT when running with `-target=all` or `-target=` contains `ingester`
181	if !t.Cfg.isModuleEnabled(All) && !t.Cfg.isModuleEnabled(Ingester) {
182		logproto.RegisterPusherServer(t.Server.GRPC, t.distributor)
183	}
184
185	pushHandler := middleware.Merge(
186		serverutil.RecoveryHTTPMiddleware,
187		t.HTTPAuthMiddleware,
188	).Wrap(http.HandlerFunc(t.distributor.PushHandler))
189
190	t.Server.HTTP.Handle("/api/prom/push", pushHandler)
191	t.Server.HTTP.Handle("/loki/api/v1/push", pushHandler)
192	return t.distributor, nil
193}
194
195func (t *Loki) initQuerier() (services.Service, error) {
196	if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
197		t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod
198	}
199
200	var err error
201	t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
202	if err != nil {
203		return nil, err
204	}
205
206	querierWorkerServiceConfig := querier.WorkerServiceConfig{
207		AllEnabled:            t.Cfg.isModuleEnabled(All),
208		GrpcListenPort:        t.Cfg.Server.GRPCListenPort,
209		QuerierMaxConcurrent:  t.Cfg.Querier.MaxConcurrent,
210		QuerierWorkerConfig:   &t.Cfg.Worker,
211		QueryFrontendEnabled:  t.Cfg.isModuleEnabled(QueryFrontend),
212		QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
213	}
214
215	var queryHandlers = map[string]http.Handler{
216		"/loki/api/v1/query_range":         http.HandlerFunc(t.Querier.RangeQueryHandler),
217		"/loki/api/v1/query":               http.HandlerFunc(t.Querier.InstantQueryHandler),
218		"/loki/api/v1/label":               http.HandlerFunc(t.Querier.LabelHandler),
219		"/loki/api/v1/labels":              http.HandlerFunc(t.Querier.LabelHandler),
220		"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
221		"/loki/api/v1/tail":                http.HandlerFunc(t.Querier.TailHandler),
222		"/loki/api/v1/series":              http.HandlerFunc(t.Querier.SeriesHandler),
223
224		"/api/prom/query":               http.HandlerFunc(t.Querier.LogQueryHandler),
225		"/api/prom/label":               http.HandlerFunc(t.Querier.LabelHandler),
226		"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
227		"/api/prom/tail":                http.HandlerFunc(t.Querier.TailHandler),
228		"/api/prom/series":              http.HandlerFunc(t.Querier.SeriesHandler),
229	}
230	return querier.InitWorkerService(
231		querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
232	)
233}
234
235func (t *Loki) initIngester() (_ services.Service, err error) {
236	t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
237	t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
238	t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
239
240	t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer)
241	if err != nil {
242		return
243	}
244	logproto.RegisterPusherServer(t.Server.GRPC, t.Ingester)
245	logproto.RegisterQuerierServer(t.Server.GRPC, t.Ingester)
246	logproto.RegisterIngesterServer(t.Server.GRPC, t.Ingester)
247
248	httpMiddleware := middleware.Merge(
249		serverutil.RecoveryHTTPMiddleware,
250	)
251	t.Server.HTTP.Path("/flush").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)))
252	t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)))
253
254	return t.Ingester, nil
255}
256
257func (t *Loki) initTableManager() (services.Service, error) {
258	err := t.Cfg.SchemaConfig.Load()
259	if err != nil {
260		return nil, err
261	}
262
263	// Assume the newest config is the one to use
264	lastConfig := &t.Cfg.SchemaConfig.Configs[len(t.Cfg.SchemaConfig.Configs)-1]
265
266	if (t.Cfg.TableManager.ChunkTables.WriteScale.Enabled ||
267		t.Cfg.TableManager.IndexTables.WriteScale.Enabled ||
268		t.Cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled ||
269		t.Cfg.TableManager.IndexTables.InactiveWriteScale.Enabled ||
270		t.Cfg.TableManager.ChunkTables.ReadScale.Enabled ||
271		t.Cfg.TableManager.IndexTables.ReadScale.Enabled ||
272		t.Cfg.TableManager.ChunkTables.InactiveReadScale.Enabled ||
273		t.Cfg.TableManager.IndexTables.InactiveReadScale.Enabled) &&
274		t.Cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "" {
275		level.Error(util_log.Logger).Log("msg", "WriteScale is enabled but no Metrics URL has been provided")
276		os.Exit(1)
277	}
278
279	reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)
280
281	tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.StorageConfig.Config, reg)
282	if err != nil {
283		return nil, err
284	}
285
286	bucketClient, err := storage.NewBucketClient(t.Cfg.StorageConfig.Config)
287	util_log.CheckFatal("initializing bucket client", err)
288
289	t.tableManager, err = chunk.NewTableManager(t.Cfg.TableManager, t.Cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer)
290	if err != nil {
291		return nil, err
292	}
293
294	return t.tableManager, nil
295}
296
297func (t *Loki) initStore() (_ services.Service, err error) {
298	// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
299	// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
300	if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
301		t.Cfg.ChunkStoreConfig.DisableIndexDeduplication = true
302		t.Cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
303	}
304
305	if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
306		t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
307		switch true {
308		case t.Cfg.isModuleEnabled(Ingester):
309			// We do not want ingester to unnecessarily keep downloading files
310			t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
311			// Use fifo cache for caching index in memory.
312			t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
313				EnableFifoCache: true,
314				Fifocache: cache.FifoCacheConfig{
315					MaxSizeBytes: "200 MB",
316					// We snapshot the index in ingesters every minute for reads so reduce the index cache validity by a minute.
317					// This is usually set in StorageConfig.IndexCacheValidity but since this is exclusively used for caching the index entries,
318					// I(Sandeep) am setting it here which also helps reduce some CPU cycles and allocations required for
319					// unmarshalling the cached data to check the expiry.
320					Validity: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
321				},
322			}
323			t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
324		case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
325			// We do not want query to do any updates to index
326			t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
327		default:
328			t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
329			t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
330		}
331	}
332
333	chunkStore, err := chunk_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
334	if err != nil {
335		return
336	}
337
338	if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
339		boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
340		switch true {
341		case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
342			// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
343			if t.Cfg.Querier.QueryStoreOnly {
344				break
345			}
346			// Use AsyncStore to query both ingesters local store and chunk store for store queries.
347			// Only queriers should use the AsyncStore, it should never be used in ingesters.
348			chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier,
349				calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration),
350			)
351		case t.Cfg.isModuleEnabled(All):
352			// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
353			// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
354			// ToDo: See if we can avoid doing this when not running loki in clustered mode.
355			t.Cfg.Ingester.QueryStore = true
356			boltdbShipperConfigIdx := loki_storage.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs)
357			if t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != shipper.BoltDBShipperType {
358				boltdbShipperConfigIdx++
359			}
360			mlb, err := calculateMaxLookBack(t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.Cfg.Ingester.QueryStoreMaxLookBackPeriod,
361				boltdbShipperMinIngesterQueryStoreDuration)
362			if err != nil {
363				return nil, err
364			}
365			t.Cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
366		}
367	}
368
369	t.Store, err = loki_storage.NewStore(t.Cfg.StorageConfig, t.Cfg.SchemaConfig, chunkStore, prometheus.DefaultRegisterer)
370	if err != nil {
371		return
372	}
373
374	return services.NewIdleService(nil, func(_ error) error {
375		t.Store.Stop()
376		return nil
377	}), nil
378}
379
380func (t *Loki) initIngesterQuerier() (_ services.Service, err error) {
381	t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay)
382	if err != nil {
383		return nil, err
384	}
385
386	return services.NewIdleService(nil, nil), nil
387}
388
// disabledShuffleShardingLimits is a placeholder limits type that is passed to
// the cortex frontend.
type disabledShuffleShardingLimits struct{}

// MaxQueriersPerUser ignores the given user ID and always returns 0.
func (disabledShuffleShardingLimits) MaxQueriersPerUser(_ string) int {
	const maxQueriers = 0
	return maxQueriers
}
393
394func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
395	level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")
396
397	tripperware, stopper, err := queryrange.NewTripperware(
398		t.Cfg.QueryRange,
399		util_log.Logger,
400		t.overrides,
401		t.Cfg.SchemaConfig.SchemaConfig,
402		t.Cfg.Querier.QueryIngestersWithin,
403		prometheus.DefaultRegisterer,
404	)
405	if err != nil {
406		return
407	}
408	t.stopper = stopper
409	t.QueryFrontEndTripperware = tripperware
410
411	return services.NewIdleService(nil, nil), nil
412}
413
414func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
415	level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend))
416
417	roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{
418		Handler:       t.Cfg.Frontend.Handler,
419		FrontendV1:    t.Cfg.Frontend.FrontendV1,
420		FrontendV2:    t.Cfg.Frontend.FrontendV2,
421		DownstreamURL: t.Cfg.Frontend.DownstreamURL,
422	}, disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer)
423	if err != nil {
424		return nil, err
425	}
426
427	if frontendV1 != nil {
428		frontendv1pb.RegisterFrontendServer(t.Server.GRPC, frontendV1)
429		t.frontend = frontendV1
430		level.Debug(util_log.Logger).Log("msg", "using query frontend", "version", "v1")
431	} else if frontendV2 != nil {
432		frontendv2pb.RegisterFrontendForQuerierServer(t.Server.GRPC, frontendV2)
433		t.frontend = frontendV2
434		level.Debug(util_log.Logger).Log("msg", "using query frontend", "version", "v2")
435	} else {
436		level.Debug(util_log.Logger).Log("msg", "no query frontend configured")
437	}
438
439	roundTripper = t.QueryFrontEndTripperware(roundTripper)
440
441	frontendHandler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
442	if t.Cfg.Frontend.CompressResponses {
443		frontendHandler = gziphandler.GzipHandler(frontendHandler)
444	}
445
446	frontendHandler = middleware.Merge(
447		serverutil.RecoveryHTTPMiddleware,
448		t.HTTPAuthMiddleware,
449		queryrange.StatsHTTPMiddleware,
450		serverutil.NewPrepopulateMiddleware(),
451		serverutil.ResponseJSONMiddleware(),
452	).Wrap(frontendHandler)
453
454	var defaultHandler http.Handler
455	if t.Cfg.Frontend.TailProxyURL != "" {
456		httpMiddleware := middleware.Merge(
457			t.HTTPAuthMiddleware,
458			queryrange.StatsHTTPMiddleware,
459		)
460		tailURL, err := url.Parse(t.Cfg.Frontend.TailProxyURL)
461		if err != nil {
462			return nil, err
463		}
464		tp := httputil.NewSingleHostReverseProxy(tailURL)
465
466		director := tp.Director
467		tp.Director = func(req *http.Request) {
468			director(req)
469			req.Host = tailURL.Host
470		}
471
472		defaultHandler = httpMiddleware.Wrap(tp)
473	} else {
474		defaultHandler = frontendHandler
475	}
476	t.Server.HTTP.Handle("/loki/api/v1/query_range", frontendHandler)
477	t.Server.HTTP.Handle("/loki/api/v1/query", frontendHandler)
478	t.Server.HTTP.Handle("/loki/api/v1/label", frontendHandler)
479	t.Server.HTTP.Handle("/loki/api/v1/labels", frontendHandler)
480	t.Server.HTTP.Handle("/loki/api/v1/label/{name}/values", frontendHandler)
481	t.Server.HTTP.Handle("/loki/api/v1/series", frontendHandler)
482	t.Server.HTTP.Handle("/api/prom/query", frontendHandler)
483	t.Server.HTTP.Handle("/api/prom/label", frontendHandler)
484	t.Server.HTTP.Handle("/api/prom/label/{name}/values", frontendHandler)
485	t.Server.HTTP.Handle("/api/prom/series", frontendHandler)
486
487	// defer tail endpoints to the default handler
488	t.Server.HTTP.Handle("/loki/api/v1/tail", defaultHandler)
489	t.Server.HTTP.Handle("/api/prom/tail", defaultHandler)
490
491	if t.frontend == nil {
492		return services.NewIdleService(nil, func(_ error) error {
493			if t.stopper != nil {
494				t.stopper.Stop()
495				t.stopper = nil
496			}
497			return nil
498		}), nil
499	}
500
501	return services.NewIdleService(func(ctx context.Context) error {
502		return services.StartAndAwaitRunning(ctx, t.frontend)
503	}, func(_ error) error {
504		// Log but not return in case of error, so that other following dependencies
505		// are stopped too.
506		if err := services.StopAndAwaitTerminated(context.Background(), t.frontend); err != nil {
507			level.Warn(util_log.Logger).Log("msg", "failed to stop frontend service", "err", err)
508		}
509
510		if t.stopper != nil {
511			t.stopper.Stop()
512		}
513		return nil
514	}), nil
515}
516
517func (t *Loki) initRulerStorage() (_ services.Service, err error) {
518	// if the ruler is not configured and we're in single binary then let's just log an error and continue.
519	// unfortunately there is no way to generate a "default" config and compare default against actual
520	// to determine if it's unconfigured.  the following check, however, correctly tests this.
521	// Single binary integration tests will break if this ever drifts
522	if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
523		level.Info(util_log.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
524		return
525	}
526
527	// Loki doesn't support the configdb backend, but without excessive mangling/refactoring
528	// it's hard to enforce this at validation time. Therefore detect this and fail early.
529	if t.Cfg.Ruler.StoreConfig.Type == "configdb" {
530		return nil, errors.New("configdb is not supported as a Loki rules backend type")
531	}
532
533	// Make sure storage directory exists if using filesystem store
534	if t.Cfg.Ruler.StoreConfig.Type == "local" && t.Cfg.Ruler.StoreConfig.Local.Directory != "" {
535		err := chunk_util.EnsureDirectory(t.Cfg.Ruler.StoreConfig.Local.Directory)
536		if err != nil {
537			return nil, err
538		}
539	}
540
541	t.RulerStorage, err = cortex_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, ruler.GroupLoader{}, util_log.Logger)
542
543	return
544}
545
546func (t *Loki) initRuler() (_ services.Service, err error) {
547	if t.RulerStorage == nil {
548		level.Info(util_log.Logger).Log("msg", "RulerStorage is nil.  Not starting the ruler.")
549		return nil, nil
550	}
551
552	t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
553	t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
554	q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
555	if err != nil {
556		return nil, err
557	}
558
559	engine := logql.NewEngine(t.Cfg.Querier.Engine, q, t.overrides)
560
561	t.ruler, err = ruler.NewRuler(
562		t.Cfg.Ruler,
563		engine,
564		prometheus.DefaultRegisterer,
565		util_log.Logger,
566		t.RulerStorage,
567		t.overrides,
568	)
569
570	if err != nil {
571		return
572	}
573
574	t.rulerAPI = cortex_ruler.NewAPI(t.ruler, t.RulerStorage, util_log.Logger)
575
576	// Expose HTTP endpoints.
577	if t.Cfg.Ruler.EnableAPI {
578
579		t.Server.HTTP.Handle("/ruler/ring", t.ruler)
580		cortex_ruler.RegisterRulerServer(t.Server.GRPC, t.ruler)
581
582		// Prometheus Rule API Routes
583		t.Server.HTTP.Path("/prometheus/api/v1/rules").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.PrometheusRules)))
584		t.Server.HTTP.Path("/prometheus/api/v1/alerts").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.PrometheusAlerts)))
585
586		// Ruler Legacy API Routes
587		t.Server.HTTP.Path("/api/prom/rules").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.ListRules)))
588		t.Server.HTTP.Path("/api/prom/rules/{namespace}").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.ListRules)))
589		t.Server.HTTP.Path("/api/prom/rules/{namespace}").Methods("POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.CreateRuleGroup)))
590		t.Server.HTTP.Path("/api/prom/rules/{namespace}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteNamespace)))
591		t.Server.HTTP.Path("/api/prom/rules/{namespace}/{groupName}").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.GetRuleGroup)))
592		t.Server.HTTP.Path("/api/prom/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteRuleGroup)))
593
594		// Ruler API Routes
595		t.Server.HTTP.Path("/loki/api/v1/rules").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.ListRules)))
596		t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.ListRules)))
597		t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.CreateRuleGroup)))
598		t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteNamespace)))
599		t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.GetRuleGroup)))
600		t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteRuleGroup)))
601	}
602
603	return t.ruler, nil
604}
605
606func (t *Loki) initMemberlistKV() (services.Service, error) {
607	reg := prometheus.DefaultRegisterer
608	t.Cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer
609	t.Cfg.MemberlistKV.Codecs = []codec.Codec{
610		ring.GetCodec(),
611	}
612
613	dnsProviderReg := prometheus.WrapRegistererWithPrefix(
614		"cortex_",
615		prometheus.WrapRegistererWith(
616			prometheus.Labels{"name": "memberlist"},
617			reg,
618		),
619	)
620	dnsProvider := dns.NewProvider(util_log.Logger, dnsProviderReg, dns.GolangResolverType)
621
622	t.MemberlistKV = memberlist.NewKVInitService(&t.Cfg.MemberlistKV, util_log.Logger, dnsProvider, reg)
623	return t.MemberlistKV, nil
624}
625
626func (t *Loki) initCompactor() (services.Service, error) {
627	err := t.Cfg.SchemaConfig.Load()
628	if err != nil {
629		return nil, err
630	}
631	t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
632	if err != nil {
633		return nil, err
634	}
635
636	if t.Cfg.CompactorConfig.RetentionEnabled {
637		t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
638		t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
639		t.Server.HTTP.Path("/loki/api/admin/cancel_delete_request").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
640	}
641
642	return t.compactor, nil
643}
644
645func (t *Loki) initIndexGateway() (services.Service, error) {
646	t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
647	objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config)
648	if err != nil {
649		return nil, err
650	}
651
652	shipperIndexClient, err := shipper.NewShipper(t.Cfg.StorageConfig.BoltDBShipperConfig, objectClient, prometheus.DefaultRegisterer)
653	if err != nil {
654		return nil, err
655	}
656
657	gateway := indexgateway.NewIndexGateway(shipperIndexClient.(*shipper.Shipper))
658	indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway)
659	return gateway, nil
660}
661
662func (t *Loki) initQueryScheduler() (services.Service, error) {
663	s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.overrides, util_log.Logger, prometheus.DefaultRegisterer)
664	if err != nil {
665		return nil, err
666	}
667
668	schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s)
669	schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s)
670	return s, nil
671}
672
673func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
674	if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
675		return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
676	}
677
678	if maxLookBackConfig == 0 {
679		// If the QueryStoreMaxLookBackPeriod is still it's default value of 0, set it to the minDuration.
680		return minDuration, nil
681	} else if maxLookBackConfig > 0 && maxLookBackConfig < minDuration {
682		// If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it's at least greater than minDuration or throw an error
683		return 0, fmt.Errorf("the configured query_store_max_look_back_period of '%v' is less than the calculated default of '%v' "+
684			"which is calculated based on the max_chunk_age + 15 minute boltdb-shipper interval + 15 min additional buffer.  Increase this value"+
685			"greater than the default or remove it from the configuration to use the default", maxLookBackConfig, minDuration)
686	}
687	return maxLookBackConfig, nil
688}
689
// calculateAsyncStoreQueryIngestersWithin returns the "query ingesters within"
// window for the AsyncStore.
//
// A configured value of 0 means queries are not limited, so 0 is returned and
// ingester queries from the AsyncStore are not limited either. Any other value
// below minDuration is raised to minDuration; larger values are returned as is.
func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, minDuration time.Duration) time.Duration {
	switch {
	case queryIngestersWithinConfig == 0:
		return 0
	case queryIngestersWithinConfig < minDuration:
		return minDuration
	default:
		return queryIngestersWithinConfig
	}
}
701
702// boltdbShipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded.
703// It also considers index cache validity because a querier could have cached index just before it was going to resync which means
704// it would keep serving index until the cache entries expire.
705func boltdbShipperQuerierIndexUpdateDelay(cfg Config) time.Duration {
706	return cfg.StorageConfig.IndexCacheValidity + cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval
707}
708
709// boltdbShipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed.
710func boltdbShipperIngesterIndexUploadDelay() time.Duration {
711	return uploads.ShardDBsByDuration + shipper.UploadInterval
712}
713
714// boltdbShipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to
715// avoid missing any logs or chunk ids due to async nature of BoltDB Shipper.
716func boltdbShipperMinIngesterQueryStoreDuration(cfg Config) time.Duration {
717	return cfg.Ingester.MaxChunkAge + boltdbShipperIngesterIndexUploadDelay() + boltdbShipperQuerierIndexUpdateDelay(cfg) + 2*time.Minute
718}
719