1/*
2 *
3 * Copyright 2021 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package csds implements features to dump the status (xDS responses) the
20// xds_client is using.
21//
22// Notice: This package is EXPERIMENTAL and may be changed or removed in a later
23// release.
24package csds
25
26import (
27	"context"
28	"io"
29	"time"
30
31	v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
32	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
33	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
34	v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
35	v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
36	"github.com/golang/protobuf/proto"
37	"google.golang.org/grpc/codes"
38	"google.golang.org/grpc/grpclog"
39	"google.golang.org/grpc/status"
40	"google.golang.org/grpc/xds/internal/xdsclient"
41	"google.golang.org/protobuf/types/known/timestamppb"
42
43	_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client.
44	_ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register v3 xds_client.
45)
46
47var (
48	logger       = grpclog.Component("xds")
49	newXDSClient = func() xdsclient.XDSClient {
50		c, err := xdsclient.New()
51		if err != nil {
52			logger.Warningf("failed to create xds client: %v", err)
53			return nil
54		}
55		return c
56	}
57)
58
59// ClientStatusDiscoveryServer implementations interface ClientStatusDiscoveryServiceServer.
60type ClientStatusDiscoveryServer struct {
61	// xdsClient will always be the same in practice. But we keep a copy in each
62	// server instance for testing.
63	xdsClient xdsclient.XDSClient
64}
65
66// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
67// registered on a gRPC server.
68func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
69	return &ClientStatusDiscoveryServer{xdsClient: newXDSClient()}, nil
70}
71
72// StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer.
73func (s *ClientStatusDiscoveryServer) StreamClientStatus(stream v3statusgrpc.ClientStatusDiscoveryService_StreamClientStatusServer) error {
74	for {
75		req, err := stream.Recv()
76		if err == io.EOF {
77			return nil
78		}
79		if err != nil {
80			return err
81		}
82		resp, err := s.buildClientStatusRespForReq(req)
83		if err != nil {
84			return err
85		}
86		if err := stream.Send(resp); err != nil {
87			return err
88		}
89	}
90}
91
92// FetchClientStatus implementations interface ClientStatusDiscoveryServiceServer.
93func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
94	return s.buildClientStatusRespForReq(req)
95}
96
97// buildClientStatusRespForReq fetches the status from the client, and returns
98// the response to be sent back to xdsclient.
99//
100// If it returns an error, the error is a status error.
101func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
102	if s.xdsClient == nil {
103		return &v3statuspb.ClientStatusResponse{}, nil
104	}
105	// Field NodeMatchers is unsupported, by design
106	// https://github.com/grpc/proposal/blob/master/A40-csds-support.md#detail-node-matching.
107	if len(req.NodeMatchers) != 0 {
108		return nil, status.Errorf(codes.InvalidArgument, "node_matchers are not supported, request contains node_matchers: %v", req.NodeMatchers)
109	}
110
111	ret := &v3statuspb.ClientStatusResponse{
112		Config: []*v3statuspb.ClientConfig{
113			{
114				Node: nodeProtoToV3(s.xdsClient.BootstrapConfig().NodeProto),
115				XdsConfig: []*v3statuspb.PerXdsConfig{
116					s.buildLDSPerXDSConfig(),
117					s.buildRDSPerXDSConfig(),
118					s.buildCDSPerXDSConfig(),
119					s.buildEDSPerXDSConfig(),
120				},
121			},
122		},
123	}
124	return ret, nil
125}
126
127// Close cleans up the resources.
128func (s *ClientStatusDiscoveryServer) Close() {
129	if s.xdsClient != nil {
130		s.xdsClient.Close()
131	}
132}
133
134// nodeProtoToV3 converts the given proto into a v3.Node. n is from bootstrap
135// config, it can be either v2.Node or v3.Node.
136//
137// If n is already a v3.Node, return it.
138// If n is v2.Node, marshal and unmarshal it to v3.
139// Otherwise, return nil.
140//
141// The default case (not v2 or v3) is nil, instead of error, because the
142// resources in the response are more important than the node. The worst case is
143// that the user will receive no Node info, but will still get resources.
144func nodeProtoToV3(n proto.Message) *v3corepb.Node {
145	var node *v3corepb.Node
146	switch nn := n.(type) {
147	case *v3corepb.Node:
148		node = nn
149	case *v2corepb.Node:
150		v2, err := proto.Marshal(nn)
151		if err != nil {
152			logger.Warningf("Failed to marshal node (%v): %v", n, err)
153			break
154		}
155		node = new(v3corepb.Node)
156		if err := proto.Unmarshal(v2, node); err != nil {
157			logger.Warningf("Failed to unmarshal node (%v): %v", v2, err)
158		}
159	default:
160		logger.Warningf("node from bootstrap is %#v, only v2.Node and v3.Node are supported", nn)
161	}
162	return node
163}
164
165func (s *ClientStatusDiscoveryServer) buildLDSPerXDSConfig() *v3statuspb.PerXdsConfig {
166	version, dump := s.xdsClient.DumpLDS()
167	var resources []*v3adminpb.ListenersConfigDump_DynamicListener
168	for name, d := range dump {
169		configDump := &v3adminpb.ListenersConfigDump_DynamicListener{
170			Name:         name,
171			ClientStatus: serviceStatusToProto(d.MD.Status),
172		}
173		if (d.MD.Timestamp != time.Time{}) {
174			configDump.ActiveState = &v3adminpb.ListenersConfigDump_DynamicListenerState{
175				VersionInfo: d.MD.Version,
176				Listener:    d.Raw,
177				LastUpdated: timestamppb.New(d.MD.Timestamp),
178			}
179		}
180		if errState := d.MD.ErrState; errState != nil {
181			configDump.ErrorState = &v3adminpb.UpdateFailureState{
182				LastUpdateAttempt: timestamppb.New(errState.Timestamp),
183				Details:           errState.Err.Error(),
184				VersionInfo:       errState.Version,
185			}
186		}
187		resources = append(resources, configDump)
188	}
189	return &v3statuspb.PerXdsConfig{
190		PerXdsConfig: &v3statuspb.PerXdsConfig_ListenerConfig{
191			ListenerConfig: &v3adminpb.ListenersConfigDump{
192				VersionInfo:      version,
193				DynamicListeners: resources,
194			},
195		},
196	}
197}
198
199func (s *ClientStatusDiscoveryServer) buildRDSPerXDSConfig() *v3statuspb.PerXdsConfig {
200	_, dump := s.xdsClient.DumpRDS()
201	var resources []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
202	for _, d := range dump {
203		configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
204			VersionInfo:  d.MD.Version,
205			ClientStatus: serviceStatusToProto(d.MD.Status),
206		}
207		if (d.MD.Timestamp != time.Time{}) {
208			configDump.RouteConfig = d.Raw
209			configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
210		}
211		if errState := d.MD.ErrState; errState != nil {
212			configDump.ErrorState = &v3adminpb.UpdateFailureState{
213				LastUpdateAttempt: timestamppb.New(errState.Timestamp),
214				Details:           errState.Err.Error(),
215				VersionInfo:       errState.Version,
216			}
217		}
218		resources = append(resources, configDump)
219	}
220	return &v3statuspb.PerXdsConfig{
221		PerXdsConfig: &v3statuspb.PerXdsConfig_RouteConfig{
222			RouteConfig: &v3adminpb.RoutesConfigDump{
223				DynamicRouteConfigs: resources,
224			},
225		},
226	}
227}
228
229func (s *ClientStatusDiscoveryServer) buildCDSPerXDSConfig() *v3statuspb.PerXdsConfig {
230	version, dump := s.xdsClient.DumpCDS()
231	var resources []*v3adminpb.ClustersConfigDump_DynamicCluster
232	for _, d := range dump {
233		configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{
234			VersionInfo:  d.MD.Version,
235			ClientStatus: serviceStatusToProto(d.MD.Status),
236		}
237		if (d.MD.Timestamp != time.Time{}) {
238			configDump.Cluster = d.Raw
239			configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
240		}
241		if errState := d.MD.ErrState; errState != nil {
242			configDump.ErrorState = &v3adminpb.UpdateFailureState{
243				LastUpdateAttempt: timestamppb.New(errState.Timestamp),
244				Details:           errState.Err.Error(),
245				VersionInfo:       errState.Version,
246			}
247		}
248		resources = append(resources, configDump)
249	}
250	return &v3statuspb.PerXdsConfig{
251		PerXdsConfig: &v3statuspb.PerXdsConfig_ClusterConfig{
252			ClusterConfig: &v3adminpb.ClustersConfigDump{
253				VersionInfo:           version,
254				DynamicActiveClusters: resources,
255			},
256		},
257	}
258}
259
260func (s *ClientStatusDiscoveryServer) buildEDSPerXDSConfig() *v3statuspb.PerXdsConfig {
261	_, dump := s.xdsClient.DumpEDS()
262	var resources []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
263	for _, d := range dump {
264		configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
265			VersionInfo:  d.MD.Version,
266			ClientStatus: serviceStatusToProto(d.MD.Status),
267		}
268		if (d.MD.Timestamp != time.Time{}) {
269			configDump.EndpointConfig = d.Raw
270			configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
271		}
272		if errState := d.MD.ErrState; errState != nil {
273			configDump.ErrorState = &v3adminpb.UpdateFailureState{
274				LastUpdateAttempt: timestamppb.New(errState.Timestamp),
275				Details:           errState.Err.Error(),
276				VersionInfo:       errState.Version,
277			}
278		}
279		resources = append(resources, configDump)
280	}
281	return &v3statuspb.PerXdsConfig{
282		PerXdsConfig: &v3statuspb.PerXdsConfig_EndpointConfig{
283			EndpointConfig: &v3adminpb.EndpointsConfigDump{
284				DynamicEndpointConfigs: resources,
285			},
286		},
287	}
288}
289
290func serviceStatusToProto(serviceStatus xdsclient.ServiceStatus) v3adminpb.ClientResourceStatus {
291	switch serviceStatus {
292	case xdsclient.ServiceStatusUnknown:
293		return v3adminpb.ClientResourceStatus_UNKNOWN
294	case xdsclient.ServiceStatusRequested:
295		return v3adminpb.ClientResourceStatus_REQUESTED
296	case xdsclient.ServiceStatusNotExist:
297		return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
298	case xdsclient.ServiceStatusACKed:
299		return v3adminpb.ClientResourceStatus_ACKED
300	case xdsclient.ServiceStatusNACKed:
301		return v3adminpb.ClientResourceStatus_NACKED
302	default:
303		return v3adminpb.ClientResourceStatus_UNKNOWN
304	}
305}
306