1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package client 20 21import ( 22 "fmt" 23 24 xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" 25 "github.com/golang/protobuf/ptypes" 26) 27 28// handleCDSResponse processes an CDS response received from the xDS server. On 29// receipt of a good response, it also invokes the registered watcher callback. 30func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { 31 v2c.mu.Lock() 32 defer v2c.mu.Unlock() 33 34 wi := v2c.watchMap[cdsURL] 35 if wi == nil { 36 return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp) 37 } 38 39 var returnUpdate CDSUpdate 40 localCache := make(map[string]CDSUpdate) 41 for _, r := range resp.GetResources() { 42 var resource ptypes.DynamicAny 43 if err := ptypes.UnmarshalAny(r, &resource); err != nil { 44 return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err) 45 } 46 cluster, ok := resource.Message.(*xdspb.Cluster) 47 if !ok { 48 return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message) 49 } 50 v2c.logger.Infof("Resource with name: %v, type: %T, contains: %v", cluster.GetName(), cluster, cluster) 51 update, err := validateCluster(cluster) 52 if err != nil { 53 return err 54 } 55 56 // If the Cluster message in the CDS response did not contain a 57 // serviceName, we will just use the clusterName for EDS. 58 if update.ServiceName == "" { 59 update.ServiceName = cluster.GetName() 60 } 61 localCache[cluster.GetName()] = update 62 v2c.logger.Debugf("Resource with name %v, type %T, value %+v added to cache", cluster.GetName(), update, update) 63 if cluster.GetName() == wi.target[0] { 64 returnUpdate = update 65 } 66 } 67 v2c.cdsCache = localCache 68 69 var err error 70 if returnUpdate.ServiceName == "" { 71 err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp) 72 } 73 wi.stopTimer() 74 wi.callback.(cdsCallback)(returnUpdate, err) 75 return nil 76} 77 78func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) { 79 emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false} 80 switch { 81 case cluster.GetType() != xdspb.Cluster_EDS: 82 return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster) 83 case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil: 84 return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster) 85 case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN: 86 return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) 87 } 88 89 return CDSUpdate{ 90 ServiceName: cluster.GetEdsClusterConfig().GetServiceName(), 91 EnableLRS: cluster.GetLrsServer().GetSelf() != nil, 92 }, nil 93} 94