1package querier 2 3import ( 4 "fmt" 5 "net/http" 6 7 querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" 8 util_log "github.com/cortexproject/cortex/pkg/util/log" 9 "github.com/go-kit/log/level" 10 "github.com/gorilla/mux" 11 "github.com/grafana/dskit/services" 12 "github.com/opentracing-contrib/go-stdlib/nethttp" 13 "github.com/opentracing/opentracing-go" 14 "github.com/prometheus/client_golang/prometheus" 15 httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" 16 "github.com/weaveworks/common/middleware" 17 18 serverutil "github.com/grafana/loki/pkg/util/server" 19) 20 21type WorkerServiceConfig struct { 22 AllEnabled bool 23 GrpcListenPort int 24 QuerierMaxConcurrent int 25 QuerierWorkerConfig *querier_worker.Config 26 QueryFrontendEnabled bool 27 QuerySchedulerEnabled bool 28} 29 30// InitWorkerService takes a config object, a map of routes to handlers, an external http router and external 31// http handler, and an auth middleware wrapper. This function creates an internal HTTP router that responds to all 32// the provided query routes/handlers. This router can either be registered with the external Loki HTTP server, or 33// be used internally by a querier worker so that it does not conflict with the routes registered by the Query Frontend module. 34// 35// 1. Query-Frontend Enabled: If Loki has an All or QueryFrontend target, the internal 36// HTTP router is wrapped with Tenant ID parsing middleware and passed to the frontend 37// worker. 38// 39// 2. Querier Standalone: The querier will register the internal HTTP router with the external 40// HTTP router for the Prometheus API routes. Then the external HTTP server will be passed 41// as a http.Handler to the frontend worker. 42// 43func InitWorkerService( 44 cfg WorkerServiceConfig, 45 queryRoutesToHandlers map[string]http.Handler, 46 externalRouter *mux.Router, 47 externalHandler http.Handler, 48 authMiddleware middleware.Interface, 49) (serve services.Service, err error) { 50 51 internalRouter := mux.NewRouter() 52 for route, handler := range queryRoutesToHandlers { 53 internalRouter.Handle(route, handler) 54 } 55 56 // If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal 57 // HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the 58 // external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default 59 // middleware instrumentation. 60 if querierRunningStandalone(cfg) { 61 62 // First, register the internal querier handler with the external HTTP server 63 routes := make([]string, len(queryRoutesToHandlers)) 64 var idx = 0 65 for route := range queryRoutesToHandlers { 66 routes[idx] = route 67 idx++ 68 } 69 70 registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware) 71 72 //If no frontend or scheduler address has been configured, then there is no place for the 73 //querier worker to request work from, so no need to start a worker service 74 if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { 75 return nil, nil 76 } 77 78 // If a frontend or scheduler address has been configured, return a querier worker service that uses 79 // the external Loki Server HTTP server, which has now has the internal handler's routes registered with it 80 return querier_worker.NewQuerierWorker( 81 *(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(externalHandler), util_log.Logger, prometheus.DefaultRegisterer) 82 } 83 84 // Since we must be running a querier with either a frontend and/or scheduler at this point, if no frontend or scheduler address 85 // is configured, Loki will default to using the frontend on localhost on it's own GRPC listening port. 86 if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { 87 address := fmt.Sprintf("127.0.0.1:%d", cfg.GrpcListenPort) 88 level.Warn(util_log.Logger).Log( 89 "msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", 90 "address", address) 91 cfg.QuerierWorkerConfig.FrontendAddress = address 92 } 93 94 // Add a middleware to extract the trace context and add a header. 95 var internalHandler http.Handler 96 internalHandler = nethttp.MiddlewareFunc( 97 opentracing.GlobalTracer(), 98 internalRouter.ServeHTTP, 99 nethttp.OperationNameFunc(func(r *http.Request) string { 100 return "internalQuerier" 101 })) 102 103 // If queries are processed using the external HTTP Server, we need wrap the internal querier with 104 // HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the 105 // request context, as well as make sure any x-www-url-formencoded params are correctly parsed 106 httpMiddleware := middleware.Merge( 107 serverutil.RecoveryHTTPMiddleware, 108 authMiddleware, 109 serverutil.NewPrepopulateMiddleware(), 110 ) 111 112 internalHandler = httpMiddleware.Wrap(internalHandler) 113 114 //Querier worker's max concurrent requests must be the same as the querier setting 115 (*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent 116 117 //Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier 118 //and the query frontend 119 return querier_worker.NewQuerierWorker( 120 *(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(internalHandler), util_log.Logger, prometheus.DefaultRegisterer) 121} 122 123func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) { 124 httpMiddleware := middleware.Merge( 125 serverutil.RecoveryHTTPMiddleware, 126 authMiddleware, 127 serverutil.NewPrepopulateMiddleware(), 128 serverutil.ResponseJSONMiddleware(), 129 ) 130 131 for _, route := range routes { 132 externalRouter.Handle(route, httpMiddleware.Wrap(internalHandler)) 133 } 134} 135 136func querierRunningStandalone(cfg WorkerServiceConfig) bool { 137 runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.AllEnabled 138 level.Debug(util_log.Logger).Log( 139 "msg", "determining if querier is running as standalone target", 140 "runningStandalone", runningStandalone, 141 "queryFrontendEnabled", cfg.QueryFrontendEnabled, 142 "queryScheduleEnabled", cfg.QuerySchedulerEnabled, 143 "allEnabled", cfg.AllEnabled, 144 ) 145 146 return runningStandalone 147} 148