1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package client 19 20import ( 21 "fmt" 22 "net" 23 "strconv" 24 25 xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" 26 corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 27 endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" 28 typepb "github.com/envoyproxy/go-control-plane/envoy/type" 29 "github.com/golang/protobuf/ptypes" 30 "google.golang.org/grpc/xds/internal" 31) 32 33// OverloadDropConfig contains the config to drop overloads. 34type OverloadDropConfig struct { 35 Category string 36 Numerator uint32 37 Denominator uint32 38} 39 40// EndpointHealthStatus represents the health status of an endpoint. 41type EndpointHealthStatus int32 42 43const ( 44 // EndpointHealthStatusUnknown represents HealthStatus UNKNOWN. 45 EndpointHealthStatusUnknown EndpointHealthStatus = iota 46 // EndpointHealthStatusHealthy represents HealthStatus HEALTHY. 47 EndpointHealthStatusHealthy 48 // EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY. 49 EndpointHealthStatusUnhealthy 50 // EndpointHealthStatusDraining represents HealthStatus DRAINING. 51 EndpointHealthStatusDraining 52 // EndpointHealthStatusTimeout represents HealthStatus TIMEOUT. 53 EndpointHealthStatusTimeout 54 // EndpointHealthStatusDegraded represents HealthStatus DEGRADED. 55 EndpointHealthStatusDegraded 56) 57 58// Endpoint contains information of an endpoint. 59type Endpoint struct { 60 Address string 61 HealthStatus EndpointHealthStatus 62 Weight uint32 63} 64 65// Locality contains information of a locality. 66type Locality struct { 67 Endpoints []Endpoint 68 ID internal.Locality 69 Priority uint32 70 Weight uint32 71} 72 73// EDSUpdate contains an EDS update. 74type EDSUpdate struct { 75 Drops []OverloadDropConfig 76 Localities []Locality 77} 78 79func parseAddress(socketAddress *corepb.SocketAddress) string { 80 return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))) 81} 82 83func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig { 84 percentage := dropPolicy.GetDropPercentage() 85 var ( 86 numerator = percentage.GetNumerator() 87 denominator uint32 88 ) 89 switch percentage.GetDenominator() { 90 case typepb.FractionalPercent_HUNDRED: 91 denominator = 100 92 case typepb.FractionalPercent_TEN_THOUSAND: 93 denominator = 10000 94 case typepb.FractionalPercent_MILLION: 95 denominator = 1000000 96 } 97 return OverloadDropConfig{ 98 Category: dropPolicy.GetCategory(), 99 Numerator: numerator, 100 Denominator: denominator, 101 } 102} 103 104func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint { 105 endpoints := make([]Endpoint, 0, len(lbEndpoints)) 106 for _, lbEndpoint := range lbEndpoints { 107 endpoints = append(endpoints, Endpoint{ 108 HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), 109 Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()), 110 Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(), 111 }) 112 } 113 return endpoints 114} 115 116// ParseEDSRespProto turns EDS response proto message to EDSUpdate. 117// 118// This is temporarily exported to be used in eds balancer, before it switches 119// to use xds client. TODO: unexport. 120func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) { 121 ret := &EDSUpdate{} 122 for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { 123 ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) 124 } 125 priorities := make(map[uint32]struct{}) 126 for _, locality := range m.Endpoints { 127 l := locality.GetLocality() 128 if l == nil { 129 return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) 130 } 131 lid := internal.Locality{ 132 Region: l.Region, 133 Zone: l.Zone, 134 SubZone: l.SubZone, 135 } 136 priority := locality.GetPriority() 137 priorities[priority] = struct{}{} 138 ret.Localities = append(ret.Localities, Locality{ 139 ID: lid, 140 Endpoints: parseEndpoints(locality.GetLbEndpoints()), 141 Weight: locality.GetLoadBalancingWeight().GetValue(), 142 Priority: priority, 143 }) 144 } 145 for i := 0; i < len(priorities); i++ { 146 if _, ok := priorities[uint32(i)]; !ok { 147 return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities) 148 } 149 } 150 return ret, nil 151} 152 153// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails. 154// This is used by EDS balancer tests. 155// 156// TODO: delete this. The EDS balancer tests should build an EDSUpdate directly, 157// instead of building and parsing a proto message. 158func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate { 159 u, err := ParseEDSRespProto(m) 160 if err != nil { 161 panic(err.Error()) 162 } 163 return u 164} 165 166func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { 167 v2c.mu.Lock() 168 defer v2c.mu.Unlock() 169 170 wi := v2c.watchMap[edsURL] 171 if wi == nil { 172 return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp) 173 } 174 175 var returnUpdate *EDSUpdate 176 for _, r := range resp.GetResources() { 177 var resource ptypes.DynamicAny 178 if err := ptypes.UnmarshalAny(r, &resource); err != nil { 179 return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err) 180 } 181 cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment) 182 if !ok { 183 return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message) 184 } 185 v2c.logger.Infof("Resource with name: %v, type: %T, contains: %v", cla.GetClusterName(), cla, cla) 186 187 if cla.GetClusterName() != wi.target[0] { 188 // We won't validate the remaining resources. If one of the 189 // uninteresting ones is invalid, we will still ACK the response. 190 continue 191 } 192 193 u, err := ParseEDSRespProto(cla) 194 if err != nil { 195 return err 196 } 197 198 returnUpdate = u 199 // Break from the loop because the request resource is found. But 200 // this also means we won't validate the remaining resources. If one 201 // of the uninteresting ones is invalid, we will still ACK the 202 // response. 203 break 204 } 205 206 if returnUpdate != nil { 207 wi.stopTimer() 208 wi.edsCallback(returnUpdate, nil) 209 } 210 211 return nil 212} 213