1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package client 19 20import ( 21 "fmt" 22 "net" 23 "strconv" 24 25 xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" 26 corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 27 endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" 28 typepb "github.com/envoyproxy/go-control-plane/envoy/type" 29 "github.com/golang/protobuf/ptypes" 30 "google.golang.org/grpc/grpclog" 31 "google.golang.org/grpc/xds/internal" 32) 33 34// OverloadDropConfig contains the config to drop overloads. 35type OverloadDropConfig struct { 36 Category string 37 Numerator uint32 38 Denominator uint32 39} 40 41// EndpointHealthStatus represents the health status of an endpoint. 42type EndpointHealthStatus int32 43 44const ( 45 // EndpointHealthStatusUnknown represents HealthStatus UNKNOWN. 46 EndpointHealthStatusUnknown EndpointHealthStatus = iota 47 // EndpointHealthStatusHealthy represents HealthStatus HEALTHY. 48 EndpointHealthStatusHealthy 49 // EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY. 50 EndpointHealthStatusUnhealthy 51 // EndpointHealthStatusDraining represents HealthStatus DRAINING. 52 EndpointHealthStatusDraining 53 // EndpointHealthStatusTimeout represents HealthStatus TIMEOUT. 54 EndpointHealthStatusTimeout 55 // EndpointHealthStatusDegraded represents HealthStatus DEGRADED. 56 EndpointHealthStatusDegraded 57) 58 59// Endpoint contains information of an endpoint. 60type Endpoint struct { 61 Address string 62 HealthStatus EndpointHealthStatus 63 Weight uint32 64} 65 66// Locality contains information of a locality. 67type Locality struct { 68 Endpoints []Endpoint 69 ID internal.Locality 70 Priority uint32 71 Weight uint32 72} 73 74// EDSUpdate contains an EDS update. 75type EDSUpdate struct { 76 Drops []OverloadDropConfig 77 Localities []Locality 78} 79 80func parseAddress(socketAddress *corepb.SocketAddress) string { 81 return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))) 82} 83 84func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig { 85 percentage := dropPolicy.GetDropPercentage() 86 var ( 87 numerator = percentage.GetNumerator() 88 denominator uint32 89 ) 90 switch percentage.GetDenominator() { 91 case typepb.FractionalPercent_HUNDRED: 92 denominator = 100 93 case typepb.FractionalPercent_TEN_THOUSAND: 94 denominator = 10000 95 case typepb.FractionalPercent_MILLION: 96 denominator = 1000000 97 } 98 return OverloadDropConfig{ 99 Category: dropPolicy.GetCategory(), 100 Numerator: numerator, 101 Denominator: denominator, 102 } 103} 104 105func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint { 106 endpoints := make([]Endpoint, 0, len(lbEndpoints)) 107 for _, lbEndpoint := range lbEndpoints { 108 endpoints = append(endpoints, Endpoint{ 109 HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), 110 Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()), 111 Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(), 112 }) 113 } 114 return endpoints 115} 116 117// ParseEDSRespProto turns EDS response proto message to EDSUpdate. 118// 119// This is temporarily exported to be used in eds balancer, before it switches 120// to use xds client. TODO: unexport. 121func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) { 122 ret := &EDSUpdate{} 123 for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { 124 ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) 125 } 126 priorities := make(map[uint32]struct{}) 127 for _, locality := range m.Endpoints { 128 l := locality.GetLocality() 129 if l == nil { 130 return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) 131 } 132 lid := internal.Locality{ 133 Region: l.Region, 134 Zone: l.Zone, 135 SubZone: l.SubZone, 136 } 137 priority := locality.GetPriority() 138 priorities[priority] = struct{}{} 139 ret.Localities = append(ret.Localities, Locality{ 140 ID: lid, 141 Endpoints: parseEndpoints(locality.GetLbEndpoints()), 142 Weight: locality.GetLoadBalancingWeight().GetValue(), 143 Priority: priority, 144 }) 145 } 146 for i := 0; i < len(priorities); i++ { 147 if _, ok := priorities[uint32(i)]; !ok { 148 return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities) 149 } 150 } 151 return ret, nil 152} 153 154// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails. 155// This is used by EDS balancer tests. 156// 157// TODO: delete this. The EDS balancer should build an EDSUpdate directly, 158// instead of building and parsing a proto message. 159func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate { 160 u, err := ParseEDSRespProto(m) 161 if err != nil { 162 panic(err.Error()) 163 } 164 return u 165} 166 167func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { 168 v2c.mu.Lock() 169 defer v2c.mu.Unlock() 170 171 wi := v2c.watchMap[edsURL] 172 if wi == nil { 173 return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp) 174 } 175 176 var returnUpdate *EDSUpdate 177 for _, r := range resp.GetResources() { 178 var resource ptypes.DynamicAny 179 if err := ptypes.UnmarshalAny(r, &resource); err != nil { 180 return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err) 181 } 182 cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment) 183 if !ok { 184 return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message) 185 } 186 187 if cla.GetClusterName() != wi.target[0] { 188 grpclog.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0]) 189 // We won't validate the remaining resources. If one of the 190 // uninteresting ones is invalid, we will still ACK the response. 191 continue 192 } 193 194 u, err := ParseEDSRespProto(cla) 195 if err != nil { 196 return err 197 } 198 199 returnUpdate = u 200 // Break from the loop because the request resource is found. But 201 // this also means we won't validate the remaining resources. If one 202 // of the uninteresting ones is invalid, we will still ACK the 203 // response. 204 break 205 } 206 207 if returnUpdate != nil { 208 wi.stopTimer() 209 wi.callback.(edsCallback)(returnUpdate, nil) 210 } 211 212 return nil 213} 214