1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package client
20
21import (
22	"fmt"
23
24	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
25	"github.com/golang/protobuf/ptypes"
26)
27
28// handleCDSResponse processes an CDS response received from the xDS server. On
29// receipt of a good response, it also invokes the registered watcher callback.
30func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
31	v2c.mu.Lock()
32	defer v2c.mu.Unlock()
33
34	wi := v2c.watchMap[cdsURL]
35	if wi == nil {
36		return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp)
37	}
38
39	var returnUpdate CDSUpdate
40	localCache := make(map[string]CDSUpdate)
41	for _, r := range resp.GetResources() {
42		var resource ptypes.DynamicAny
43		if err := ptypes.UnmarshalAny(r, &resource); err != nil {
44			return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err)
45		}
46		cluster, ok := resource.Message.(*xdspb.Cluster)
47		if !ok {
48			return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message)
49		}
50		v2c.logger.Infof("Resource with name: %v, type: %T, contains: %v", cluster.GetName(), cluster, cluster)
51		update, err := validateCluster(cluster)
52		if err != nil {
53			return err
54		}
55
56		// If the Cluster message in the CDS response did not contain a
57		// serviceName, we will just use the clusterName for EDS.
58		if update.ServiceName == "" {
59			update.ServiceName = cluster.GetName()
60		}
61		localCache[cluster.GetName()] = update
62		v2c.logger.Debugf("Resource with name %v, type %T, value %+v added to cache", cluster.GetName(), update, update)
63		if cluster.GetName() == wi.target[0] {
64			returnUpdate = update
65		}
66	}
67	v2c.cdsCache = localCache
68
69	var err error
70	if returnUpdate.ServiceName == "" {
71		err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp)
72	}
73	wi.stopTimer()
74	wi.callback.(cdsCallback)(returnUpdate, err)
75	return nil
76}
77
78func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) {
79	emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false}
80	switch {
81	case cluster.GetType() != xdspb.Cluster_EDS:
82		return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster)
83	case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil:
84		return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster)
85	case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN:
86		return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
87	}
88
89	return CDSUpdate{
90		ServiceName: cluster.GetEdsClusterConfig().GetServiceName(),
91		EnableLRS:   cluster.GetLrsServer().GetSelf() != nil,
92	}, nil
93}
94