1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package v2 provides xDS v2 transport protocol specific functionality.
20package v2
21
22import (
23	"context"
24	"fmt"
25
26	"github.com/golang/protobuf/proto"
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/codes"
29	"google.golang.org/grpc/internal/grpclog"
30	xdsclient "google.golang.org/grpc/xds/internal/client"
31	"google.golang.org/grpc/xds/internal/version"
32
33	v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
34	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
35	v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
36	statuspb "google.golang.org/genproto/googleapis/rpc/status"
37)
38
// init registers a clientBuilder with the xdsclient package through
// RegisterAPIClientBuilder.
func init() {
	xdsclient.RegisterAPIClientBuilder(clientBuilder{})
}
42
var (
	// resourceTypeToURL maps each xDS resource type to the v2 type URL that
	// SendRequest places in the TypeUrl field of outgoing DiscoveryRequests.
	// A resource type missing from this map yields an empty type URL.
	resourceTypeToURL = map[xdsclient.ResourceType]string{
		xdsclient.ListenerResource:    version.V2ListenerURL,
		xdsclient.RouteConfigResource: version.V2RouteConfigURL,
		xdsclient.ClusterResource:     version.V2ClusterURL,
		xdsclient.EndpointsResource:   version.V2EndpointsURL,
	}
)
51
// clientBuilder builds xDS v2 API clients. An instance of it is registered
// with the xdsclient package in init.
type clientBuilder struct{}
53
// Build returns the APIClient created by newClient for the given ClientConn
// and build options, or the error newClient reports.
func (clientBuilder) Build(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) {
	return newClient(cc, opts)
}
57
// Version returns version.TransportV2, the transport API version associated
// with this builder.
func (clientBuilder) Version() version.TransportAPI {
	return version.TransportV2
}
61
62func newClient(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) {
63	nodeProto, ok := opts.NodeProto.(*v2corepb.Node)
64	if !ok {
65		return nil, fmt.Errorf("xds: unsupported Node proto type: %T, want %T", opts.NodeProto, (*v2corepb.Node)(nil))
66	}
67	v2c := &client{
68		cc:        cc,
69		parent:    opts.Parent,
70		nodeProto: nodeProto,
71		logger:    opts.Logger,
72	}
73	v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
74	v2c.TransportHelper = xdsclient.NewTransportHelper(v2c, opts.Logger, opts.Backoff)
75	return v2c, nil
76}
77
// adsStream names the v2 ADS stream client type. SendRequest and RecvResponse
// assert the grpc.ClientStream they are given to this type before using it.
type adsStream v2adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
79
// client performs the actual xDS RPCs using the xDS v2 API. It creates a
// single ADS stream on which the different types of xDS requests and responses
// are multiplexed.
type client struct {
	// TransportHelper is created in newClient with this client as its
	// versioned implementation; its methods are promoted onto client.
	*xdsclient.TransportHelper

	// ctx and cancelCtx are created together in newClient via
	// context.WithCancel(context.Background()). parent receives the
	// unmarshaled resource updates from the handle*Response methods, and
	// logger is used to log sent requests and received responses.
	ctx       context.Context
	cancelCtx context.CancelFunc
	parent    xdsclient.UpdateHandler
	logger    *grpclog.PrefixLogger

	// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
	// nodeProto is sent as the Node field of every DiscoveryRequest.
	cc        *grpc.ClientConn
	nodeProto *v2corepb.Node
}
95
96func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
97	return v2adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc).StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true))
98}
99
100// sendRequest sends out a DiscoveryRequest for the given resourceNames, of type
101// rType, on the provided stream.
102//
103// version is the ack version to be sent with the request
104// - If this is the new request (not an ack/nack), version will be empty.
105// - If this is an ack, version will be the version from the response.
106// - If this is a nack, version will be the previous acked version (from
107//   versionMap). If there was no ack before, it will be empty.
108func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
109	stream, ok := s.(adsStream)
110	if !ok {
111		return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
112	}
113	req := &v2xdspb.DiscoveryRequest{
114		Node:          v2c.nodeProto,
115		TypeUrl:       resourceTypeToURL[rType],
116		ResourceNames: resourceNames,
117		VersionInfo:   version,
118		ResponseNonce: nonce,
119	}
120	if errMsg != "" {
121		req.ErrorDetail = &statuspb.Status{
122			Code: int32(codes.InvalidArgument), Message: errMsg,
123		}
124	}
125	if err := stream.Send(req); err != nil {
126		return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
127	}
128	v2c.logger.Debugf("ADS request sent: %v", req)
129	return nil
130}
131
132// RecvResponse blocks on the receipt of one response message on the provided
133// stream.
134func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) {
135	stream, ok := s.(adsStream)
136	if !ok {
137		return nil, fmt.Errorf("xds: Attempt to receive response on unsupported stream type: %T", s)
138	}
139
140	resp, err := stream.Recv()
141	if err != nil {
142		// TODO: call watch callbacks with error when stream is broken.
143		return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err)
144	}
145	v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
146	v2c.logger.Debugf("ADS response received: %v", resp)
147	return resp, nil
148}
149
150func (v2c *client) HandleResponse(r proto.Message) (xdsclient.ResourceType, string, string, error) {
151	rType := xdsclient.UnknownResource
152	resp, ok := r.(*v2xdspb.DiscoveryResponse)
153	if !ok {
154		return rType, "", "", fmt.Errorf("xds: unsupported message type: %T", resp)
155	}
156
157	// Note that the xDS transport protocol is versioned independently of
158	// the resource types, and it is supported to transfer older versions
159	// of resource types using new versions of the transport protocol, or
160	// vice-versa. Hence we need to handle v3 type_urls as well here.
161	var err error
162	url := resp.GetTypeUrl()
163	switch {
164	case xdsclient.IsListenerResource(url):
165		err = v2c.handleLDSResponse(resp)
166		rType = xdsclient.ListenerResource
167	case xdsclient.IsRouteConfigResource(url):
168		err = v2c.handleRDSResponse(resp)
169		rType = xdsclient.RouteConfigResource
170	case xdsclient.IsClusterResource(url):
171		err = v2c.handleCDSResponse(resp)
172		rType = xdsclient.ClusterResource
173	case xdsclient.IsEndpointsResource(url):
174		err = v2c.handleEDSResponse(resp)
175		rType = xdsclient.EndpointsResource
176	default:
177		return rType, "", "", xdsclient.ErrResourceTypeUnsupported{
178			ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()),
179		}
180	}
181	return rType, resp.GetVersionInfo(), resp.GetNonce(), err
182}
183
184// handleLDSResponse processes an LDS response received from the management
185// server. On receipt of a good response, it also invokes the registered watcher
186// callback.
187func (v2c *client) handleLDSResponse(resp *v2xdspb.DiscoveryResponse) error {
188	update, md, err := xdsclient.UnmarshalListener(resp.GetVersionInfo(), resp.GetResources(), v2c.logger)
189	v2c.parent.NewListeners(update, md)
190	return err
191}
192
193// handleRDSResponse processes an RDS response received from the management
194// server. On receipt of a good response, it caches validated resources and also
195// invokes the registered watcher callback.
196func (v2c *client) handleRDSResponse(resp *v2xdspb.DiscoveryResponse) error {
197	update, md, err := xdsclient.UnmarshalRouteConfig(resp.GetVersionInfo(), resp.GetResources(), v2c.logger)
198	v2c.parent.NewRouteConfigs(update, md)
199	return err
200}
201
202// handleCDSResponse processes an CDS response received from the management
203// server. On receipt of a good response, it also invokes the registered watcher
204// callback.
205func (v2c *client) handleCDSResponse(resp *v2xdspb.DiscoveryResponse) error {
206	update, md, err := xdsclient.UnmarshalCluster(resp.GetVersionInfo(), resp.GetResources(), v2c.logger)
207	v2c.parent.NewClusters(update, md)
208	return err
209}
210
211func (v2c *client) handleEDSResponse(resp *v2xdspb.DiscoveryResponse) error {
212	update, md, err := xdsclient.UnmarshalEndpoints(resp.GetVersionInfo(), resp.GetResources(), v2c.logger)
213	v2c.parent.NewEndpoints(update, md)
214	return err
215}
216