1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package xds 20 21import ( 22 "context" 23 "errors" 24 "fmt" 25 "net" 26 "strings" 27 "sync" 28 29 "google.golang.org/grpc" 30 "google.golang.org/grpc/credentials" 31 "google.golang.org/grpc/grpclog" 32 "google.golang.org/grpc/internal" 33 "google.golang.org/grpc/internal/buffer" 34 internalgrpclog "google.golang.org/grpc/internal/grpclog" 35 "google.golang.org/grpc/internal/grpcsync" 36 xdsclient "google.golang.org/grpc/xds/internal/client" 37 "google.golang.org/grpc/xds/internal/client/bootstrap" 38 "google.golang.org/grpc/xds/internal/server" 39) 40 41const serverPrefix = "[xds-server %p] " 42 43var ( 44 // These new functions will be overridden in unit tests. 45 newXDSClient = func() (xdsClientInterface, error) { 46 return xdsclient.New() 47 } 48 newGRPCServer = func(opts ...grpc.ServerOption) grpcServerInterface { 49 return grpc.NewServer(opts...) 50 } 51 52 grpcGetServerCreds = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials) 53 drainServerTransports = internal.DrainServerTransports.(func(*grpc.Server, string)) 54 logger = grpclog.Component("xds") 55) 56 57func prefixLogger(p *GRPCServer) *internalgrpclog.PrefixLogger { 58 return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, p)) 59} 60 61// xdsClientInterface contains methods from xdsClient.Client which are used by 62// the server. This is useful for overriding in unit tests. 63type xdsClientInterface interface { 64 WatchListener(string, func(xdsclient.ListenerUpdate, error)) func() 65 BootstrapConfig() *bootstrap.Config 66 Close() 67} 68 69// grpcServerInterface contains methods from grpc.Server which are used by the 70// GRPCServer type here. This is useful for overriding in unit tests. 71type grpcServerInterface interface { 72 RegisterService(*grpc.ServiceDesc, interface{}) 73 Serve(net.Listener) error 74 Stop() 75 GracefulStop() 76} 77 78// GRPCServer wraps a gRPC server and provides server-side xDS functionality, by 79// communication with a management server using xDS APIs. It implements the 80// grpc.ServiceRegistrar interface and can be passed to service registration 81// functions in IDL generated code. 82type GRPCServer struct { 83 gs grpcServerInterface 84 quit *grpcsync.Event 85 logger *internalgrpclog.PrefixLogger 86 xdsCredsInUse bool 87 opts *serverOptions 88 89 // clientMu is used only in initXDSClient(), which is called at the 90 // beginning of Serve(), where we have to decide if we have to create a 91 // client or use an existing one. 92 clientMu sync.Mutex 93 xdsC xdsClientInterface 94} 95 96// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts. 97// The underlying gRPC server has no service registered and has not started to 98// accept requests yet. 99func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer { 100 newOpts := []grpc.ServerOption{ 101 grpc.ChainUnaryInterceptor(xdsUnaryInterceptor), 102 grpc.ChainStreamInterceptor(xdsStreamInterceptor), 103 } 104 newOpts = append(newOpts, opts...) 105 s := &GRPCServer{ 106 gs: newGRPCServer(newOpts...), 107 quit: grpcsync.NewEvent(), 108 opts: handleServerOptions(opts), 109 } 110 s.logger = prefixLogger(s) 111 s.logger.Infof("Created xds.GRPCServer") 112 113 // We type assert our underlying gRPC server to the real grpc.Server here 114 // before trying to retrieve the configured credentials. This approach 115 // avoids performing the same type assertion in the grpc package which 116 // provides the implementation for internal.GetServerCredentials, and allows 117 // us to use a fake gRPC server in tests. 118 if gs, ok := s.gs.(*grpc.Server); ok { 119 creds := grpcGetServerCreds(gs) 120 if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() { 121 s.xdsCredsInUse = true 122 } 123 } 124 125 s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse) 126 return s 127} 128 129// handleServerOptions iterates through the list of server options passed in by 130// the user, and handles the xDS server specific options. 131func handleServerOptions(opts []grpc.ServerOption) *serverOptions { 132 so := &serverOptions{} 133 for _, opt := range opts { 134 if o, ok := opt.(serverOption); ok { 135 o.applyServerOption(so) 136 } 137 } 138 return so 139} 140 141// RegisterService registers a service and its implementation to the underlying 142// gRPC server. It is called from the IDL generated code. This must be called 143// before invoking Serve. 144func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss interface{}) { 145 s.gs.RegisterService(sd, ss) 146} 147 148// initXDSClient creates a new xdsClient if there is no existing one available. 149func (s *GRPCServer) initXDSClient() error { 150 s.clientMu.Lock() 151 defer s.clientMu.Unlock() 152 153 if s.xdsC != nil { 154 return nil 155 } 156 157 client, err := newXDSClient() 158 if err != nil { 159 return fmt.Errorf("xds: failed to create xds-client: %v", err) 160 } 161 s.xdsC = client 162 s.logger.Infof("Created an xdsClient") 163 return nil 164} 165 166// Serve gets the underlying gRPC server to accept incoming connections on the 167// listener lis, which is expected to be listening on a TCP port. 168// 169// A connection to the management server, to receive xDS configuration, is 170// initiated here. 171// 172// Serve will return a non-nil error unless Stop or GracefulStop is called. 173func (s *GRPCServer) Serve(lis net.Listener) error { 174 s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String()) 175 if _, ok := lis.Addr().(*net.TCPAddr); !ok { 176 return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr()) 177 } 178 179 // If this is the first time Serve() is being called, we need to initialize 180 // our xdsClient. If not, we can use the existing one. 181 if err := s.initXDSClient(); err != nil { 182 return err 183 } 184 185 cfg := s.xdsC.BootstrapConfig() 186 if cfg == nil { 187 return errors.New("bootstrap configuration is empty") 188 } 189 190 // If xds credentials were specified by the user, but bootstrap configs do 191 // not contain any certificate provider configuration, it is better to fail 192 // right now rather than failing when attempting to create certificate 193 // providers after receiving an LDS response with security configuration. 194 if s.xdsCredsInUse { 195 if len(cfg.CertProviderConfigs) == 0 { 196 return errors.New("xds: certificate_providers config missing in bootstrap file") 197 } 198 } 199 200 // The server listener resource name template from the bootstrap 201 // configuration contains a template for the name of the Listener resource 202 // to subscribe to for a gRPC server. If the token `%s` is present in the 203 // string, it will be replaced with the server's listening "IP:port" (e.g., 204 // "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated 205 // as an error since we do not have any default value for this. 206 if cfg.ServerListenerResourceNameTemplate == "" { 207 return errors.New("missing server_listener_resource_name_template in the bootstrap configuration") 208 } 209 name := cfg.ServerListenerResourceNameTemplate 210 if strings.Contains(cfg.ServerListenerResourceNameTemplate, "%s") { 211 name = strings.Replace(cfg.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1) 212 } 213 214 modeUpdateCh := buffer.NewUnbounded() 215 go func() { 216 s.handleServingModeChanges(modeUpdateCh) 217 }() 218 219 // Create a listenerWrapper which handles all functionality required by 220 // this particular instance of Serve(). 221 lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{ 222 Listener: lis, 223 ListenerResourceName: name, 224 XDSCredsInUse: s.xdsCredsInUse, 225 XDSClient: s.xdsC, 226 ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) { 227 modeUpdateCh.Put(&modeChangeArgs{ 228 addr: addr, 229 mode: mode, 230 err: err, 231 }) 232 }, 233 }) 234 235 // Block until a good LDS response is received or the server is stopped. 236 select { 237 case <-s.quit.Done(): 238 // Since the listener has not yet been handed over to gs.Serve(), we 239 // need to explicitly close the listener. Cancellation of the xDS watch 240 // is handled by the listenerWrapper. 241 lw.Close() 242 return nil 243 case <-goodUpdateCh: 244 } 245 return s.gs.Serve(lw) 246} 247 248// modeChangeArgs wraps argument required for invoking mode change callback. 249type modeChangeArgs struct { 250 addr net.Addr 251 mode server.ServingMode 252 err error 253} 254 255// handleServingModeChanges runs as a separate goroutine, spawned from Serve(). 256// It reads a channel on to which mode change arguments are pushed, and in turn 257// invokes the user registered callback. It also calls an internal method on the 258// underlying grpc.Server to gracefully close existing connections, if the 259// listener moved to a "not-serving" mode. 260func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) { 261 for { 262 select { 263 case <-s.quit.Done(): 264 return 265 case u := <-updateCh.Get(): 266 updateCh.Load() 267 args := u.(*modeChangeArgs) 268 if args.mode == ServingModeNotServing { 269 // We type assert our underlying gRPC server to the real 270 // grpc.Server here before trying to initiate the drain 271 // operation. This approach avoids performing the same type 272 // assertion in the grpc package which provides the 273 // implementation for internal.GetServerCredentials, and allows 274 // us to use a fake gRPC server in tests. 275 if gs, ok := s.gs.(*grpc.Server); ok { 276 drainServerTransports(gs, args.addr.String()) 277 } 278 } 279 if s.opts.modeCallback != nil { 280 s.opts.modeCallback(args.addr, ServingModeChangeArgs{ 281 Mode: args.mode, 282 Err: args.err, 283 }) 284 } 285 } 286 } 287} 288 289// Stop stops the underlying gRPC server. It immediately closes all open 290// connections. It cancels all active RPCs on the server side and the 291// corresponding pending RPCs on the client side will get notified by connection 292// errors. 293func (s *GRPCServer) Stop() { 294 s.quit.Fire() 295 s.gs.Stop() 296 if s.xdsC != nil { 297 s.xdsC.Close() 298 } 299} 300 301// GracefulStop stops the underlying gRPC server gracefully. It stops the server 302// from accepting new connections and RPCs and blocks until all the pending RPCs 303// are finished. 304func (s *GRPCServer) GracefulStop() { 305 s.quit.Fire() 306 s.gs.GracefulStop() 307 if s.xdsC != nil { 308 s.xdsC.Close() 309 } 310} 311 312// xdsUnaryInterceptor is the unary interceptor added to the gRPC server to 313// perform any xDS specific functionality on unary RPCs. 314// 315// This is a no-op at this point. 316func xdsUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { 317 return handler(ctx, req) 318} 319 320// xdsStreamInterceptor is the stream interceptor added to the gRPC server to 321// perform any xDS specific functionality on streaming RPCs. 322// 323// This is a no-op at this point. 324func xdsStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 325 return handler(srv, ss) 326} 327