1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package v2 provides xDS v2 transport protocol specific functionality. 20package v2 21 22import ( 23 "context" 24 "fmt" 25 26 "github.com/golang/protobuf/proto" 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/internal/grpclog" 30 xdsclient "google.golang.org/grpc/xds/internal/client" 31 "google.golang.org/grpc/xds/internal/version" 32 33 v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" 34 v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 35 v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" 36 statuspb "google.golang.org/genproto/googleapis/rpc/status" 37) 38 39func init() { 40 xdsclient.RegisterAPIClientBuilder(clientBuilder{}) 41} 42 43var ( 44 resourceTypeToURL = map[xdsclient.ResourceType]string{ 45 xdsclient.ListenerResource: version.V2ListenerURL, 46 xdsclient.RouteConfigResource: version.V2RouteConfigURL, 47 xdsclient.ClusterResource: version.V2ClusterURL, 48 xdsclient.EndpointsResource: version.V2EndpointsURL, 49 } 50) 51 52type clientBuilder struct{} 53 54func (clientBuilder) Build(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) { 55 return newClient(cc, opts) 56} 57 58func (clientBuilder) Version() version.TransportAPI { 59 return version.TransportV2 60} 61 62func newClient(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) { 63 nodeProto, ok := opts.NodeProto.(*v2corepb.Node) 64 if !ok { 65 return nil, fmt.Errorf("xds: unsupported Node proto type: %T, want %T", opts.NodeProto, (*v2corepb.Node)(nil)) 66 } 67 v2c := &client{ 68 cc: cc, 69 parent: opts.Parent, 70 nodeProto: nodeProto, 71 logger: opts.Logger, 72 } 73 v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) 74 v2c.TransportHelper = xdsclient.NewTransportHelper(v2c, opts.Logger, opts.Backoff) 75 return v2c, nil 76} 77 78type adsStream v2adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient 79 80// client performs the actual xDS RPCs using the xDS v2 API. It creates a 81// single ADS stream on which the different types of xDS requests and responses 82// are multiplexed. 83type client struct { 84 *xdsclient.TransportHelper 85 86 ctx context.Context 87 cancelCtx context.CancelFunc 88 parent xdsclient.UpdateHandler 89 logger *grpclog.PrefixLogger 90 91 // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. 92 cc *grpc.ClientConn 93 nodeProto *v2corepb.Node 94} 95 96func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) { 97 return v2adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc).StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true)) 98} 99 100// sendRequest sends out a DiscoveryRequest for the given resourceNames, of type 101// rType, on the provided stream. 102// 103// version is the ack version to be sent with the request 104// - If this is the new request (not an ack/nack), version will be empty. 105// - If this is an ack, version will be the version from the response. 106// - If this is a nack, version will be the previous acked version (from 107// versionMap). If there was no ack before, it will be empty. 108func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error { 109 stream, ok := s.(adsStream) 110 if !ok { 111 return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) 112 } 113 req := &v2xdspb.DiscoveryRequest{ 114 Node: v2c.nodeProto, 115 TypeUrl: resourceTypeToURL[rType], 116 ResourceNames: resourceNames, 117 VersionInfo: version, 118 ResponseNonce: nonce, 119 } 120 if errMsg != "" { 121 req.ErrorDetail = &statuspb.Status{ 122 Code: int32(codes.InvalidArgument), Message: errMsg, 123 } 124 } 125 if err := stream.Send(req); err != nil { 126 return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err) 127 } 128 v2c.logger.Debugf("ADS request sent: %v", req) 129 return nil 130} 131 132// RecvResponse blocks on the receipt of one response message on the provided 133// stream. 134func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) { 135 stream, ok := s.(adsStream) 136 if !ok { 137 return nil, fmt.Errorf("xds: Attempt to receive response on unsupported stream type: %T", s) 138 } 139 140 resp, err := stream.Recv() 141 if err != nil { 142 // TODO: call watch callbacks with error when stream is broken. 143 return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err) 144 } 145 v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl()) 146 v2c.logger.Debugf("ADS response received: %v", resp) 147 return resp, nil 148} 149 150func (v2c *client) HandleResponse(r proto.Message) (xdsclient.ResourceType, string, string, error) { 151 rType := xdsclient.UnknownResource 152 resp, ok := r.(*v2xdspb.DiscoveryResponse) 153 if !ok { 154 return rType, "", "", fmt.Errorf("xds: unsupported message type: %T", resp) 155 } 156 157 // Note that the xDS transport protocol is versioned independently of 158 // the resource types, and it is supported to transfer older versions 159 // of resource types using new versions of the transport protocol, or 160 // vice-versa. Hence we need to handle v3 type_urls as well here. 161 var err error 162 url := resp.GetTypeUrl() 163 switch { 164 case xdsclient.IsListenerResource(url): 165 err = v2c.handleLDSResponse(resp) 166 rType = xdsclient.ListenerResource 167 case xdsclient.IsRouteConfigResource(url): 168 err = v2c.handleRDSResponse(resp) 169 rType = xdsclient.RouteConfigResource 170 case xdsclient.IsClusterResource(url): 171 err = v2c.handleCDSResponse(resp) 172 rType = xdsclient.ClusterResource 173 case xdsclient.IsEndpointsResource(url): 174 err = v2c.handleEDSResponse(resp) 175 rType = xdsclient.EndpointsResource 176 default: 177 return rType, "", "", xdsclient.ErrResourceTypeUnsupported{ 178 ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()), 179 } 180 } 181 return rType, resp.GetVersionInfo(), resp.GetNonce(), err 182} 183 184// handleLDSResponse processes an LDS response received from the management 185// server. On receipt of a good response, it also invokes the registered watcher 186// callback. 187func (v2c *client) handleLDSResponse(resp *v2xdspb.DiscoveryResponse) error { 188 update, md, err := xdsclient.UnmarshalListener(resp.GetVersionInfo(), resp.GetResources(), v2c.logger) 189 v2c.parent.NewListeners(update, md) 190 return err 191} 192 193// handleRDSResponse processes an RDS response received from the management 194// server. On receipt of a good response, it caches validated resources and also 195// invokes the registered watcher callback. 196func (v2c *client) handleRDSResponse(resp *v2xdspb.DiscoveryResponse) error { 197 update, md, err := xdsclient.UnmarshalRouteConfig(resp.GetVersionInfo(), resp.GetResources(), v2c.logger) 198 v2c.parent.NewRouteConfigs(update, md) 199 return err 200} 201 202// handleCDSResponse processes an CDS response received from the management 203// server. On receipt of a good response, it also invokes the registered watcher 204// callback. 205func (v2c *client) handleCDSResponse(resp *v2xdspb.DiscoveryResponse) error { 206 update, md, err := xdsclient.UnmarshalCluster(resp.GetVersionInfo(), resp.GetResources(), v2c.logger) 207 v2c.parent.NewClusters(update, md) 208 return err 209} 210 211func (v2c *client) handleEDSResponse(resp *v2xdspb.DiscoveryResponse) error { 212 update, md, err := xdsclient.UnmarshalEndpoints(resp.GetVersionInfo(), resp.GetResources(), v2c.logger) 213 v2c.parent.NewEndpoints(update, md) 214 return err 215} 216