1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package xds
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"net"
26	"strings"
27	"sync"
28
29	"google.golang.org/grpc"
30	"google.golang.org/grpc/credentials"
31	"google.golang.org/grpc/grpclog"
32	"google.golang.org/grpc/internal"
33	"google.golang.org/grpc/internal/buffer"
34	internalgrpclog "google.golang.org/grpc/internal/grpclog"
35	"google.golang.org/grpc/internal/grpcsync"
36	"google.golang.org/grpc/xds/internal/server"
37	"google.golang.org/grpc/xds/internal/xdsclient"
38)
39
40const serverPrefix = "[xds-server %p] "
41
42var (
43	// These new functions will be overridden in unit tests.
44	newXDSClient = func() (xdsclient.XDSClient, error) {
45		return xdsclient.New()
46	}
47	newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
48		return grpc.NewServer(opts...)
49	}
50
51	grpcGetServerCreds    = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials)
52	drainServerTransports = internal.DrainServerTransports.(func(*grpc.Server, string))
53	logger                = grpclog.Component("xds")
54)
55
56func prefixLogger(p *GRPCServer) *internalgrpclog.PrefixLogger {
57	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, p))
58}
59
60// grpcServer contains methods from grpc.Server which are used by the
61// GRPCServer type here. This is useful for overriding in unit tests.
62type grpcServer interface {
63	RegisterService(*grpc.ServiceDesc, interface{})
64	Serve(net.Listener) error
65	Stop()
66	GracefulStop()
67	GetServiceInfo() map[string]grpc.ServiceInfo
68}
69
70// GRPCServer wraps a gRPC server and provides server-side xDS functionality, by
71// communication with a management server using xDS APIs. It implements the
72// grpc.ServiceRegistrar interface and can be passed to service registration
73// functions in IDL generated code.
74type GRPCServer struct {
75	gs            grpcServer
76	quit          *grpcsync.Event
77	logger        *internalgrpclog.PrefixLogger
78	xdsCredsInUse bool
79	opts          *serverOptions
80
81	// clientMu is used only in initXDSClient(), which is called at the
82	// beginning of Serve(), where we have to decide if we have to create a
83	// client or use an existing one.
84	clientMu sync.Mutex
85	xdsC     xdsclient.XDSClient
86}
87
88// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
89// The underlying gRPC server has no service registered and has not started to
90// accept requests yet.
91func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
92	newOpts := []grpc.ServerOption{
93		grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
94		grpc.ChainStreamInterceptor(xdsStreamInterceptor),
95	}
96	newOpts = append(newOpts, opts...)
97	s := &GRPCServer{
98		gs:   newGRPCServer(newOpts...),
99		quit: grpcsync.NewEvent(),
100		opts: handleServerOptions(opts),
101	}
102	s.logger = prefixLogger(s)
103	s.logger.Infof("Created xds.GRPCServer")
104
105	// We type assert our underlying gRPC server to the real grpc.Server here
106	// before trying to retrieve the configured credentials. This approach
107	// avoids performing the same type assertion in the grpc package which
108	// provides the implementation for internal.GetServerCredentials, and allows
109	// us to use a fake gRPC server in tests.
110	if gs, ok := s.gs.(*grpc.Server); ok {
111		creds := grpcGetServerCreds(gs)
112		if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
113			s.xdsCredsInUse = true
114		}
115	}
116
117	s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse)
118	return s
119}
120
121// handleServerOptions iterates through the list of server options passed in by
122// the user, and handles the xDS server specific options.
123func handleServerOptions(opts []grpc.ServerOption) *serverOptions {
124	so := &serverOptions{}
125	for _, opt := range opts {
126		if o, ok := opt.(*serverOption); ok {
127			o.apply(so)
128		}
129	}
130	return so
131}
132
133// RegisterService registers a service and its implementation to the underlying
134// gRPC server. It is called from the IDL generated code. This must be called
135// before invoking Serve.
136func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
137	s.gs.RegisterService(sd, ss)
138}
139
140// GetServiceInfo returns a map from service names to ServiceInfo.
141// Service names include the package names, in the form of <package>.<service>.
142func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
143	return s.gs.GetServiceInfo()
144}
145
146// initXDSClient creates a new xdsClient if there is no existing one available.
147func (s *GRPCServer) initXDSClient() error {
148	s.clientMu.Lock()
149	defer s.clientMu.Unlock()
150
151	if s.xdsC != nil {
152		return nil
153	}
154
155	newXDSClient := newXDSClient
156	if s.opts.bootstrapContents != nil {
157		newXDSClient = func() (xdsclient.XDSClient, error) {
158			return xdsclient.NewClientWithBootstrapContents(s.opts.bootstrapContents)
159		}
160	}
161	client, err := newXDSClient()
162	if err != nil {
163		return fmt.Errorf("xds: failed to create xds-client: %v", err)
164	}
165	s.xdsC = client
166	s.logger.Infof("Created an xdsClient")
167	return nil
168}
169
170// Serve gets the underlying gRPC server to accept incoming connections on the
171// listener lis, which is expected to be listening on a TCP port.
172//
173// A connection to the management server, to receive xDS configuration, is
174// initiated here.
175//
176// Serve will return a non-nil error unless Stop or GracefulStop is called.
177func (s *GRPCServer) Serve(lis net.Listener) error {
178	s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
179	if _, ok := lis.Addr().(*net.TCPAddr); !ok {
180		return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
181	}
182
183	// If this is the first time Serve() is being called, we need to initialize
184	// our xdsClient. If not, we can use the existing one.
185	if err := s.initXDSClient(); err != nil {
186		return err
187	}
188	cfg := s.xdsC.BootstrapConfig()
189	if cfg == nil {
190		return errors.New("bootstrap configuration is empty")
191	}
192
193	// If xds credentials were specified by the user, but bootstrap configs do
194	// not contain any certificate provider configuration, it is better to fail
195	// right now rather than failing when attempting to create certificate
196	// providers after receiving an LDS response with security configuration.
197	if s.xdsCredsInUse {
198		if len(cfg.CertProviderConfigs) == 0 {
199			return errors.New("xds: certificate_providers config missing in bootstrap file")
200		}
201	}
202
203	// The server listener resource name template from the bootstrap
204	// configuration contains a template for the name of the Listener resource
205	// to subscribe to for a gRPC server. If the token `%s` is present in the
206	// string, it will be replaced with the server's listening "IP:port" (e.g.,
207	// "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated
208	// as an error since we do not have any default value for this.
209	if cfg.ServerListenerResourceNameTemplate == "" {
210		return errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
211	}
212	name := cfg.ServerListenerResourceNameTemplate
213	if strings.Contains(cfg.ServerListenerResourceNameTemplate, "%s") {
214		name = strings.Replace(cfg.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)
215	}
216
217	modeUpdateCh := buffer.NewUnbounded()
218	go func() {
219		s.handleServingModeChanges(modeUpdateCh)
220	}()
221
222	// Create a listenerWrapper which handles all functionality required by
223	// this particular instance of Serve().
224	lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{
225		Listener:             lis,
226		ListenerResourceName: name,
227		XDSCredsInUse:        s.xdsCredsInUse,
228		XDSClient:            s.xdsC,
229		ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) {
230			modeUpdateCh.Put(&modeChangeArgs{
231				addr: addr,
232				mode: mode,
233				err:  err,
234			})
235		},
236	})
237
238	// Block until a good LDS response is received or the server is stopped.
239	select {
240	case <-s.quit.Done():
241		// Since the listener has not yet been handed over to gs.Serve(), we
242		// need to explicitly close the listener. Cancellation of the xDS watch
243		// is handled by the listenerWrapper.
244		lw.Close()
245		return nil
246	case <-goodUpdateCh:
247	}
248	return s.gs.Serve(lw)
249}
250
251// modeChangeArgs wraps argument required for invoking mode change callback.
252type modeChangeArgs struct {
253	addr net.Addr
254	mode server.ServingMode
255	err  error
256}
257
258// handleServingModeChanges runs as a separate goroutine, spawned from Serve().
259// It reads a channel on to which mode change arguments are pushed, and in turn
260// invokes the user registered callback. It also calls an internal method on the
261// underlying grpc.Server to gracefully close existing connections, if the
262// listener moved to a "not-serving" mode.
263func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
264	for {
265		select {
266		case <-s.quit.Done():
267			return
268		case u := <-updateCh.Get():
269			updateCh.Load()
270			args := u.(*modeChangeArgs)
271			if args.mode == ServingModeNotServing {
272				// We type assert our underlying gRPC server to the real
273				// grpc.Server here before trying to initiate the drain
274				// operation. This approach avoids performing the same type
275				// assertion in the grpc package which provides the
276				// implementation for internal.GetServerCredentials, and allows
277				// us to use a fake gRPC server in tests.
278				if gs, ok := s.gs.(*grpc.Server); ok {
279					drainServerTransports(gs, args.addr.String())
280				}
281			}
282			if s.opts.modeCallback != nil {
283				s.opts.modeCallback(args.addr, ServingModeChangeArgs{
284					Mode: args.mode,
285					Err:  args.err,
286				})
287			}
288		}
289	}
290}
291
292// Stop stops the underlying gRPC server. It immediately closes all open
293// connections. It cancels all active RPCs on the server side and the
294// corresponding pending RPCs on the client side will get notified by connection
295// errors.
296func (s *GRPCServer) Stop() {
297	s.quit.Fire()
298	s.gs.Stop()
299	if s.xdsC != nil {
300		s.xdsC.Close()
301	}
302}
303
304// GracefulStop stops the underlying gRPC server gracefully. It stops the server
305// from accepting new connections and RPCs and blocks until all the pending RPCs
306// are finished.
307func (s *GRPCServer) GracefulStop() {
308	s.quit.Fire()
309	s.gs.GracefulStop()
310	if s.xdsC != nil {
311		s.xdsC.Close()
312	}
313}
314
315// xdsUnaryInterceptor is the unary interceptor added to the gRPC server to
316// perform any xDS specific functionality on unary RPCs.
317//
318// This is a no-op at this point.
319func xdsUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
320	return handler(ctx, req)
321}
322
323// xdsStreamInterceptor is the stream interceptor added to the gRPC server to
324// perform any xDS specific functionality on streaming RPCs.
325//
326// This is a no-op at this point.
327func xdsStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
328	return handler(srv, ss)
329}
330