1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package xds
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"net"
26	"strings"
27	"sync"
28
29	"google.golang.org/grpc"
30	"google.golang.org/grpc/credentials"
31	"google.golang.org/grpc/grpclog"
32	"google.golang.org/grpc/internal"
33	"google.golang.org/grpc/internal/buffer"
34	internalgrpclog "google.golang.org/grpc/internal/grpclog"
35	"google.golang.org/grpc/internal/grpcsync"
36	xdsclient "google.golang.org/grpc/xds/internal/client"
37	"google.golang.org/grpc/xds/internal/client/bootstrap"
38	"google.golang.org/grpc/xds/internal/server"
39)
40
41const serverPrefix = "[xds-server %p] "
42
43var (
44	// These new functions will be overridden in unit tests.
45	newXDSClient = func() (xdsClientInterface, error) {
46		return xdsclient.New()
47	}
48	newGRPCServer = func(opts ...grpc.ServerOption) grpcServerInterface {
49		return grpc.NewServer(opts...)
50	}
51
52	grpcGetServerCreds    = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials)
53	drainServerTransports = internal.DrainServerTransports.(func(*grpc.Server, string))
54	logger                = grpclog.Component("xds")
55)
56
57func prefixLogger(p *GRPCServer) *internalgrpclog.PrefixLogger {
58	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, p))
59}
60
61// xdsClientInterface contains methods from xdsClient.Client which are used by
62// the server. This is useful for overriding in unit tests.
63type xdsClientInterface interface {
64	WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
65	BootstrapConfig() *bootstrap.Config
66	Close()
67}
68
69// grpcServerInterface contains methods from grpc.Server which are used by the
70// GRPCServer type here. This is useful for overriding in unit tests.
71type grpcServerInterface interface {
72	RegisterService(*grpc.ServiceDesc, interface{})
73	Serve(net.Listener) error
74	Stop()
75	GracefulStop()
76}
77
78// GRPCServer wraps a gRPC server and provides server-side xDS functionality, by
79// communication with a management server using xDS APIs. It implements the
80// grpc.ServiceRegistrar interface and can be passed to service registration
81// functions in IDL generated code.
82type GRPCServer struct {
83	gs            grpcServerInterface
84	quit          *grpcsync.Event
85	logger        *internalgrpclog.PrefixLogger
86	xdsCredsInUse bool
87	opts          *serverOptions
88
89	// clientMu is used only in initXDSClient(), which is called at the
90	// beginning of Serve(), where we have to decide if we have to create a
91	// client or use an existing one.
92	clientMu sync.Mutex
93	xdsC     xdsClientInterface
94}
95
96// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
97// The underlying gRPC server has no service registered and has not started to
98// accept requests yet.
99func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
100	newOpts := []grpc.ServerOption{
101		grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
102		grpc.ChainStreamInterceptor(xdsStreamInterceptor),
103	}
104	newOpts = append(newOpts, opts...)
105	s := &GRPCServer{
106		gs:   newGRPCServer(newOpts...),
107		quit: grpcsync.NewEvent(),
108		opts: handleServerOptions(opts),
109	}
110	s.logger = prefixLogger(s)
111	s.logger.Infof("Created xds.GRPCServer")
112
113	// We type assert our underlying gRPC server to the real grpc.Server here
114	// before trying to retrieve the configured credentials. This approach
115	// avoids performing the same type assertion in the grpc package which
116	// provides the implementation for internal.GetServerCredentials, and allows
117	// us to use a fake gRPC server in tests.
118	if gs, ok := s.gs.(*grpc.Server); ok {
119		creds := grpcGetServerCreds(gs)
120		if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
121			s.xdsCredsInUse = true
122		}
123	}
124
125	s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse)
126	return s
127}
128
129// handleServerOptions iterates through the list of server options passed in by
130// the user, and handles the xDS server specific options.
131func handleServerOptions(opts []grpc.ServerOption) *serverOptions {
132	so := &serverOptions{}
133	for _, opt := range opts {
134		if o, ok := opt.(serverOption); ok {
135			o.applyServerOption(so)
136		}
137	}
138	return so
139}
140
141// RegisterService registers a service and its implementation to the underlying
142// gRPC server. It is called from the IDL generated code. This must be called
143// before invoking Serve.
144func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
145	s.gs.RegisterService(sd, ss)
146}
147
148// initXDSClient creates a new xdsClient if there is no existing one available.
149func (s *GRPCServer) initXDSClient() error {
150	s.clientMu.Lock()
151	defer s.clientMu.Unlock()
152
153	if s.xdsC != nil {
154		return nil
155	}
156
157	client, err := newXDSClient()
158	if err != nil {
159		return fmt.Errorf("xds: failed to create xds-client: %v", err)
160	}
161	s.xdsC = client
162	s.logger.Infof("Created an xdsClient")
163	return nil
164}
165
166// Serve gets the underlying gRPC server to accept incoming connections on the
167// listener lis, which is expected to be listening on a TCP port.
168//
169// A connection to the management server, to receive xDS configuration, is
170// initiated here.
171//
172// Serve will return a non-nil error unless Stop or GracefulStop is called.
173func (s *GRPCServer) Serve(lis net.Listener) error {
174	s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
175	if _, ok := lis.Addr().(*net.TCPAddr); !ok {
176		return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
177	}
178
179	// If this is the first time Serve() is being called, we need to initialize
180	// our xdsClient. If not, we can use the existing one.
181	if err := s.initXDSClient(); err != nil {
182		return err
183	}
184
185	cfg := s.xdsC.BootstrapConfig()
186	if cfg == nil {
187		return errors.New("bootstrap configuration is empty")
188	}
189
190	// If xds credentials were specified by the user, but bootstrap configs do
191	// not contain any certificate provider configuration, it is better to fail
192	// right now rather than failing when attempting to create certificate
193	// providers after receiving an LDS response with security configuration.
194	if s.xdsCredsInUse {
195		if len(cfg.CertProviderConfigs) == 0 {
196			return errors.New("xds: certificate_providers config missing in bootstrap file")
197		}
198	}
199
200	// The server listener resource name template from the bootstrap
201	// configuration contains a template for the name of the Listener resource
202	// to subscribe to for a gRPC server. If the token `%s` is present in the
203	// string, it will be replaced with the server's listening "IP:port" (e.g.,
204	// "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated
205	// as an error since we do not have any default value for this.
206	if cfg.ServerListenerResourceNameTemplate == "" {
207		return errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
208	}
209	name := cfg.ServerListenerResourceNameTemplate
210	if strings.Contains(cfg.ServerListenerResourceNameTemplate, "%s") {
211		name = strings.Replace(cfg.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)
212	}
213
214	modeUpdateCh := buffer.NewUnbounded()
215	go func() {
216		s.handleServingModeChanges(modeUpdateCh)
217	}()
218
219	// Create a listenerWrapper which handles all functionality required by
220	// this particular instance of Serve().
221	lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{
222		Listener:             lis,
223		ListenerResourceName: name,
224		XDSCredsInUse:        s.xdsCredsInUse,
225		XDSClient:            s.xdsC,
226		ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) {
227			modeUpdateCh.Put(&modeChangeArgs{
228				addr: addr,
229				mode: mode,
230				err:  err,
231			})
232		},
233	})
234
235	// Block until a good LDS response is received or the server is stopped.
236	select {
237	case <-s.quit.Done():
238		// Since the listener has not yet been handed over to gs.Serve(), we
239		// need to explicitly close the listener. Cancellation of the xDS watch
240		// is handled by the listenerWrapper.
241		lw.Close()
242		return nil
243	case <-goodUpdateCh:
244	}
245	return s.gs.Serve(lw)
246}
247
248// modeChangeArgs wraps argument required for invoking mode change callback.
249type modeChangeArgs struct {
250	addr net.Addr
251	mode server.ServingMode
252	err  error
253}
254
255// handleServingModeChanges runs as a separate goroutine, spawned from Serve().
256// It reads a channel on to which mode change arguments are pushed, and in turn
257// invokes the user registered callback. It also calls an internal method on the
258// underlying grpc.Server to gracefully close existing connections, if the
259// listener moved to a "not-serving" mode.
260func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
261	for {
262		select {
263		case <-s.quit.Done():
264			return
265		case u := <-updateCh.Get():
266			updateCh.Load()
267			args := u.(*modeChangeArgs)
268			if args.mode == ServingModeNotServing {
269				// We type assert our underlying gRPC server to the real
270				// grpc.Server here before trying to initiate the drain
271				// operation. This approach avoids performing the same type
272				// assertion in the grpc package which provides the
273				// implementation for internal.GetServerCredentials, and allows
274				// us to use a fake gRPC server in tests.
275				if gs, ok := s.gs.(*grpc.Server); ok {
276					drainServerTransports(gs, args.addr.String())
277				}
278			}
279			if s.opts.modeCallback != nil {
280				s.opts.modeCallback(args.addr, ServingModeChangeArgs{
281					Mode: args.mode,
282					Err:  args.err,
283				})
284			}
285		}
286	}
287}
288
289// Stop stops the underlying gRPC server. It immediately closes all open
290// connections. It cancels all active RPCs on the server side and the
291// corresponding pending RPCs on the client side will get notified by connection
292// errors.
293func (s *GRPCServer) Stop() {
294	s.quit.Fire()
295	s.gs.Stop()
296	if s.xdsC != nil {
297		s.xdsC.Close()
298	}
299}
300
301// GracefulStop stops the underlying gRPC server gracefully. It stops the server
302// from accepting new connections and RPCs and blocks until all the pending RPCs
303// are finished.
304func (s *GRPCServer) GracefulStop() {
305	s.quit.Fire()
306	s.gs.GracefulStop()
307	if s.xdsC != nil {
308		s.xdsC.Close()
309	}
310}
311
312// xdsUnaryInterceptor is the unary interceptor added to the gRPC server to
313// perform any xDS specific functionality on unary RPCs.
314//
315// This is a no-op at this point.
316func xdsUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
317	return handler(ctx, req)
318}
319
320// xdsStreamInterceptor is the stream interceptor added to the gRPC server to
321// perform any xDS specific functionality on streaming RPCs.
322//
323// This is a no-op at this point.
324func xdsStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
325	return handler(srv, ss)
326}
327