1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package v2 provides xDS v2 transport protocol specific functionality. 20package v2 21 22import ( 23 "context" 24 "fmt" 25 26 "github.com/golang/protobuf/proto" 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/internal/grpclog" 30 "google.golang.org/grpc/internal/pretty" 31 controllerversion "google.golang.org/grpc/xds/internal/xdsclient/controller/version" 32 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" 33 xdsresourceversion "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" 34 "google.golang.org/protobuf/types/known/anypb" 35 36 v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" 37 v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 38 v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" 39 statuspb "google.golang.org/genproto/googleapis/rpc/status" 40) 41 42func init() { 43 controllerversion.RegisterAPIClientBuilder(xdsresourceversion.TransportV2, newClient) 44} 45 46var ( 47 resourceTypeToURL = map[xdsresource.ResourceType]string{ 48 xdsresource.ListenerResource: xdsresourceversion.V2ListenerURL, 49 xdsresource.RouteConfigResource: xdsresourceversion.V2RouteConfigURL, 50 xdsresource.ClusterResource: xdsresourceversion.V2ClusterURL, 51 xdsresource.EndpointsResource: xdsresourceversion.V2EndpointsURL, 52 } 53) 54 55func newClient(opts controllerversion.BuildOptions) (controllerversion.VersionedClient, error) { 56 nodeProto, ok := opts.NodeProto.(*v2corepb.Node) 57 if !ok { 58 return nil, fmt.Errorf("xds: unsupported Node proto type: %T, want %T", opts.NodeProto, (*v2corepb.Node)(nil)) 59 } 60 v2c := &client{nodeProto: nodeProto, logger: opts.Logger} 61 return v2c, nil 62} 63 64type adsStream v2adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient 65 66// client performs the actual xDS RPCs using the xDS v2 API. It creates a 67// single ADS stream on which the different types of xDS requests and responses 68// are multiplexed. 69type client struct { 70 nodeProto *v2corepb.Node 71 logger *grpclog.PrefixLogger 72} 73 74func (v2c *client) NewStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) { 75 return v2adsgrpc.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(ctx, grpc.WaitForReady(true)) 76} 77 78// SendRequest sends out a DiscoveryRequest for the given resourceNames, of type 79// rType, on the provided stream. 80// 81// version is the ack version to be sent with the request 82// - If this is the new request (not an ack/nack), version will be empty. 83// - If this is an ack, version will be the version from the response. 84// - If this is a nack, version will be the previous acked version (from 85// versionMap). If there was no ack before, it will be empty. 86func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsresource.ResourceType, version, nonce, errMsg string) error { 87 stream, ok := s.(adsStream) 88 if !ok { 89 return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) 90 } 91 req := &v2xdspb.DiscoveryRequest{ 92 Node: v2c.nodeProto, 93 TypeUrl: resourceTypeToURL[rType], 94 ResourceNames: resourceNames, 95 VersionInfo: version, 96 ResponseNonce: nonce, 97 } 98 if errMsg != "" { 99 req.ErrorDetail = &statuspb.Status{ 100 Code: int32(codes.InvalidArgument), Message: errMsg, 101 } 102 } 103 if err := stream.Send(req); err != nil { 104 return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err) 105 } 106 v2c.logger.Debugf("ADS request sent: %v", pretty.ToJSON(req)) 107 return nil 108} 109 110// RecvResponse blocks on the receipt of one response message on the provided 111// stream. 112func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) { 113 stream, ok := s.(adsStream) 114 if !ok { 115 return nil, fmt.Errorf("xds: Attempt to receive response on unsupported stream type: %T", s) 116 } 117 118 resp, err := stream.Recv() 119 if err != nil { 120 return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err) 121 } 122 v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl()) 123 v2c.logger.Debugf("ADS response received: %v", pretty.ToJSON(resp)) 124 return resp, nil 125} 126 127func (v2c *client) ParseResponse(r proto.Message) (xdsresource.ResourceType, []*anypb.Any, string, string, error) { 128 rType := xdsresource.UnknownResource 129 resp, ok := r.(*v2xdspb.DiscoveryResponse) 130 if !ok { 131 return rType, nil, "", "", fmt.Errorf("xds: unsupported message type: %T", resp) 132 } 133 134 // Note that the xDS transport protocol is versioned independently of 135 // the resource types, and it is supported to transfer older versions 136 // of resource types using new versions of the transport protocol, or 137 // vice-versa. Hence we need to handle v3 type_urls as well here. 138 var err error 139 url := resp.GetTypeUrl() 140 switch { 141 case xdsresource.IsListenerResource(url): 142 rType = xdsresource.ListenerResource 143 case xdsresource.IsRouteConfigResource(url): 144 rType = xdsresource.RouteConfigResource 145 case xdsresource.IsClusterResource(url): 146 rType = xdsresource.ClusterResource 147 case xdsresource.IsEndpointsResource(url): 148 rType = xdsresource.EndpointsResource 149 default: 150 return rType, nil, "", "", controllerversion.ErrResourceTypeUnsupported{ 151 ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()), 152 } 153 } 154 return rType, resp.GetResources(), resp.GetVersionInfo(), resp.GetNonce(), err 155} 156