1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package client 20 21import ( 22 "fmt" 23 "net" 24 "strings" 25 26 xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" 27 "github.com/golang/protobuf/ptypes" 28) 29 30// handleRDSResponse processes an RDS response received from the xDS server. On 31// receipt of a good response, it caches validated resources and also invokes 32// the registered watcher callback. 33func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error { 34 v2c.mu.Lock() 35 defer v2c.mu.Unlock() 36 37 if v2c.watchMap[ldsURL] == nil { 38 return fmt.Errorf("xds: unexpected RDS response when no LDS watcher is registered: %+v", resp) 39 } 40 target := v2c.watchMap[ldsURL].target[0] 41 42 wi := v2c.watchMap[rdsURL] 43 if wi == nil { 44 return fmt.Errorf("xds: no RDS watcher found when handling RDS response: %+v", resp) 45 } 46 47 returnCluster := "" 48 localCache := make(map[string]string) 49 for _, r := range resp.GetResources() { 50 var resource ptypes.DynamicAny 51 if err := ptypes.UnmarshalAny(r, &resource); err != nil { 52 return fmt.Errorf("xds: failed to unmarshal resource in RDS response: %v", err) 53 } 54 rc, ok := resource.Message.(*xdspb.RouteConfiguration) 55 if !ok { 56 return fmt.Errorf("xds: unexpected resource type: %T in RDS response", resource.Message) 57 } 58 cluster := getClusterFromRouteConfiguration(rc, target) 59 if cluster == "" { 60 return fmt.Errorf("xds: received invalid RouteConfiguration in RDS response: %+v", rc) 61 } 62 63 // If we get here, it means that this resource was a good one. 64 localCache[rc.GetName()] = cluster 65 66 // TODO: remove cache, and only process resources that are interesting. 67 if rc.GetName() == wi.target[0] { 68 returnCluster = cluster 69 } 70 } 71 72 // Update the cache in the v2Client only after we have confirmed that all 73 // resources in the received response were good. 74 for k, v := range localCache { 75 // TODO: Need to handle deletion of entries from the cache based on LDS 76 // watch calls. Not handling it does not affect correctness, but leads 77 // to unnecessary memory consumption. 78 v2c.rdsCache[k] = v 79 } 80 81 if returnCluster != "" { 82 // We stop the expiry timer and invoke the callback only when we have 83 // received the resource that we are watching for. Since RDS is an 84 // incremental protocol, the fact that we did not receive the resource 85 // that we are watching for in this response does not mean that the 86 // server does not know about it. 87 wi.stopTimer() 88 wi.callback.(rdsCallback)(rdsUpdate{clusterName: returnCluster}, nil) 89 } 90 return nil 91} 92 93// getClusterFromRouteConfiguration checks if the provided RouteConfiguration 94// meets the expected criteria. If so, it returns a non-empty clusterName. 95// 96// A RouteConfiguration resource is considered valid when only if it contains a 97// VirtualHost whose domain field matches the server name from the URI passed 98// to the gRPC channel, and it contains a clusterName. 99// 100// The RouteConfiguration includes a list of VirtualHosts, which may have zero 101// or more elements. We are interested in the element whose domains field 102// matches the server name specified in the "xds:" URI (with port, if any, 103// stripped off). The only field in the VirtualHost proto that the we are 104// interested in is the list of routes. We only look at the last route in the 105// list (the default route), whose match field must be empty and whose route 106// field must be set. Inside that route message, the cluster field will 107// contain the clusterName we are looking for. 108func getClusterFromRouteConfiguration(rc *xdspb.RouteConfiguration, target string) string { 109 // TODO: return error for better error logging and nack. 110 // 111 // Currently this returns "" on error, and the caller will return an error. 112 // But the error doesn't contain details of why the response is invalid 113 // (mismatch domain or empty route). 114 // 115 // For logging purposes, we can log in line. But if we want to populate 116 // error details for nack, a detailed error needs to be returned. 117 118 host, err := hostFromTarget(target) 119 if err != nil { 120 return "" 121 } 122 for _, vh := range rc.GetVirtualHosts() { 123 for _, domain := range vh.GetDomains() { 124 // TODO: Add support for wildcard matching here? 125 if domain != host || len(vh.GetRoutes()) == 0 { 126 continue 127 } 128 dr := vh.Routes[len(vh.Routes)-1] 129 if match := dr.GetMatch(); match == nil || match.GetPrefix() != "" { 130 continue 131 } 132 route := dr.GetRoute() 133 if route == nil { 134 continue 135 } 136 return route.GetCluster() 137 } 138 } 139 return "" 140} 141 142// hostFromTarget calls net.SplitHostPort and returns the host. 143// 144// It returns the original string instead of error if port is missing. 145func hostFromTarget(target string) (string, error) { 146 const portMissingErrDesc = "missing port in address" 147 h, _, err := net.SplitHostPort(target) 148 if err != nil { 149 if addrErr, ok := err.(*net.AddrError); ok && strings.Contains(addrErr.Err, portMissingErrDesc) { 150 return target, nil 151 } 152 return "", err 153 } 154 return h, nil 155} 156