1package worker
2
3import (
4	"context"
5	"fmt"
6	"net/http"
7	"time"
8
9	"github.com/go-kit/log"
10	"github.com/go-kit/log/level"
11	"github.com/grafana/dskit/backoff"
12	"github.com/weaveworks/common/httpgrpc"
13	"google.golang.org/grpc"
14
15	"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
16	"github.com/cortexproject/cortex/pkg/querier/stats"
17	querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
18)
19
20var (
21	processorBackoffConfig = backoff.Config{
22		MinBackoff: 50 * time.Millisecond,
23		MaxBackoff: 1 * time.Second,
24	}
25)
26
27func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) processor {
28	return &frontendProcessor{
29		log:            log,
30		handler:        handler,
31		maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
32		querierID:      cfg.QuerierID,
33	}
34}
35
36// Handles incoming queries from frontend.
37type frontendProcessor struct {
38	handler        RequestHandler
39	maxMessageSize int
40	querierID      string
41
42	log log.Logger
43}
44
45// notifyShutdown implements processor.
46func (fp *frontendProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) {
47	client := frontendv1pb.NewFrontendClient(conn)
48
49	req := &frontendv1pb.NotifyClientShutdownRequest{ClientID: fp.querierID}
50	if _, err := client.NotifyClientShutdown(ctx, req); err != nil {
51		// Since we're shutting down there's nothing we can do except logging it.
52		level.Warn(fp.log).Log("msg", "failed to notify querier shutdown to query-frontend", "address", address, "err", err)
53	}
54}
55
56// runOne loops, trying to establish a stream to the frontend to begin request processing.
57func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
58	client := frontendv1pb.NewFrontendClient(conn)
59
60	backoff := backoff.New(ctx, processorBackoffConfig)
61	for backoff.Ongoing() {
62		c, err := client.Process(ctx)
63		if err != nil {
64			level.Error(fp.log).Log("msg", "error contacting frontend", "address", address, "err", err)
65			backoff.Wait()
66			continue
67		}
68
69		if err := fp.process(c); err != nil {
70			level.Error(fp.log).Log("msg", "error processing requests", "address", address, "err", err)
71			backoff.Wait()
72			continue
73		}
74
75		backoff.Reset()
76	}
77}
78
79// process loops processing requests on an established stream.
80func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) error {
81	// Build a child context so we can cancel a query when the stream is closed.
82	ctx, cancel := context.WithCancel(c.Context())
83	defer cancel()
84
85	for {
86		request, err := c.Recv()
87		if err != nil {
88			return err
89		}
90
91		switch request.Type {
92		case frontendv1pb.HTTP_REQUEST:
93			// Handle the request on a "background" goroutine, so we go back to
94			// blocking on c.Recv().  This allows us to detect the stream closing
95			// and cancel the query.  We don't actually handle queries in parallel
96			// here, as we're running in lock step with the server - each Recv is
97			// paired with a Send.
98			go fp.runRequest(ctx, request.HttpRequest, request.StatsEnabled, func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error {
99				return c.Send(&frontendv1pb.ClientToFrontend{
100					HttpResponse: response,
101					Stats:        stats,
102				})
103			})
104
105		case frontendv1pb.GET_ID:
106			err := c.Send(&frontendv1pb.ClientToFrontend{ClientID: fp.querierID})
107			if err != nil {
108				return err
109			}
110
111		default:
112			return fmt.Errorf("unknown request type: %v", request.Type)
113		}
114	}
115}
116
117func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.HTTPRequest, statsEnabled bool, sendHTTPResponse func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error) {
118	var stats *querier_stats.Stats
119	if statsEnabled {
120		stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
121	}
122
123	response, err := fp.handler.Handle(ctx, request)
124	if err != nil {
125		var ok bool
126		response, ok = httpgrpc.HTTPResponseFromError(err)
127		if !ok {
128			response = &httpgrpc.HTTPResponse{
129				Code: http.StatusInternalServerError,
130				Body: []byte(err.Error()),
131			}
132		}
133	}
134
135	// Ensure responses that are too big are not retried.
136	if len(response.Body) >= fp.maxMessageSize {
137		errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize)
138		response = &httpgrpc.HTTPResponse{
139			Code: http.StatusRequestEntityTooLarge,
140			Body: []byte(errMsg),
141		}
142		level.Error(fp.log).Log("msg", "error processing query", "err", errMsg)
143	}
144
145	if err := sendHTTPResponse(response, stats); err != nil {
146		level.Error(fp.log).Log("msg", "error processing requests", "err", err)
147	}
148}
149