1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package xds 20 21import ( 22 "context" 23 "errors" 24 "fmt" 25 "net" 26 "strings" 27 "sync" 28 29 "google.golang.org/grpc" 30 "google.golang.org/grpc/credentials" 31 "google.golang.org/grpc/grpclog" 32 "google.golang.org/grpc/internal" 33 "google.golang.org/grpc/internal/buffer" 34 internalgrpclog "google.golang.org/grpc/internal/grpclog" 35 "google.golang.org/grpc/internal/grpcsync" 36 "google.golang.org/grpc/xds/internal/server" 37 "google.golang.org/grpc/xds/internal/xdsclient" 38) 39 40const serverPrefix = "[xds-server %p] " 41 42var ( 43 // These new functions will be overridden in unit tests. 44 newXDSClient = func() (xdsclient.XDSClient, error) { 45 return xdsclient.New() 46 } 47 newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { 48 return grpc.NewServer(opts...) 49 } 50 51 grpcGetServerCreds = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials) 52 drainServerTransports = internal.DrainServerTransports.(func(*grpc.Server, string)) 53 logger = grpclog.Component("xds") 54) 55 56func prefixLogger(p *GRPCServer) *internalgrpclog.PrefixLogger { 57 return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, p)) 58} 59 60// grpcServer contains methods from grpc.Server which are used by the 61// GRPCServer type here. This is useful for overriding in unit tests. 62type grpcServer interface { 63 RegisterService(*grpc.ServiceDesc, interface{}) 64 Serve(net.Listener) error 65 Stop() 66 GracefulStop() 67 GetServiceInfo() map[string]grpc.ServiceInfo 68} 69 70// GRPCServer wraps a gRPC server and provides server-side xDS functionality, by 71// communication with a management server using xDS APIs. It implements the 72// grpc.ServiceRegistrar interface and can be passed to service registration 73// functions in IDL generated code. 74type GRPCServer struct { 75 gs grpcServer 76 quit *grpcsync.Event 77 logger *internalgrpclog.PrefixLogger 78 xdsCredsInUse bool 79 opts *serverOptions 80 81 // clientMu is used only in initXDSClient(), which is called at the 82 // beginning of Serve(), where we have to decide if we have to create a 83 // client or use an existing one. 84 clientMu sync.Mutex 85 xdsC xdsclient.XDSClient 86} 87 88// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts. 89// The underlying gRPC server has no service registered and has not started to 90// accept requests yet. 91func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer { 92 newOpts := []grpc.ServerOption{ 93 grpc.ChainUnaryInterceptor(xdsUnaryInterceptor), 94 grpc.ChainStreamInterceptor(xdsStreamInterceptor), 95 } 96 newOpts = append(newOpts, opts...) 97 s := &GRPCServer{ 98 gs: newGRPCServer(newOpts...), 99 quit: grpcsync.NewEvent(), 100 opts: handleServerOptions(opts), 101 } 102 s.logger = prefixLogger(s) 103 s.logger.Infof("Created xds.GRPCServer") 104 105 // We type assert our underlying gRPC server to the real grpc.Server here 106 // before trying to retrieve the configured credentials. This approach 107 // avoids performing the same type assertion in the grpc package which 108 // provides the implementation for internal.GetServerCredentials, and allows 109 // us to use a fake gRPC server in tests. 110 if gs, ok := s.gs.(*grpc.Server); ok { 111 creds := grpcGetServerCreds(gs) 112 if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() { 113 s.xdsCredsInUse = true 114 } 115 } 116 117 s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse) 118 return s 119} 120 121// handleServerOptions iterates through the list of server options passed in by 122// the user, and handles the xDS server specific options. 123func handleServerOptions(opts []grpc.ServerOption) *serverOptions { 124 so := &serverOptions{} 125 for _, opt := range opts { 126 if o, ok := opt.(*serverOption); ok { 127 o.apply(so) 128 } 129 } 130 return so 131} 132 133// RegisterService registers a service and its implementation to the underlying 134// gRPC server. It is called from the IDL generated code. This must be called 135// before invoking Serve. 136func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss interface{}) { 137 s.gs.RegisterService(sd, ss) 138} 139 140// GetServiceInfo returns a map from service names to ServiceInfo. 141// Service names include the package names, in the form of <package>.<service>. 142func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo { 143 return s.gs.GetServiceInfo() 144} 145 146// initXDSClient creates a new xdsClient if there is no existing one available. 147func (s *GRPCServer) initXDSClient() error { 148 s.clientMu.Lock() 149 defer s.clientMu.Unlock() 150 151 if s.xdsC != nil { 152 return nil 153 } 154 155 newXDSClient := newXDSClient 156 if s.opts.bootstrapContents != nil { 157 newXDSClient = func() (xdsclient.XDSClient, error) { 158 return xdsclient.NewClientWithBootstrapContents(s.opts.bootstrapContents) 159 } 160 } 161 client, err := newXDSClient() 162 if err != nil { 163 return fmt.Errorf("xds: failed to create xds-client: %v", err) 164 } 165 s.xdsC = client 166 s.logger.Infof("Created an xdsClient") 167 return nil 168} 169 170// Serve gets the underlying gRPC server to accept incoming connections on the 171// listener lis, which is expected to be listening on a TCP port. 172// 173// A connection to the management server, to receive xDS configuration, is 174// initiated here. 175// 176// Serve will return a non-nil error unless Stop or GracefulStop is called. 177func (s *GRPCServer) Serve(lis net.Listener) error { 178 s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String()) 179 if _, ok := lis.Addr().(*net.TCPAddr); !ok { 180 return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr()) 181 } 182 183 // If this is the first time Serve() is being called, we need to initialize 184 // our xdsClient. If not, we can use the existing one. 185 if err := s.initXDSClient(); err != nil { 186 return err 187 } 188 cfg := s.xdsC.BootstrapConfig() 189 if cfg == nil { 190 return errors.New("bootstrap configuration is empty") 191 } 192 193 // If xds credentials were specified by the user, but bootstrap configs do 194 // not contain any certificate provider configuration, it is better to fail 195 // right now rather than failing when attempting to create certificate 196 // providers after receiving an LDS response with security configuration. 197 if s.xdsCredsInUse { 198 if len(cfg.CertProviderConfigs) == 0 { 199 return errors.New("xds: certificate_providers config missing in bootstrap file") 200 } 201 } 202 203 // The server listener resource name template from the bootstrap 204 // configuration contains a template for the name of the Listener resource 205 // to subscribe to for a gRPC server. If the token `%s` is present in the 206 // string, it will be replaced with the server's listening "IP:port" (e.g., 207 // "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated 208 // as an error since we do not have any default value for this. 209 if cfg.ServerListenerResourceNameTemplate == "" { 210 return errors.New("missing server_listener_resource_name_template in the bootstrap configuration") 211 } 212 name := cfg.ServerListenerResourceNameTemplate 213 if strings.Contains(cfg.ServerListenerResourceNameTemplate, "%s") { 214 name = strings.Replace(cfg.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1) 215 } 216 217 modeUpdateCh := buffer.NewUnbounded() 218 go func() { 219 s.handleServingModeChanges(modeUpdateCh) 220 }() 221 222 // Create a listenerWrapper which handles all functionality required by 223 // this particular instance of Serve(). 224 lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{ 225 Listener: lis, 226 ListenerResourceName: name, 227 XDSCredsInUse: s.xdsCredsInUse, 228 XDSClient: s.xdsC, 229 ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) { 230 modeUpdateCh.Put(&modeChangeArgs{ 231 addr: addr, 232 mode: mode, 233 err: err, 234 }) 235 }, 236 }) 237 238 // Block until a good LDS response is received or the server is stopped. 239 select { 240 case <-s.quit.Done(): 241 // Since the listener has not yet been handed over to gs.Serve(), we 242 // need to explicitly close the listener. Cancellation of the xDS watch 243 // is handled by the listenerWrapper. 244 lw.Close() 245 return nil 246 case <-goodUpdateCh: 247 } 248 return s.gs.Serve(lw) 249} 250 251// modeChangeArgs wraps argument required for invoking mode change callback. 252type modeChangeArgs struct { 253 addr net.Addr 254 mode server.ServingMode 255 err error 256} 257 258// handleServingModeChanges runs as a separate goroutine, spawned from Serve(). 259// It reads a channel on to which mode change arguments are pushed, and in turn 260// invokes the user registered callback. It also calls an internal method on the 261// underlying grpc.Server to gracefully close existing connections, if the 262// listener moved to a "not-serving" mode. 263func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) { 264 for { 265 select { 266 case <-s.quit.Done(): 267 return 268 case u := <-updateCh.Get(): 269 updateCh.Load() 270 args := u.(*modeChangeArgs) 271 if args.mode == ServingModeNotServing { 272 // We type assert our underlying gRPC server to the real 273 // grpc.Server here before trying to initiate the drain 274 // operation. This approach avoids performing the same type 275 // assertion in the grpc package which provides the 276 // implementation for internal.GetServerCredentials, and allows 277 // us to use a fake gRPC server in tests. 278 if gs, ok := s.gs.(*grpc.Server); ok { 279 drainServerTransports(gs, args.addr.String()) 280 } 281 } 282 if s.opts.modeCallback != nil { 283 s.opts.modeCallback(args.addr, ServingModeChangeArgs{ 284 Mode: args.mode, 285 Err: args.err, 286 }) 287 } 288 } 289 } 290} 291 292// Stop stops the underlying gRPC server. It immediately closes all open 293// connections. It cancels all active RPCs on the server side and the 294// corresponding pending RPCs on the client side will get notified by connection 295// errors. 296func (s *GRPCServer) Stop() { 297 s.quit.Fire() 298 s.gs.Stop() 299 if s.xdsC != nil { 300 s.xdsC.Close() 301 } 302} 303 304// GracefulStop stops the underlying gRPC server gracefully. It stops the server 305// from accepting new connections and RPCs and blocks until all the pending RPCs 306// are finished. 307func (s *GRPCServer) GracefulStop() { 308 s.quit.Fire() 309 s.gs.GracefulStop() 310 if s.xdsC != nil { 311 s.xdsC.Close() 312 } 313} 314 315// xdsUnaryInterceptor is the unary interceptor added to the gRPC server to 316// perform any xDS specific functionality on unary RPCs. 317// 318// This is a no-op at this point. 319func xdsUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { 320 return handler(ctx, req) 321} 322 323// xdsStreamInterceptor is the stream interceptor added to the gRPC server to 324// perform any xDS specific functionality on streaming RPCs. 325// 326// This is a no-op at this point. 327func xdsStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 328 return handler(srv, ss) 329} 330