1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package v2
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"time"
26
27	"github.com/golang/protobuf/proto"
28	"github.com/golang/protobuf/ptypes"
29	"google.golang.org/grpc/xds/internal/client/load"
30
31	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
32	v2endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
33	lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
34	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
35	"google.golang.org/grpc"
36	"google.golang.org/grpc/xds/internal"
37)
38
// clientFeatureLRSSendAllClusters is the client-feature string appended to the
// node's ClientFeatures in the initial LoadStatsRequest (see
// SendFirstLoadStatsRequest).
// NOTE(review): presumably this tells the server the client honors the
// SendAllClusters field of LoadStatsResponse — confirm against the LRS spec.
39const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters"
40
// lrsStream is the v2 StreamLoadStats client stream type. The methods in this
// file receive a generic grpc.ClientStream and type-assert it to lrsStream
// before calling Send or Recv on it.
41type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient
42
43func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) {
44	c := lrsgrpc.NewLoadReportingServiceClient(cc)
45	return c.StreamLoadStats(ctx)
46}
47
48func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {
49	stream, ok := s.(lrsStream)
50	if !ok {
51		return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
52	}
53	node := proto.Clone(v2c.nodeProto).(*v2corepb.Node)
54	if node == nil {
55		node = &v2corepb.Node{}
56	}
57	node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters)
58
59	req := &lrspb.LoadStatsRequest{Node: node}
60	v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req)
61	return stream.Send(req)
62}
63
64func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
65	stream, ok := s.(lrsStream)
66	if !ok {
67		return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
68	}
69
70	resp, err := stream.Recv()
71	if err != nil {
72		return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
73	}
74	v2c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp)
75
76	interval, err := ptypes.Duration(resp.GetLoadReportingInterval())
77	if err != nil {
78		return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
79	}
80
81	if resp.ReportEndpointGranularity {
82		// TODO: fixme to support per endpoint loads.
83		return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
84	}
85
86	clusters := resp.Clusters
87	if resp.SendAllClusters {
88		// Return nil to send stats for all clusters.
89		clusters = nil
90	}
91
92	return clusters, interval, nil
93}
94
95func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error {
96	stream, ok := s.(lrsStream)
97	if !ok {
98		return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
99	}
100
101	var clusterStats []*v2endpointpb.ClusterStats
102	for _, sd := range loads {
103		var (
104			droppedReqs   []*v2endpointpb.ClusterStats_DroppedRequests
105			localityStats []*v2endpointpb.UpstreamLocalityStats
106		)
107		for category, count := range sd.Drops {
108			droppedReqs = append(droppedReqs, &v2endpointpb.ClusterStats_DroppedRequests{
109				Category:     category,
110				DroppedCount: count,
111			})
112		}
113		for l, localityData := range sd.LocalityStats {
114			lid, err := internal.LocalityIDFromString(l)
115			if err != nil {
116				return err
117			}
118			var loadMetricStats []*v2endpointpb.EndpointLoadMetricStats
119			for name, loadData := range localityData.LoadStats {
120				loadMetricStats = append(loadMetricStats, &v2endpointpb.EndpointLoadMetricStats{
121					MetricName:                    name,
122					NumRequestsFinishedWithMetric: loadData.Count,
123					TotalMetricValue:              loadData.Sum,
124				})
125			}
126			localityStats = append(localityStats, &v2endpointpb.UpstreamLocalityStats{
127				Locality: &v2corepb.Locality{
128					Region:  lid.Region,
129					Zone:    lid.Zone,
130					SubZone: lid.SubZone,
131				},
132				TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
133				TotalRequestsInProgress: localityData.RequestStats.InProgress,
134				TotalErrorRequests:      localityData.RequestStats.Errored,
135				LoadMetricStats:         loadMetricStats,
136				UpstreamEndpointStats:   nil, // TODO: populate for per endpoint loads.
137			})
138		}
139
140		clusterStats = append(clusterStats, &v2endpointpb.ClusterStats{
141			ClusterName:           sd.Cluster,
142			ClusterServiceName:    sd.Service,
143			UpstreamLocalityStats: localityStats,
144			TotalDroppedRequests:  sd.TotalDrops,
145			DroppedRequests:       droppedReqs,
146			LoadReportInterval:    ptypes.DurationProto(sd.ReportInterval),
147		})
148
149	}
150
151	req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
152	v2c.logger.Infof("lrs: sending LRS loads: %+v", req)
153	return stream.Send(req)
154}
155