1package worker 2 3import ( 4 "context" 5 "fmt" 6 "net/http" 7 "time" 8 9 "github.com/go-kit/log" 10 "github.com/go-kit/log/level" 11 "github.com/grafana/dskit/backoff" 12 "github.com/weaveworks/common/httpgrpc" 13 "google.golang.org/grpc" 14 15 "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" 16 "github.com/cortexproject/cortex/pkg/querier/stats" 17 querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" 18) 19 20var ( 21 processorBackoffConfig = backoff.Config{ 22 MinBackoff: 50 * time.Millisecond, 23 MaxBackoff: 1 * time.Second, 24 } 25) 26 27func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) processor { 28 return &frontendProcessor{ 29 log: log, 30 handler: handler, 31 maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, 32 querierID: cfg.QuerierID, 33 } 34} 35 36// Handles incoming queries from frontend. 37type frontendProcessor struct { 38 handler RequestHandler 39 maxMessageSize int 40 querierID string 41 42 log log.Logger 43} 44 45// notifyShutdown implements processor. 46func (fp *frontendProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) { 47 client := frontendv1pb.NewFrontendClient(conn) 48 49 req := &frontendv1pb.NotifyClientShutdownRequest{ClientID: fp.querierID} 50 if _, err := client.NotifyClientShutdown(ctx, req); err != nil { 51 // Since we're shutting down there's nothing we can do except logging it. 52 level.Warn(fp.log).Log("msg", "failed to notify querier shutdown to query-frontend", "address", address, "err", err) 53 } 54} 55 56// runOne loops, trying to establish a stream to the frontend to begin request processing. 57func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) { 58 client := frontendv1pb.NewFrontendClient(conn) 59 60 backoff := backoff.New(ctx, processorBackoffConfig) 61 for backoff.Ongoing() { 62 c, err := client.Process(ctx) 63 if err != nil { 64 level.Error(fp.log).Log("msg", "error contacting frontend", "address", address, "err", err) 65 backoff.Wait() 66 continue 67 } 68 69 if err := fp.process(c); err != nil { 70 level.Error(fp.log).Log("msg", "error processing requests", "address", address, "err", err) 71 backoff.Wait() 72 continue 73 } 74 75 backoff.Reset() 76 } 77} 78 79// process loops processing requests on an established stream. 80func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) error { 81 // Build a child context so we can cancel a query when the stream is closed. 82 ctx, cancel := context.WithCancel(c.Context()) 83 defer cancel() 84 85 for { 86 request, err := c.Recv() 87 if err != nil { 88 return err 89 } 90 91 switch request.Type { 92 case frontendv1pb.HTTP_REQUEST: 93 // Handle the request on a "background" goroutine, so we go back to 94 // blocking on c.Recv(). This allows us to detect the stream closing 95 // and cancel the query. We don't actually handle queries in parallel 96 // here, as we're running in lock step with the server - each Recv is 97 // paired with a Send. 98 go fp.runRequest(ctx, request.HttpRequest, request.StatsEnabled, func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error { 99 return c.Send(&frontendv1pb.ClientToFrontend{ 100 HttpResponse: response, 101 Stats: stats, 102 }) 103 }) 104 105 case frontendv1pb.GET_ID: 106 err := c.Send(&frontendv1pb.ClientToFrontend{ClientID: fp.querierID}) 107 if err != nil { 108 return err 109 } 110 111 default: 112 return fmt.Errorf("unknown request type: %v", request.Type) 113 } 114 } 115} 116 117func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.HTTPRequest, statsEnabled bool, sendHTTPResponse func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error) { 118 var stats *querier_stats.Stats 119 if statsEnabled { 120 stats, ctx = querier_stats.ContextWithEmptyStats(ctx) 121 } 122 123 response, err := fp.handler.Handle(ctx, request) 124 if err != nil { 125 var ok bool 126 response, ok = httpgrpc.HTTPResponseFromError(err) 127 if !ok { 128 response = &httpgrpc.HTTPResponse{ 129 Code: http.StatusInternalServerError, 130 Body: []byte(err.Error()), 131 } 132 } 133 } 134 135 // Ensure responses that are too big are not retried. 136 if len(response.Body) >= fp.maxMessageSize { 137 errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize) 138 response = &httpgrpc.HTTPResponse{ 139 Code: http.StatusRequestEntityTooLarge, 140 Body: []byte(errMsg), 141 } 142 level.Error(fp.log).Log("msg", "error processing query", "err", errMsg) 143 } 144 145 if err := sendHTTPResponse(response, stats); err != nil { 146 level.Error(fp.log).Log("msg", "error processing requests", "err", err) 147 } 148} 149