1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package client
19
20import (
21	"fmt"
22	"net"
23	"strconv"
24
25	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
26	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
27	endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
28	typepb "github.com/envoyproxy/go-control-plane/envoy/type"
29	"github.com/golang/protobuf/ptypes"
30	"google.golang.org/grpc/xds/internal"
31)
32
33// OverloadDropConfig contains the config to drop overloads.
34type OverloadDropConfig struct {
35	Category    string
36	Numerator   uint32
37	Denominator uint32
38}
39
40// EndpointHealthStatus represents the health status of an endpoint.
41type EndpointHealthStatus int32
42
43const (
44	// EndpointHealthStatusUnknown represents HealthStatus UNKNOWN.
45	EndpointHealthStatusUnknown EndpointHealthStatus = iota
46	// EndpointHealthStatusHealthy represents HealthStatus HEALTHY.
47	EndpointHealthStatusHealthy
48	// EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY.
49	EndpointHealthStatusUnhealthy
50	// EndpointHealthStatusDraining represents HealthStatus DRAINING.
51	EndpointHealthStatusDraining
52	// EndpointHealthStatusTimeout represents HealthStatus TIMEOUT.
53	EndpointHealthStatusTimeout
54	// EndpointHealthStatusDegraded represents HealthStatus DEGRADED.
55	EndpointHealthStatusDegraded
56)
57
58// Endpoint contains information of an endpoint.
59type Endpoint struct {
60	Address      string
61	HealthStatus EndpointHealthStatus
62	Weight       uint32
63}
64
65// Locality contains information of a locality.
66type Locality struct {
67	Endpoints []Endpoint
68	ID        internal.Locality
69	Priority  uint32
70	Weight    uint32
71}
72
73// EDSUpdate contains an EDS update.
74type EDSUpdate struct {
75	Drops      []OverloadDropConfig
76	Localities []Locality
77}
78
79func parseAddress(socketAddress *corepb.SocketAddress) string {
80	return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue())))
81}
82
83func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig {
84	percentage := dropPolicy.GetDropPercentage()
85	var (
86		numerator   = percentage.GetNumerator()
87		denominator uint32
88	)
89	switch percentage.GetDenominator() {
90	case typepb.FractionalPercent_HUNDRED:
91		denominator = 100
92	case typepb.FractionalPercent_TEN_THOUSAND:
93		denominator = 10000
94	case typepb.FractionalPercent_MILLION:
95		denominator = 1000000
96	}
97	return OverloadDropConfig{
98		Category:    dropPolicy.GetCategory(),
99		Numerator:   numerator,
100		Denominator: denominator,
101	}
102}
103
104func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint {
105	endpoints := make([]Endpoint, 0, len(lbEndpoints))
106	for _, lbEndpoint := range lbEndpoints {
107		endpoints = append(endpoints, Endpoint{
108			HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
109			Address:      parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()),
110			Weight:       lbEndpoint.GetLoadBalancingWeight().GetValue(),
111		})
112	}
113	return endpoints
114}
115
116// ParseEDSRespProto turns EDS response proto message to EDSUpdate.
117//
118// This is temporarily exported to be used in eds balancer, before it switches
119// to use xds client. TODO: unexport.
120func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) {
121	ret := &EDSUpdate{}
122	for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
123		ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
124	}
125	priorities := make(map[uint32]struct{})
126	for _, locality := range m.Endpoints {
127		l := locality.GetLocality()
128		if l == nil {
129			return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
130		}
131		lid := internal.Locality{
132			Region:  l.Region,
133			Zone:    l.Zone,
134			SubZone: l.SubZone,
135		}
136		priority := locality.GetPriority()
137		priorities[priority] = struct{}{}
138		ret.Localities = append(ret.Localities, Locality{
139			ID:        lid,
140			Endpoints: parseEndpoints(locality.GetLbEndpoints()),
141			Weight:    locality.GetLoadBalancingWeight().GetValue(),
142			Priority:  priority,
143		})
144	}
145	for i := 0; i < len(priorities); i++ {
146		if _, ok := priorities[uint32(i)]; !ok {
147			return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities)
148		}
149	}
150	return ret, nil
151}
152
153// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails.
154// This is used by EDS balancer tests.
155//
156// TODO: delete this. The EDS balancer tests should build an EDSUpdate directly,
157//  instead of building and parsing a proto message.
158func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate {
159	u, err := ParseEDSRespProto(m)
160	if err != nil {
161		panic(err.Error())
162	}
163	return u
164}
165
166func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
167	v2c.mu.Lock()
168	defer v2c.mu.Unlock()
169
170	wi := v2c.watchMap[edsURL]
171	if wi == nil {
172		return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp)
173	}
174
175	var returnUpdate *EDSUpdate
176	for _, r := range resp.GetResources() {
177		var resource ptypes.DynamicAny
178		if err := ptypes.UnmarshalAny(r, &resource); err != nil {
179			return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err)
180		}
181		cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment)
182		if !ok {
183			return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message)
184		}
185		v2c.logger.Infof("Resource with name: %v, type: %T, contains: %v", cla.GetClusterName(), cla, cla)
186
187		if cla.GetClusterName() != wi.target[0] {
188			// We won't validate the remaining resources. If one of the
189			// uninteresting ones is invalid, we will still ACK the response.
190			continue
191		}
192
193		u, err := ParseEDSRespProto(cla)
194		if err != nil {
195			return err
196		}
197
198		returnUpdate = u
199		// Break from the loop because the request resource is found. But
200		// this also means we won't validate the remaining resources. If one
201		// of the uninteresting ones is invalid, we will still ACK the
202		// response.
203		break
204	}
205
206	if returnUpdate != nil {
207		wi.stopTimer()
208		wi.callback.(edsCallback)(returnUpdate, nil)
209	}
210
211	return nil
212}
213