1package querier
2
3import (
4	"fmt"
5	"net/http"
6
7	querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
8	util_log "github.com/cortexproject/cortex/pkg/util/log"
9	"github.com/go-kit/log/level"
10	"github.com/gorilla/mux"
11	"github.com/grafana/dskit/services"
12	"github.com/opentracing-contrib/go-stdlib/nethttp"
13	"github.com/opentracing/opentracing-go"
14	"github.com/prometheus/client_golang/prometheus"
15	httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
16	"github.com/weaveworks/common/middleware"
17
18	serverutil "github.com/grafana/loki/pkg/util/server"
19)
20
21type WorkerServiceConfig struct {
22	AllEnabled            bool
23	GrpcListenPort        int
24	QuerierMaxConcurrent  int
25	QuerierWorkerConfig   *querier_worker.Config
26	QueryFrontendEnabled  bool
27	QuerySchedulerEnabled bool
28}
29
30// InitWorkerService takes a config object, a map of routes to handlers, an external http router and external
31// http handler, and an auth middleware wrapper. This function creates an internal HTTP router that responds to all
32// the provided query routes/handlers. This router can either be registered with the external Loki HTTP server, or
33// be used internally by a querier worker so that it does not conflict with the routes registered by the Query Frontend module.
34//
35// 1. Query-Frontend Enabled: If Loki has an All or QueryFrontend target, the internal
36//    HTTP router is wrapped with Tenant ID parsing middleware and passed to the frontend
37//    worker.
38//
39// 2. Querier Standalone: The querier will register the internal HTTP router with the external
40//    HTTP router for the Prometheus API routes. Then the external HTTP server will be passed
41//    as a http.Handler to the frontend worker.
42//
43func InitWorkerService(
44	cfg WorkerServiceConfig,
45	queryRoutesToHandlers map[string]http.Handler,
46	externalRouter *mux.Router,
47	externalHandler http.Handler,
48	authMiddleware middleware.Interface,
49) (serve services.Service, err error) {
50
51	internalRouter := mux.NewRouter()
52	for route, handler := range queryRoutesToHandlers {
53		internalRouter.Handle(route, handler)
54	}
55
56	// If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal
57	// HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the
58	// external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default
59	// middleware instrumentation.
60	if querierRunningStandalone(cfg) {
61
62		// First, register the internal querier handler with the external HTTP server
63		routes := make([]string, len(queryRoutesToHandlers))
64		var idx = 0
65		for route := range queryRoutesToHandlers {
66			routes[idx] = route
67			idx++
68		}
69
70		registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware)
71
72		//If no frontend or scheduler address has been configured, then there is no place for the
73		//querier worker to request work from, so no need to start a worker service
74		if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
75			return nil, nil
76		}
77
78		// If a frontend or scheduler address has been configured, return a querier worker service that uses
79		// the external Loki Server HTTP server, which has now has the internal handler's routes registered with it
80		return querier_worker.NewQuerierWorker(
81			*(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(externalHandler), util_log.Logger, prometheus.DefaultRegisterer)
82	}
83
84	// Since we must be running a querier with either a frontend and/or scheduler at this point, if no frontend or scheduler address
85	// is configured, Loki will default to using the frontend on localhost on it's own GRPC listening port.
86	if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
87		address := fmt.Sprintf("127.0.0.1:%d", cfg.GrpcListenPort)
88		level.Warn(util_log.Logger).Log(
89			"msg", "Worker address is empty, attempting automatic worker configuration.  If queries are unresponsive consider configuring the worker explicitly.",
90			"address", address)
91		cfg.QuerierWorkerConfig.FrontendAddress = address
92	}
93
94	// Add a middleware to extract the trace context and add a header.
95	var internalHandler http.Handler
96	internalHandler = nethttp.MiddlewareFunc(
97		opentracing.GlobalTracer(),
98		internalRouter.ServeHTTP,
99		nethttp.OperationNameFunc(func(r *http.Request) string {
100			return "internalQuerier"
101		}))
102
103	// If queries are processed using the external HTTP Server, we need wrap the internal querier with
104	// HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the
105	// request context, as well as make sure any x-www-url-formencoded params are correctly parsed
106	httpMiddleware := middleware.Merge(
107		serverutil.RecoveryHTTPMiddleware,
108		authMiddleware,
109		serverutil.NewPrepopulateMiddleware(),
110	)
111
112	internalHandler = httpMiddleware.Wrap(internalHandler)
113
114	//Querier worker's max concurrent requests must be the same as the querier setting
115	(*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent
116
117	//Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier
118	//and the query frontend
119	return querier_worker.NewQuerierWorker(
120		*(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(internalHandler), util_log.Logger, prometheus.DefaultRegisterer)
121}
122
123func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) {
124	httpMiddleware := middleware.Merge(
125		serverutil.RecoveryHTTPMiddleware,
126		authMiddleware,
127		serverutil.NewPrepopulateMiddleware(),
128		serverutil.ResponseJSONMiddleware(),
129	)
130
131	for _, route := range routes {
132		externalRouter.Handle(route, httpMiddleware.Wrap(internalHandler))
133	}
134}
135
136func querierRunningStandalone(cfg WorkerServiceConfig) bool {
137	runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.AllEnabled
138	level.Debug(util_log.Logger).Log(
139		"msg", "determining if querier is running as standalone target",
140		"runningStandalone", runningStandalone,
141		"queryFrontendEnabled", cfg.QueryFrontendEnabled,
142		"queryScheduleEnabled", cfg.QuerySchedulerEnabled,
143		"allEnabled", cfg.AllEnabled,
144	)
145
146	return runningStandalone
147}
148