1package loki 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "net/http" 8 "net/http/httputil" 9 "net/url" 10 "os" 11 "time" 12 13 "github.com/NYTimes/gziphandler" 14 "github.com/cortexproject/cortex/pkg/cortex" 15 "github.com/cortexproject/cortex/pkg/frontend" 16 "github.com/cortexproject/cortex/pkg/frontend/transport" 17 "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" 18 "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" 19 "github.com/cortexproject/cortex/pkg/ring" 20 cortex_ruler "github.com/cortexproject/cortex/pkg/ruler" 21 "github.com/cortexproject/cortex/pkg/scheduler" 22 "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" 23 util_log "github.com/cortexproject/cortex/pkg/util/log" 24 "github.com/go-kit/kit/log/level" 25 "github.com/grafana/dskit/kv/codec" 26 "github.com/grafana/dskit/kv/memberlist" 27 "github.com/grafana/dskit/runtimeconfig" 28 "github.com/grafana/dskit/services" 29 "github.com/prometheus/client_golang/prometheus" 30 "github.com/prometheus/common/version" 31 "github.com/thanos-io/thanos/pkg/discovery/dns" 32 "github.com/weaveworks/common/middleware" 33 "github.com/weaveworks/common/server" 34 "github.com/weaveworks/common/user" 35 36 "github.com/grafana/loki/pkg/distributor" 37 "github.com/grafana/loki/pkg/ingester" 38 "github.com/grafana/loki/pkg/logproto" 39 "github.com/grafana/loki/pkg/logql" 40 "github.com/grafana/loki/pkg/querier" 41 "github.com/grafana/loki/pkg/querier/queryrange" 42 "github.com/grafana/loki/pkg/ruler" 43 "github.com/grafana/loki/pkg/runtime" 44 loki_storage "github.com/grafana/loki/pkg/storage" 45 "github.com/grafana/loki/pkg/storage/chunk" 46 "github.com/grafana/loki/pkg/storage/chunk/cache" 47 "github.com/grafana/loki/pkg/storage/chunk/storage" 48 chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage" 49 chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" 50 "github.com/grafana/loki/pkg/storage/stores/shipper" 51 "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" 52 "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" 53 "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" 54 "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" 55 serverutil "github.com/grafana/loki/pkg/util/server" 56 "github.com/grafana/loki/pkg/validation" 57) 58 59const maxChunkAgeForTableManager = 12 * time.Hour 60 61// The various modules that make up Loki. 62const ( 63 Ring string = "ring" 64 RuntimeConfig string = "runtime-config" 65 Overrides string = "overrides" 66 TenantConfigs string = "tenant-configs" 67 Server string = "server" 68 Distributor string = "distributor" 69 Ingester string = "ingester" 70 Querier string = "querier" 71 IngesterQuerier string = "ingester-querier" 72 QueryFrontend string = "query-frontend" 73 QueryFrontendTripperware string = "query-frontend-tripperware" 74 RulerStorage string = "ruler-storage" 75 Ruler string = "ruler" 76 Store string = "store" 77 TableManager string = "table-manager" 78 MemberlistKV string = "memberlist-kv" 79 Compactor string = "compactor" 80 IndexGateway string = "index-gateway" 81 QueryScheduler string = "query-scheduler" 82 All string = "all" 83) 84 85func (t *Loki) initServer() (services.Service, error) { 86 prometheus.MustRegister(version.NewCollector("loki")) 87 88 // Loki handles signals on its own. 89 cortex.DisableSignalHandling(&t.Cfg.Server) 90 serv, err := server.New(t.Cfg.Server) 91 if err != nil { 92 return nil, err 93 } 94 95 t.Server = serv 96 97 servicesToWaitFor := func() []services.Service { 98 svs := []services.Service(nil) 99 for m, s := range t.serviceMap { 100 // Server should not wait for itself. 101 if m != Server { 102 svs = append(svs, s) 103 } 104 } 105 return svs 106 } 107 108 s := cortex.NewServerService(t.Server, servicesToWaitFor) 109 110 // Best effort to propagate the org ID from the start. 111 t.Server.HTTPServer.Handler = func(next http.Handler) http.Handler { 112 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 113 if !t.Cfg.AuthEnabled { 114 next.ServeHTTP(w, r.WithContext(user.InjectOrgID(r.Context(), "fake"))) 115 return 116 } 117 _, ctx, _ := user.ExtractOrgIDFromHTTPRequest(r) 118 next.ServeHTTP(w, r.WithContext(ctx)) 119 }) 120 }(t.Server.HTTPServer.Handler) 121 122 return s, nil 123} 124 125func (t *Loki) initRing() (_ services.Service, err error) { 126 t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) 127 t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV 128 t.ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer) 129 if err != nil { 130 return 131 } 132 prometheus.MustRegister(t.ring) 133 t.Server.HTTP.Handle("/ring", t.ring) 134 return t.ring, nil 135} 136 137func (t *Loki) initRuntimeConfig() (services.Service, error) { 138 if t.Cfg.RuntimeConfig.LoadPath == "" { 139 t.Cfg.RuntimeConfig.LoadPath = t.Cfg.LimitsConfig.PerTenantOverrideConfig 140 t.Cfg.RuntimeConfig.ReloadPeriod = time.Duration(t.Cfg.LimitsConfig.PerTenantOverridePeriod) 141 } 142 143 if t.Cfg.RuntimeConfig.LoadPath == "" { 144 // no need to initialize module if load path is empty 145 return nil, nil 146 } 147 148 t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig 149 150 // make sure to set default limits before we start loading configuration into memory 151 validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig) 152 153 var err error 154 t.runtimeConfig, err = runtimeconfig.New(t.Cfg.RuntimeConfig, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), util_log.Logger) 155 return t.runtimeConfig, err 156} 157 158func (t *Loki) initOverrides() (_ services.Service, err error) { 159 t.overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, newtenantLimitsFromRuntimeConfig(t.runtimeConfig)) 160 // overrides are not a service, since they don't have any operational state. 161 return nil, err 162} 163 164func (t *Loki) initTenantConfigs() (_ services.Service, err error) { 165 t.tenantConfigs, err = runtime.NewTenantConfigs(tenantConfigFromRuntimeConfig(t.runtimeConfig)) 166 // tenantConfigs are not a service, since they don't have any operational state. 167 return nil, err 168} 169 170func (t *Loki) initDistributor() (services.Service, error) { 171 t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) 172 t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV 173 var err error 174 t.distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer) 175 if err != nil { 176 return nil, err 177 } 178 179 // Register the distributor to receive Push requests over GRPC 180 // EXCEPT when running with `-target=all` or `-target=` contains `ingester` 181 if !t.Cfg.isModuleEnabled(All) && !t.Cfg.isModuleEnabled(Ingester) { 182 logproto.RegisterPusherServer(t.Server.GRPC, t.distributor) 183 } 184 185 pushHandler := middleware.Merge( 186 serverutil.RecoveryHTTPMiddleware, 187 t.HTTPAuthMiddleware, 188 ).Wrap(http.HandlerFunc(t.distributor.PushHandler)) 189 190 t.Server.HTTP.Handle("/api/prom/push", pushHandler) 191 t.Server.HTTP.Handle("/loki/api/v1/push", pushHandler) 192 return t.distributor, nil 193} 194 195func (t *Loki) initQuerier() (services.Service, error) { 196 if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 { 197 t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod 198 } 199 200 var err error 201 t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides) 202 if err != nil { 203 return nil, err 204 } 205 206 querierWorkerServiceConfig := querier.WorkerServiceConfig{ 207 AllEnabled: t.Cfg.isModuleEnabled(All), 208 GrpcListenPort: t.Cfg.Server.GRPCListenPort, 209 QuerierMaxConcurrent: t.Cfg.Querier.MaxConcurrent, 210 QuerierWorkerConfig: &t.Cfg.Worker, 211 QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend), 212 QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler), 213 } 214 215 var queryHandlers = map[string]http.Handler{ 216 "/loki/api/v1/query_range": http.HandlerFunc(t.Querier.RangeQueryHandler), 217 "/loki/api/v1/query": http.HandlerFunc(t.Querier.InstantQueryHandler), 218 "/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler), 219 "/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler), 220 "/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), 221 "/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler), 222 "/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler), 223 224 "/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler), 225 "/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler), 226 "/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), 227 "/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler), 228 "/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler), 229 } 230 return querier.InitWorkerService( 231 querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware, 232 ) 233} 234 235func (t *Loki) initIngester() (_ services.Service, err error) { 236 t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) 237 t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV 238 t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort 239 240 t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer) 241 if err != nil { 242 return 243 } 244 logproto.RegisterPusherServer(t.Server.GRPC, t.Ingester) 245 logproto.RegisterQuerierServer(t.Server.GRPC, t.Ingester) 246 logproto.RegisterIngesterServer(t.Server.GRPC, t.Ingester) 247 248 httpMiddleware := middleware.Merge( 249 serverutil.RecoveryHTTPMiddleware, 250 ) 251 t.Server.HTTP.Path("/flush").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler))) 252 t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler))) 253 254 return t.Ingester, nil 255} 256 257func (t *Loki) initTableManager() (services.Service, error) { 258 err := t.Cfg.SchemaConfig.Load() 259 if err != nil { 260 return nil, err 261 } 262 263 // Assume the newest config is the one to use 264 lastConfig := &t.Cfg.SchemaConfig.Configs[len(t.Cfg.SchemaConfig.Configs)-1] 265 266 if (t.Cfg.TableManager.ChunkTables.WriteScale.Enabled || 267 t.Cfg.TableManager.IndexTables.WriteScale.Enabled || 268 t.Cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || 269 t.Cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || 270 t.Cfg.TableManager.ChunkTables.ReadScale.Enabled || 271 t.Cfg.TableManager.IndexTables.ReadScale.Enabled || 272 t.Cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || 273 t.Cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && 274 t.Cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "" { 275 level.Error(util_log.Logger).Log("msg", "WriteScale is enabled but no Metrics URL has been provided") 276 os.Exit(1) 277 } 278 279 reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer) 280 281 tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.StorageConfig.Config, reg) 282 if err != nil { 283 return nil, err 284 } 285 286 bucketClient, err := storage.NewBucketClient(t.Cfg.StorageConfig.Config) 287 util_log.CheckFatal("initializing bucket client", err) 288 289 t.tableManager, err = chunk.NewTableManager(t.Cfg.TableManager, t.Cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer) 290 if err != nil { 291 return nil, err 292 } 293 294 return t.tableManager, nil 295} 296 297func (t *Loki) initStore() (_ services.Service, err error) { 298 // If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache. 299 // This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data. 300 if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { 301 t.Cfg.ChunkStoreConfig.DisableIndexDeduplication = true 302 t.Cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{} 303 } 304 305 if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { 306 t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID 307 switch true { 308 case t.Cfg.isModuleEnabled(Ingester): 309 // We do not want ingester to unnecessarily keep downloading files 310 t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly 311 // Use fifo cache for caching index in memory. 312 t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{ 313 EnableFifoCache: true, 314 Fifocache: cache.FifoCacheConfig{ 315 MaxSizeBytes: "200 MB", 316 // We snapshot the index in ingesters every minute for reads so reduce the index cache validity by a minute. 317 // This is usually set in StorageConfig.IndexCacheValidity but since this is exclusively used for caching the index entries, 318 // I(Sandeep) am setting it here which also helps reduce some CPU cycles and allocations required for 319 // unmarshalling the cached data to check the expiry. 320 Validity: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute, 321 }, 322 } 323 t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute 324 case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler): 325 // We do not want query to do any updates to index 326 t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly 327 default: 328 t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite 329 t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute 330 } 331 } 332 333 chunkStore, err := chunk_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger) 334 if err != nil { 335 return 336 } 337 338 if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { 339 boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg) 340 switch true { 341 case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler): 342 // Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true 343 if t.Cfg.Querier.QueryStoreOnly { 344 break 345 } 346 // Use AsyncStore to query both ingesters local store and chunk store for store queries. 347 // Only queriers should use the AsyncStore, it should never be used in ingesters. 348 chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier, 349 calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration), 350 ) 351 case t.Cfg.isModuleEnabled(All): 352 // We want ingester to also query the store when using boltdb-shipper but only when running with target All. 353 // We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store. 354 // ToDo: See if we can avoid doing this when not running loki in clustered mode. 355 t.Cfg.Ingester.QueryStore = true 356 boltdbShipperConfigIdx := loki_storage.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs) 357 if t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != shipper.BoltDBShipperType { 358 boltdbShipperConfigIdx++ 359 } 360 mlb, err := calculateMaxLookBack(t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.Cfg.Ingester.QueryStoreMaxLookBackPeriod, 361 boltdbShipperMinIngesterQueryStoreDuration) 362 if err != nil { 363 return nil, err 364 } 365 t.Cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb 366 } 367 } 368 369 t.Store, err = loki_storage.NewStore(t.Cfg.StorageConfig, t.Cfg.SchemaConfig, chunkStore, prometheus.DefaultRegisterer) 370 if err != nil { 371 return 372 } 373 374 return services.NewIdleService(nil, func(_ error) error { 375 t.Store.Stop() 376 return nil 377 }), nil 378} 379 380func (t *Loki) initIngesterQuerier() (_ services.Service, err error) { 381 t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay) 382 if err != nil { 383 return nil, err 384 } 385 386 return services.NewIdleService(nil, nil), nil 387} 388 389// Placeholder limits type to pass to cortex frontend 390type disabledShuffleShardingLimits struct{} 391 392func (disabledShuffleShardingLimits) MaxQueriersPerUser(userID string) int { return 0 } 393 394func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { 395 level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware") 396 397 tripperware, stopper, err := queryrange.NewTripperware( 398 t.Cfg.QueryRange, 399 util_log.Logger, 400 t.overrides, 401 t.Cfg.SchemaConfig.SchemaConfig, 402 t.Cfg.Querier.QueryIngestersWithin, 403 prometheus.DefaultRegisterer, 404 ) 405 if err != nil { 406 return 407 } 408 t.stopper = stopper 409 t.QueryFrontEndTripperware = tripperware 410 411 return services.NewIdleService(nil, nil), nil 412} 413 414func (t *Loki) initQueryFrontend() (_ services.Service, err error) { 415 level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend)) 416 417 roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{ 418 Handler: t.Cfg.Frontend.Handler, 419 FrontendV1: t.Cfg.Frontend.FrontendV1, 420 FrontendV2: t.Cfg.Frontend.FrontendV2, 421 DownstreamURL: t.Cfg.Frontend.DownstreamURL, 422 }, disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer) 423 if err != nil { 424 return nil, err 425 } 426 427 if frontendV1 != nil { 428 frontendv1pb.RegisterFrontendServer(t.Server.GRPC, frontendV1) 429 t.frontend = frontendV1 430 level.Debug(util_log.Logger).Log("msg", "using query frontend", "version", "v1") 431 } else if frontendV2 != nil { 432 frontendv2pb.RegisterFrontendForQuerierServer(t.Server.GRPC, frontendV2) 433 t.frontend = frontendV2 434 level.Debug(util_log.Logger).Log("msg", "using query frontend", "version", "v2") 435 } else { 436 level.Debug(util_log.Logger).Log("msg", "no query frontend configured") 437 } 438 439 roundTripper = t.QueryFrontEndTripperware(roundTripper) 440 441 frontendHandler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer) 442 if t.Cfg.Frontend.CompressResponses { 443 frontendHandler = gziphandler.GzipHandler(frontendHandler) 444 } 445 446 frontendHandler = middleware.Merge( 447 serverutil.RecoveryHTTPMiddleware, 448 t.HTTPAuthMiddleware, 449 queryrange.StatsHTTPMiddleware, 450 serverutil.NewPrepopulateMiddleware(), 451 serverutil.ResponseJSONMiddleware(), 452 ).Wrap(frontendHandler) 453 454 var defaultHandler http.Handler 455 if t.Cfg.Frontend.TailProxyURL != "" { 456 httpMiddleware := middleware.Merge( 457 t.HTTPAuthMiddleware, 458 queryrange.StatsHTTPMiddleware, 459 ) 460 tailURL, err := url.Parse(t.Cfg.Frontend.TailProxyURL) 461 if err != nil { 462 return nil, err 463 } 464 tp := httputil.NewSingleHostReverseProxy(tailURL) 465 466 director := tp.Director 467 tp.Director = func(req *http.Request) { 468 director(req) 469 req.Host = tailURL.Host 470 } 471 472 defaultHandler = httpMiddleware.Wrap(tp) 473 } else { 474 defaultHandler = frontendHandler 475 } 476 t.Server.HTTP.Handle("/loki/api/v1/query_range", frontendHandler) 477 t.Server.HTTP.Handle("/loki/api/v1/query", frontendHandler) 478 t.Server.HTTP.Handle("/loki/api/v1/label", frontendHandler) 479 t.Server.HTTP.Handle("/loki/api/v1/labels", frontendHandler) 480 t.Server.HTTP.Handle("/loki/api/v1/label/{name}/values", frontendHandler) 481 t.Server.HTTP.Handle("/loki/api/v1/series", frontendHandler) 482 t.Server.HTTP.Handle("/api/prom/query", frontendHandler) 483 t.Server.HTTP.Handle("/api/prom/label", frontendHandler) 484 t.Server.HTTP.Handle("/api/prom/label/{name}/values", frontendHandler) 485 t.Server.HTTP.Handle("/api/prom/series", frontendHandler) 486 487 // defer tail endpoints to the default handler 488 t.Server.HTTP.Handle("/loki/api/v1/tail", defaultHandler) 489 t.Server.HTTP.Handle("/api/prom/tail", defaultHandler) 490 491 if t.frontend == nil { 492 return services.NewIdleService(nil, func(_ error) error { 493 if t.stopper != nil { 494 t.stopper.Stop() 495 t.stopper = nil 496 } 497 return nil 498 }), nil 499 } 500 501 return services.NewIdleService(func(ctx context.Context) error { 502 return services.StartAndAwaitRunning(ctx, t.frontend) 503 }, func(_ error) error { 504 // Log but not return in case of error, so that other following dependencies 505 // are stopped too. 506 if err := services.StopAndAwaitTerminated(context.Background(), t.frontend); err != nil { 507 level.Warn(util_log.Logger).Log("msg", "failed to stop frontend service", "err", err) 508 } 509 510 if t.stopper != nil { 511 t.stopper.Stop() 512 } 513 return nil 514 }), nil 515} 516 517func (t *Loki) initRulerStorage() (_ services.Service, err error) { 518 // if the ruler is not configured and we're in single binary then let's just log an error and continue. 519 // unfortunately there is no way to generate a "default" config and compare default against actual 520 // to determine if it's unconfigured. the following check, however, correctly tests this. 521 // Single binary integration tests will break if this ever drifts 522 if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() { 523 level.Info(util_log.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.") 524 return 525 } 526 527 // Loki doesn't support the configdb backend, but without excessive mangling/refactoring 528 // it's hard to enforce this at validation time. Therefore detect this and fail early. 529 if t.Cfg.Ruler.StoreConfig.Type == "configdb" { 530 return nil, errors.New("configdb is not supported as a Loki rules backend type") 531 } 532 533 // Make sure storage directory exists if using filesystem store 534 if t.Cfg.Ruler.StoreConfig.Type == "local" && t.Cfg.Ruler.StoreConfig.Local.Directory != "" { 535 err := chunk_util.EnsureDirectory(t.Cfg.Ruler.StoreConfig.Local.Directory) 536 if err != nil { 537 return nil, err 538 } 539 } 540 541 t.RulerStorage, err = cortex_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, ruler.GroupLoader{}, util_log.Logger) 542 543 return 544} 545 546func (t *Loki) initRuler() (_ services.Service, err error) { 547 if t.RulerStorage == nil { 548 level.Info(util_log.Logger).Log("msg", "RulerStorage is nil. Not starting the ruler.") 549 return nil, nil 550 } 551 552 t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort 553 t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV 554 q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides) 555 if err != nil { 556 return nil, err 557 } 558 559 engine := logql.NewEngine(t.Cfg.Querier.Engine, q, t.overrides) 560 561 t.ruler, err = ruler.NewRuler( 562 t.Cfg.Ruler, 563 engine, 564 prometheus.DefaultRegisterer, 565 util_log.Logger, 566 t.RulerStorage, 567 t.overrides, 568 ) 569 570 if err != nil { 571 return 572 } 573 574 t.rulerAPI = cortex_ruler.NewAPI(t.ruler, t.RulerStorage, util_log.Logger) 575 576 // Expose HTTP endpoints. 577 if t.Cfg.Ruler.EnableAPI { 578 579 t.Server.HTTP.Handle("/ruler/ring", t.ruler) 580 cortex_ruler.RegisterRulerServer(t.Server.GRPC, t.ruler) 581 582 // Prometheus Rule API Routes 583 t.Server.HTTP.Path("/prometheus/api/v1/rules").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.PrometheusRules))) 584 t.Server.HTTP.Path("/prometheus/api/v1/alerts").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.PrometheusAlerts))) 585 586 // Ruler Legacy API Routes 587 t.Server.HTTP.Path("/api/prom/rules").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.ListRules))) 588 t.Server.HTTP.Path("/api/prom/rules/{namespace}").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.ListRules))) 589 t.Server.HTTP.Path("/api/prom/rules/{namespace}").Methods("POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.CreateRuleGroup))) 590 t.Server.HTTP.Path("/api/prom/rules/{namespace}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteNamespace))) 591 t.Server.HTTP.Path("/api/prom/rules/{namespace}/{groupName}").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.GetRuleGroup))) 592 t.Server.HTTP.Path("/api/prom/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteRuleGroup))) 593 594 // Ruler API Routes 595 t.Server.HTTP.Path("/loki/api/v1/rules").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.ListRules))) 596 t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.ListRules))) 597 t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.CreateRuleGroup))) 598 t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteNamespace))) 599 t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.GetRuleGroup))) 600 t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteRuleGroup))) 601 } 602 603 return t.ruler, nil 604} 605 606func (t *Loki) initMemberlistKV() (services.Service, error) { 607 reg := prometheus.DefaultRegisterer 608 t.Cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer 609 t.Cfg.MemberlistKV.Codecs = []codec.Codec{ 610 ring.GetCodec(), 611 } 612 613 dnsProviderReg := prometheus.WrapRegistererWithPrefix( 614 "cortex_", 615 prometheus.WrapRegistererWith( 616 prometheus.Labels{"name": "memberlist"}, 617 reg, 618 ), 619 ) 620 dnsProvider := dns.NewProvider(util_log.Logger, dnsProviderReg, dns.GolangResolverType) 621 622 t.MemberlistKV = memberlist.NewKVInitService(&t.Cfg.MemberlistKV, util_log.Logger, dnsProvider, reg) 623 return t.MemberlistKV, nil 624} 625 626func (t *Loki) initCompactor() (services.Service, error) { 627 err := t.Cfg.SchemaConfig.Load() 628 if err != nil { 629 return nil, err 630 } 631 t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer) 632 if err != nil { 633 return nil, err 634 } 635 636 if t.Cfg.CompactorConfig.RetentionEnabled { 637 t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler))) 638 t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))) 639 t.Server.HTTP.Path("/loki/api/admin/cancel_delete_request").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))) 640 } 641 642 return t.compactor, nil 643} 644 645func (t *Loki) initIndexGateway() (services.Service, error) { 646 t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly 647 objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config) 648 if err != nil { 649 return nil, err 650 } 651 652 shipperIndexClient, err := shipper.NewShipper(t.Cfg.StorageConfig.BoltDBShipperConfig, objectClient, prometheus.DefaultRegisterer) 653 if err != nil { 654 return nil, err 655 } 656 657 gateway := indexgateway.NewIndexGateway(shipperIndexClient.(*shipper.Shipper)) 658 indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway) 659 return gateway, nil 660} 661 662func (t *Loki) initQueryScheduler() (services.Service, error) { 663 s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.overrides, util_log.Logger, prometheus.DefaultRegisterer) 664 if err != nil { 665 return nil, err 666 } 667 668 schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s) 669 schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s) 670 return s, nil 671} 672 673func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) { 674 if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 { 675 return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`") 676 } 677 678 if maxLookBackConfig == 0 { 679 // If the QueryStoreMaxLookBackPeriod is still it's default value of 0, set it to the minDuration. 680 return minDuration, nil 681 } else if maxLookBackConfig > 0 && maxLookBackConfig < minDuration { 682 // If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it's at least greater than minDuration or throw an error 683 return 0, fmt.Errorf("the configured query_store_max_look_back_period of '%v' is less than the calculated default of '%v' "+ 684 "which is calculated based on the max_chunk_age + 15 minute boltdb-shipper interval + 15 min additional buffer. Increase this value"+ 685 "greater than the default or remove it from the configuration to use the default", maxLookBackConfig, minDuration) 686 } 687 return maxLookBackConfig, nil 688} 689 690func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, minDuration time.Duration) time.Duration { 691 // 0 means do not limit queries, we would also not limit ingester queries from AsyncStore. 692 if queryIngestersWithinConfig == 0 { 693 return 0 694 } 695 696 if queryIngestersWithinConfig < minDuration { 697 return minDuration 698 } 699 return queryIngestersWithinConfig 700} 701 702// boltdbShipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded. 703// It also considers index cache validity because a querier could have cached index just before it was going to resync which means 704// it would keep serving index until the cache entries expire. 705func boltdbShipperQuerierIndexUpdateDelay(cfg Config) time.Duration { 706 return cfg.StorageConfig.IndexCacheValidity + cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval 707} 708 709// boltdbShipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed. 710func boltdbShipperIngesterIndexUploadDelay() time.Duration { 711 return uploads.ShardDBsByDuration + shipper.UploadInterval 712} 713 714// boltdbShipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to 715// avoid missing any logs or chunk ids due to async nature of BoltDB Shipper. 716func boltdbShipperMinIngesterQueryStoreDuration(cfg Config) time.Duration { 717 return cfg.Ingester.MaxChunkAge + boltdbShipperIngesterIndexUploadDelay() + boltdbShipperQuerierIndexUpdateDelay(cfg) + 2*time.Minute 718} 719