1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package v2 20 21import ( 22 "context" 23 "errors" 24 "fmt" 25 "time" 26 27 "github.com/golang/protobuf/proto" 28 "github.com/golang/protobuf/ptypes" 29 "google.golang.org/grpc/xds/internal/client/load" 30 31 v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 32 v2endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" 33 lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" 34 lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" 35 "google.golang.org/grpc" 36 "google.golang.org/grpc/xds/internal" 37) 38 39const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters" 40 41type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient 42 43func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) { 44 c := lrsgrpc.NewLoadReportingServiceClient(cc) 45 return c.StreamLoadStats(ctx) 46} 47 48func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { 49 stream, ok := s.(lrsStream) 50 if !ok { 51 return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) 52 } 53 node := proto.Clone(v2c.nodeProto).(*v2corepb.Node) 54 if node == nil { 55 node = &v2corepb.Node{} 56 } 57 node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters) 58 59 req := &lrspb.LoadStatsRequest{Node: node} 60 v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req) 61 return stream.Send(req) 62} 63 64func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { 65 stream, ok := s.(lrsStream) 66 if !ok { 67 return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) 68 } 69 70 resp, err := stream.Recv() 71 if err != nil { 72 return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err) 73 } 74 v2c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp) 75 76 interval, err := ptypes.Duration(resp.GetLoadReportingInterval()) 77 if err != nil { 78 return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) 79 } 80 81 if resp.ReportEndpointGranularity { 82 // TODO: fixme to support per endpoint loads. 83 return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") 84 } 85 86 clusters := resp.Clusters 87 if resp.SendAllClusters { 88 // Return nil to send stats for all clusters. 89 clusters = nil 90 } 91 92 return clusters, interval, nil 93} 94 95func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error { 96 stream, ok := s.(lrsStream) 97 if !ok { 98 return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) 99 } 100 101 var clusterStats []*v2endpointpb.ClusterStats 102 for _, sd := range loads { 103 var ( 104 droppedReqs []*v2endpointpb.ClusterStats_DroppedRequests 105 localityStats []*v2endpointpb.UpstreamLocalityStats 106 ) 107 for category, count := range sd.Drops { 108 droppedReqs = append(droppedReqs, &v2endpointpb.ClusterStats_DroppedRequests{ 109 Category: category, 110 DroppedCount: count, 111 }) 112 } 113 for l, localityData := range sd.LocalityStats { 114 lid, err := internal.LocalityIDFromString(l) 115 if err != nil { 116 return err 117 } 118 var loadMetricStats []*v2endpointpb.EndpointLoadMetricStats 119 for name, loadData := range localityData.LoadStats { 120 loadMetricStats = append(loadMetricStats, &v2endpointpb.EndpointLoadMetricStats{ 121 MetricName: name, 122 NumRequestsFinishedWithMetric: loadData.Count, 123 TotalMetricValue: loadData.Sum, 124 }) 125 } 126 localityStats = append(localityStats, &v2endpointpb.UpstreamLocalityStats{ 127 Locality: &v2corepb.Locality{ 128 Region: lid.Region, 129 Zone: lid.Zone, 130 SubZone: lid.SubZone, 131 }, 132 TotalSuccessfulRequests: localityData.RequestStats.Succeeded, 133 TotalRequestsInProgress: localityData.RequestStats.InProgress, 134 TotalErrorRequests: localityData.RequestStats.Errored, 135 LoadMetricStats: loadMetricStats, 136 UpstreamEndpointStats: nil, // TODO: populate for per endpoint loads. 137 }) 138 } 139 140 clusterStats = append(clusterStats, &v2endpointpb.ClusterStats{ 141 ClusterName: sd.Cluster, 142 ClusterServiceName: sd.Service, 143 UpstreamLocalityStats: localityStats, 144 TotalDroppedRequests: sd.TotalDrops, 145 DroppedRequests: droppedReqs, 146 LoadReportInterval: ptypes.DurationProto(sd.ReportInterval), 147 }) 148 149 } 150 151 req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats} 152 v2c.logger.Infof("lrs: sending LRS loads: %+v", req) 153 return stream.Send(req) 154} 155