1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package client
20
21import (
22	"fmt"
23	"net"
24	"strings"
25
26	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
27	"github.com/golang/protobuf/ptypes"
28)
29
30// handleRDSResponse processes an RDS response received from the xDS server. On
31// receipt of a good response, it caches validated resources and also invokes
32// the registered watcher callback.
33func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
34	v2c.mu.Lock()
35	defer v2c.mu.Unlock()
36
37	if v2c.watchMap[ldsURL] == nil {
38		return fmt.Errorf("xds: unexpected RDS response when no LDS watcher is registered: %+v", resp)
39	}
40	target := v2c.watchMap[ldsURL].target[0]
41
42	wi := v2c.watchMap[rdsURL]
43	if wi == nil {
44		return fmt.Errorf("xds: no RDS watcher found when handling RDS response: %+v", resp)
45	}
46
47	returnCluster := ""
48	localCache := make(map[string]string)
49	for _, r := range resp.GetResources() {
50		var resource ptypes.DynamicAny
51		if err := ptypes.UnmarshalAny(r, &resource); err != nil {
52			return fmt.Errorf("xds: failed to unmarshal resource in RDS response: %v", err)
53		}
54		rc, ok := resource.Message.(*xdspb.RouteConfiguration)
55		if !ok {
56			return fmt.Errorf("xds: unexpected resource type: %T in RDS response", resource.Message)
57		}
58		cluster := getClusterFromRouteConfiguration(rc, target)
59		if cluster == "" {
60			return fmt.Errorf("xds: received invalid RouteConfiguration in RDS response: %+v", rc)
61		}
62
63		// If we get here, it means that this resource was a good one.
64		localCache[rc.GetName()] = cluster
65
66		// TODO: remove cache, and only process resources that are interesting.
67		if rc.GetName() == wi.target[0] {
68			returnCluster = cluster
69		}
70	}
71
72	// Update the cache in the v2Client only after we have confirmed that all
73	// resources in the received response were good.
74	for k, v := range localCache {
75		// TODO: Need to handle deletion of entries from the cache based on LDS
76		// watch calls. Not handling it does not affect correctness, but leads
77		// to unnecessary memory consumption.
78		v2c.rdsCache[k] = v
79	}
80
81	if returnCluster != "" {
82		// We stop the expiry timer and invoke the callback only when we have
83		// received the resource that we are watching for. Since RDS is an
84		// incremental protocol, the fact that we did not receive the resource
85		// that we are watching for in this response does not mean that the
86		// server does not know about it.
87		wi.stopTimer()
88		wi.callback.(rdsCallback)(rdsUpdate{clusterName: returnCluster}, nil)
89	}
90	return nil
91}
92
93// getClusterFromRouteConfiguration checks if the provided RouteConfiguration
94// meets the expected criteria. If so, it returns a non-empty clusterName.
95//
96// A RouteConfiguration resource is considered valid when only if it contains a
97// VirtualHost whose domain field matches the server name from the URI passed
98// to the gRPC channel, and it contains a clusterName.
99//
100// The RouteConfiguration includes a list of VirtualHosts, which may have zero
101// or more elements. We are interested in the element whose domains field
102// matches the server name specified in the "xds:" URI (with port, if any,
103// stripped off). The only field in the VirtualHost proto that the we are
104// interested in is the list of routes. We only look at the last route in the
105// list (the default route), whose match field must be empty and whose route
106// field must be set.  Inside that route message, the cluster field will
107// contain the clusterName we are looking for.
108func getClusterFromRouteConfiguration(rc *xdspb.RouteConfiguration, target string) string {
109	// TODO: return error for better error logging and nack.
110	//
111	// Currently this returns "" on error, and the caller will return an error.
112	// But the error doesn't contain details of why the response is invalid
113	// (mismatch domain or empty route).
114	//
115	// For logging purposes, we can log in line. But if we want to populate
116	// error details for nack, a detailed error needs to be returned.
117
118	host, err := hostFromTarget(target)
119	if err != nil {
120		return ""
121	}
122	for _, vh := range rc.GetVirtualHosts() {
123		for _, domain := range vh.GetDomains() {
124			// TODO: Add support for wildcard matching here?
125			if domain != host || len(vh.GetRoutes()) == 0 {
126				continue
127			}
128			dr := vh.Routes[len(vh.Routes)-1]
129			if match := dr.GetMatch(); match == nil || match.GetPrefix() != "" {
130				continue
131			}
132			route := dr.GetRoute()
133			if route == nil {
134				continue
135			}
136			return route.GetCluster()
137		}
138	}
139	return ""
140}
141
// hostFromTarget returns the host portion of target, as split by
// net.SplitHostPort.
//
// A target without a port is not treated as an error: the input string is
// returned unchanged. Any other split failure is returned to the caller along
// with an empty host.
func hostFromTarget(target string) (string, error) {
	host, _, splitErr := net.SplitHostPort(target)
	if splitErr == nil {
		return host, nil
	}

	// net reports a missing port only through the description text of an
	// *net.AddrError, so that text is matched to recognise the case.
	addrErr, isAddrErr := splitErr.(*net.AddrError)
	if isAddrErr && strings.Contains(addrErr.Err, "missing port in address") {
		return target, nil
	}
	return "", splitErr
}
156