1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package client
19
20import (
21	"fmt"
22	"net"
23	"strconv"
24
25	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
26	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
27	endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
28	typepb "github.com/envoyproxy/go-control-plane/envoy/type"
29	"github.com/golang/protobuf/ptypes"
30	"google.golang.org/grpc/grpclog"
31	"google.golang.org/grpc/xds/internal"
32)
33
// OverloadDropConfig holds one drop-overload policy taken from an EDS
// response: the policy's category plus its drop percentage, expressed as the
// fraction Numerator/Denominator.
type OverloadDropConfig struct {
	// Category is the category name carried by the drop policy.
	Category string
	// Numerator and Denominator together form the drop fraction.
	// parseDropPolicy leaves Denominator at 0 when the denominator type in
	// the proto is not one it recognizes.
	Numerator, Denominator uint32
}
40
// EndpointHealthStatus is the health status reported for an endpoint.
//
// parseEndpoints produces these values by a direct numeric conversion of the
// health status found in the EDS proto, so the numbers below must not be
// changed independently of that enum.
type EndpointHealthStatus int32

// The known endpoint health statuses, written with explicit values because
// they are obtained from the proto by numeric conversion.
const (
	// EndpointHealthStatusUnknown represents HealthStatus UNKNOWN.
	EndpointHealthStatusUnknown EndpointHealthStatus = 0
	// EndpointHealthStatusHealthy represents HealthStatus HEALTHY.
	EndpointHealthStatusHealthy EndpointHealthStatus = 1
	// EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY.
	EndpointHealthStatusUnhealthy EndpointHealthStatus = 2
	// EndpointHealthStatusDraining represents HealthStatus DRAINING.
	EndpointHealthStatusDraining EndpointHealthStatus = 3
	// EndpointHealthStatusTimeout represents HealthStatus TIMEOUT.
	EndpointHealthStatusTimeout EndpointHealthStatus = 4
	// EndpointHealthStatusDegraded represents HealthStatus DEGRADED.
	EndpointHealthStatusDegraded EndpointHealthStatus = 5
)
58
59// Endpoint contains information of an endpoint.
60type Endpoint struct {
61	Address      string
62	HealthStatus EndpointHealthStatus
63	Weight       uint32
64}
65
66// Locality contains information of a locality.
67type Locality struct {
68	Endpoints []Endpoint
69	ID        internal.Locality
70	Priority  uint32
71	Weight    uint32
72}
73
74// EDSUpdate contains an EDS update.
75type EDSUpdate struct {
76	Drops      []OverloadDropConfig
77	Localities []Locality
78}
79
80func parseAddress(socketAddress *corepb.SocketAddress) string {
81	return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue())))
82}
83
84func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig {
85	percentage := dropPolicy.GetDropPercentage()
86	var (
87		numerator   = percentage.GetNumerator()
88		denominator uint32
89	)
90	switch percentage.GetDenominator() {
91	case typepb.FractionalPercent_HUNDRED:
92		denominator = 100
93	case typepb.FractionalPercent_TEN_THOUSAND:
94		denominator = 10000
95	case typepb.FractionalPercent_MILLION:
96		denominator = 1000000
97	}
98	return OverloadDropConfig{
99		Category:    dropPolicy.GetCategory(),
100		Numerator:   numerator,
101		Denominator: denominator,
102	}
103}
104
105func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint {
106	endpoints := make([]Endpoint, 0, len(lbEndpoints))
107	for _, lbEndpoint := range lbEndpoints {
108		endpoints = append(endpoints, Endpoint{
109			HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
110			Address:      parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()),
111			Weight:       lbEndpoint.GetLoadBalancingWeight().GetValue(),
112		})
113	}
114	return endpoints
115}
116
117// ParseEDSRespProto turns EDS response proto message to EDSUpdate.
118//
119// This is temporarily exported to be used in eds balancer, before it switches
120// to use xds client. TODO: unexport.
121func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) {
122	ret := &EDSUpdate{}
123	for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
124		ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
125	}
126	priorities := make(map[uint32]struct{})
127	for _, locality := range m.Endpoints {
128		l := locality.GetLocality()
129		if l == nil {
130			return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
131		}
132		lid := internal.Locality{
133			Region:  l.Region,
134			Zone:    l.Zone,
135			SubZone: l.SubZone,
136		}
137		priority := locality.GetPriority()
138		priorities[priority] = struct{}{}
139		ret.Localities = append(ret.Localities, Locality{
140			ID:        lid,
141			Endpoints: parseEndpoints(locality.GetLbEndpoints()),
142			Weight:    locality.GetLoadBalancingWeight().GetValue(),
143			Priority:  priority,
144		})
145	}
146	for i := 0; i < len(priorities); i++ {
147		if _, ok := priorities[uint32(i)]; !ok {
148			return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities)
149		}
150	}
151	return ret, nil
152}
153
154// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails.
155// This is used by EDS balancer tests.
156//
157// TODO: delete this. The EDS balancer should build an EDSUpdate directly,
158//  instead of building and parsing a proto message.
159func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate {
160	u, err := ParseEDSRespProto(m)
161	if err != nil {
162		panic(err.Error())
163	}
164	return u
165}
166
167func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
168	v2c.mu.Lock()
169	defer v2c.mu.Unlock()
170
171	wi := v2c.watchMap[edsURL]
172	if wi == nil {
173		return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp)
174	}
175
176	var returnUpdate *EDSUpdate
177	for _, r := range resp.GetResources() {
178		var resource ptypes.DynamicAny
179		if err := ptypes.UnmarshalAny(r, &resource); err != nil {
180			return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err)
181		}
182		cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment)
183		if !ok {
184			return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message)
185		}
186
187		if cla.GetClusterName() != wi.target[0] {
188			grpclog.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0])
189			// We won't validate the remaining resources. If one of the
190			// uninteresting ones is invalid, we will still ACK the response.
191			continue
192		}
193
194		u, err := ParseEDSRespProto(cla)
195		if err != nil {
196			return err
197		}
198
199		returnUpdate = u
200		// Break from the loop because the request resource is found. But
201		// this also means we won't validate the remaining resources. If one
202		// of the uninteresting ones is invalid, we will still ACK the
203		// response.
204		break
205	}
206
207	if returnUpdate != nil {
208		wi.stopTimer()
209		wi.callback.(edsCallback)(returnUpdate, nil)
210	}
211
212	return nil
213}
214