2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
19// Package v2 provides xDS v2 transport protocol specific functionality.
20package v2
22import (
23	"context"
24	"fmt"
26	"github.com/golang/protobuf/proto"
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/codes"
29	"google.golang.org/grpc/internal/grpclog"
30	"google.golang.org/grpc/internal/pretty"
31	controllerversion "google.golang.org/grpc/xds/internal/xdsclient/controller/version"
32	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
33	xdsresourceversion "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
34	"google.golang.org/protobuf/types/known/anypb"
36	v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
37	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
38	v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
39	statuspb "google.golang.org/genproto/googleapis/rpc/status"
42func init() {
43	controllerversion.RegisterAPIClientBuilder(xdsresourceversion.TransportV2, newClient)
46var (
47	resourceTypeToURL = map[xdsresource.ResourceType]string{
48		xdsresource.ListenerResource:    xdsresourceversion.V2ListenerURL,
49		xdsresource.RouteConfigResource: xdsresourceversion.V2RouteConfigURL,
50		xdsresource.ClusterResource:     xdsresourceversion.V2ClusterURL,
51		xdsresource.EndpointsResource:   xdsresourceversion.V2EndpointsURL,
52	}
55func newClient(opts controllerversion.BuildOptions) (controllerversion.VersionedClient, error) {
56	nodeProto, ok := opts.NodeProto.(*v2corepb.Node)
57	if !ok {
58		return nil, fmt.Errorf("xds: unsupported Node proto type: %T, want %T", opts.NodeProto, (*v2corepb.Node)(nil))
59	}
60	v2c := &client{nodeProto: nodeProto, logger: opts.Logger}
61	return v2c, nil
64type adsStream v2adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
66// client performs the actual xDS RPCs using the xDS v2 API. It creates a
67// single ADS stream on which the different types of xDS requests and responses
68// are multiplexed.
69type client struct {
70	nodeProto *v2corepb.Node
71	logger    *grpclog.PrefixLogger
74func (v2c *client) NewStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) {
75	return v2adsgrpc.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(ctx, grpc.WaitForReady(true))
78// SendRequest sends out a DiscoveryRequest for the given resourceNames, of type
79// rType, on the provided stream.
81// version is the ack version to be sent with the request
82// - If this is the new request (not an ack/nack), version will be empty.
83// - If this is an ack, version will be the version from the response.
84// - If this is a nack, version will be the previous acked version (from
85//   versionMap). If there was no ack before, it will be empty.
86func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsresource.ResourceType, version, nonce, errMsg string) error {
87	stream, ok := s.(adsStream)
88	if !ok {
89		return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
90	}
91	req := &v2xdspb.DiscoveryRequest{
92		Node:          v2c.nodeProto,
93		TypeUrl:       resourceTypeToURL[rType],
94		ResourceNames: resourceNames,
95		VersionInfo:   version,
96		ResponseNonce: nonce,
97	}
98	if errMsg != "" {
99		req.ErrorDetail = &statuspb.Status{
100			Code: int32(codes.InvalidArgument), Message: errMsg,
101		}
102	}
103	if err := stream.Send(req); err != nil {
104		return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
105	}
106	v2c.logger.Debugf("ADS request sent: %v", pretty.ToJSON(req))
107	return nil
110// RecvResponse blocks on the receipt of one response message on the provided
111// stream.
112func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) {
113	stream, ok := s.(adsStream)
114	if !ok {
115		return nil, fmt.Errorf("xds: Attempt to receive response on unsupported stream type: %T", s)
116	}
118	resp, err := stream.Recv()
119	if err != nil {
120		return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err)
121	}
122	v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
123	v2c.logger.Debugf("ADS response received: %v", pretty.ToJSON(resp))
124	return resp, nil
127func (v2c *client) ParseResponse(r proto.Message) (xdsresource.ResourceType, []*anypb.Any, string, string, error) {
128	rType := xdsresource.UnknownResource
129	resp, ok := r.(*v2xdspb.DiscoveryResponse)
130	if !ok {
131		return rType, nil, "", "", fmt.Errorf("xds: unsupported message type: %T", resp)
132	}
134	// Note that the xDS transport protocol is versioned independently of
135	// the resource types, and it is supported to transfer older versions
136	// of resource types using new versions of the transport protocol, or
137	// vice-versa. Hence we need to handle v3 type_urls as well here.
138	var err error
139	url := resp.GetTypeUrl()
140	switch {
141	case xdsresource.IsListenerResource(url):
142		rType = xdsresource.ListenerResource
143	case xdsresource.IsRouteConfigResource(url):
144		rType = xdsresource.RouteConfigResource
145	case xdsresource.IsClusterResource(url):
146		rType = xdsresource.ClusterResource
147	case xdsresource.IsEndpointsResource(url):
148		rType = xdsresource.EndpointsResource
149	default:
150		return rType, nil, "", "", controllerversion.ErrResourceTypeUnsupported{
151			ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()),
152		}
153	}
154	return rType, resp.GetResources(), resp.GetVersionInfo(), resp.GetNonce(), err