1/* 2 * 3 * Copyright 2021 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package server contains internal server-side functionality used by the public 20// facing xds package. 21package server 22 23import ( 24 "fmt" 25 "net" 26 "sync" 27 "time" 28 29 "google.golang.org/grpc/backoff" 30 "google.golang.org/grpc/grpclog" 31 internalbackoff "google.golang.org/grpc/internal/backoff" 32 internalgrpclog "google.golang.org/grpc/internal/grpclog" 33 "google.golang.org/grpc/internal/grpcsync" 34 xdsclient "google.golang.org/grpc/xds/internal/client" 35 "google.golang.org/grpc/xds/internal/client/bootstrap" 36) 37 38var ( 39 logger = grpclog.Component("xds") 40 41 // Backoff strategy for temporary errors received from Accept(). If this 42 // needs to be configurable, we can inject it through ListenerWrapperParams. 43 bs = internalbackoff.Exponential{Config: backoff.Config{ 44 BaseDelay: 5 * time.Millisecond, 45 Multiplier: 2.0, 46 MaxDelay: 1 * time.Second, 47 }} 48 backoffFunc = bs.Backoff 49) 50 51// ServingMode indicates the current mode of operation of the server. 52// 53// This API exactly mirrors the one in the public xds package. We have to 54// redefine it here to avoid a cyclic dependency. 55type ServingMode int 56 57const ( 58 // ServingModeStarting indicates that the serving is starting up. 59 ServingModeStarting ServingMode = iota 60 // ServingModeServing indicates the the server contains all required xDS 61 // configuration is serving RPCs. 62 ServingModeServing 63 // ServingModeNotServing indicates that the server is not accepting new 64 // connections. Existing connections will be closed gracefully, allowing 65 // in-progress RPCs to complete. A server enters this mode when it does not 66 // contain the required xDS configuration to serve RPCs. 67 ServingModeNotServing 68) 69 70func (s ServingMode) String() string { 71 switch s { 72 case ServingModeNotServing: 73 return "not-serving" 74 case ServingModeServing: 75 return "serving" 76 default: 77 return "starting" 78 } 79} 80 81// ServingModeCallback is the callback that users can register to get notified 82// about the server's serving mode changes. The callback is invoked with the 83// address of the listener and its new mode. The err parameter is set to a 84// non-nil error if the server has transitioned into not-serving mode. 85type ServingModeCallback func(addr net.Addr, mode ServingMode, err error) 86 87func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger { 88 return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", p)) 89} 90 91// XDSClientInterface wraps the methods on the xdsClient which are required by 92// the listenerWrapper. 93type XDSClientInterface interface { 94 WatchListener(string, func(xdsclient.ListenerUpdate, error)) func() 95 BootstrapConfig() *bootstrap.Config 96} 97 98// ListenerWrapperParams wraps parameters required to create a listenerWrapper. 99type ListenerWrapperParams struct { 100 // Listener is the net.Listener passed by the user that is to be wrapped. 101 Listener net.Listener 102 // ListenerResourceName is the xDS Listener resource to request. 103 ListenerResourceName string 104 // XDSCredsInUse specifies whether or not the user expressed interest to 105 // receive security configuration from the control plane. 106 XDSCredsInUse bool 107 // XDSClient provides the functionality from the xdsClient required here. 108 XDSClient XDSClientInterface 109 // ModeCallback is the callback to invoke when the serving mode changes. 110 ModeCallback ServingModeCallback 111} 112 113// NewListenerWrapper creates a new listenerWrapper with params. It returns a 114// net.Listener and a channel which is written to, indicating that the former is 115// ready to be passed to grpc.Serve(). 116// 117// Only TCP listeners are supported. 118func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan struct{}) { 119 lw := &listenerWrapper{ 120 Listener: params.Listener, 121 name: params.ListenerResourceName, 122 xdsCredsInUse: params.XDSCredsInUse, 123 xdsC: params.XDSClient, 124 modeCallback: params.ModeCallback, 125 isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(), 126 127 closed: grpcsync.NewEvent(), 128 goodUpdate: grpcsync.NewEvent(), 129 } 130 lw.logger = prefixLogger(lw) 131 132 // Serve() verifies that Addr() returns a valid TCPAddr. So, it is safe to 133 // ignore the error from SplitHostPort(). 134 lisAddr := lw.Listener.Addr().String() 135 lw.addr, lw.port, _ = net.SplitHostPort(lisAddr) 136 137 cancelWatch := lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate) 138 lw.logger.Infof("Watch started on resource name %v", lw.name) 139 lw.cancelWatch = func() { 140 cancelWatch() 141 lw.logger.Infof("Watch cancelled on resource name %v", lw.name) 142 } 143 return lw, lw.goodUpdate.Done() 144} 145 146// listenerWrapper wraps the net.Listener associated with the listening address 147// passed to Serve(). It also contains all other state associated with this 148// particular invocation of Serve(). 149type listenerWrapper struct { 150 net.Listener 151 logger *internalgrpclog.PrefixLogger 152 153 name string 154 xdsCredsInUse bool 155 xdsC XDSClientInterface 156 cancelWatch func() 157 modeCallback ServingModeCallback 158 159 // Set to true if the listener is bound to the IP_ANY address (which is 160 // "0.0.0.0" for IPv4 and "::" for IPv6). 161 isUnspecifiedAddr bool 162 // Listening address and port. Used to validate the socket address in the 163 // Listener resource received from the control plane. 164 addr, port string 165 166 // This is used to notify that a good update has been received and that 167 // Serve() can be invoked on the underlying gRPC server. Using an event 168 // instead of a vanilla channel simplifies the update handler as it need not 169 // keep track of whether the received update is the first one or not. 170 goodUpdate *grpcsync.Event 171 // A small race exists in the xdsClient code between the receipt of an xDS 172 // response and the user cancelling the associated watch. In this window, 173 // the registered callback may be invoked after the watch is canceled, and 174 // the user is expected to work around this. This event signifies that the 175 // listener is closed (and hence the watch is cancelled), and we drop any 176 // updates received in the callback if this event has fired. 177 closed *grpcsync.Event 178 179 // mu guards access to the current serving mode and the filter chains. The 180 // reason for using an rw lock here is that these fields are read in 181 // Accept() for all incoming connections, but writes happen rarely (when we 182 // get a Listener resource update). 183 mu sync.RWMutex 184 // Current serving mode. 185 mode ServingMode 186 // Filter chains received as part of the last good update. 187 filterChains *xdsclient.FilterChainManager 188} 189 190// Accept blocks on an Accept() on the underlying listener, and wraps the 191// returned net.connWrapper with the configured certificate providers. 192func (l *listenerWrapper) Accept() (net.Conn, error) { 193 var retries int 194 for { 195 conn, err := l.Listener.Accept() 196 if err != nil { 197 // Temporary() method is implemented by certain error types returned 198 // from the net package, and it is useful for us to not shutdown the 199 // server in these conditions. The listen queue being full is one 200 // such case. 201 if ne, ok := err.(interface{ Temporary() bool }); !ok || !ne.Temporary() { 202 return nil, err 203 } 204 retries++ 205 timer := time.NewTimer(backoffFunc(retries)) 206 select { 207 case <-timer.C: 208 case <-l.closed.Done(): 209 timer.Stop() 210 // Continuing here will cause us to call Accept() again 211 // which will return a non-temporary error. 212 continue 213 } 214 continue 215 } 216 // Reset retries after a successful Accept(). 217 retries = 0 218 219 // Since the net.Conn represents an incoming connection, the source and 220 // destination address can be retrieved from the local address and 221 // remote address of the net.Conn respectively. 222 destAddr, ok1 := conn.LocalAddr().(*net.TCPAddr) 223 srcAddr, ok2 := conn.RemoteAddr().(*net.TCPAddr) 224 if !ok1 || !ok2 { 225 // If the incoming connection is not a TCP connection, which is 226 // really unexpected since we check whether the provided listener is 227 // a TCP listener in Serve(), we return an error which would cause 228 // us to stop serving. 229 return nil, fmt.Errorf("received connection with non-TCP address (local: %T, remote %T)", conn.LocalAddr(), conn.RemoteAddr()) 230 } 231 232 l.mu.RLock() 233 if l.mode == ServingModeNotServing { 234 // Close connections as soon as we accept them when we are in 235 // "not-serving" mode. Since we accept a net.Listener from the user 236 // in Serve(), we cannot close the listener when we move to 237 // "not-serving". Closing the connection immediately upon accepting 238 // is one of the other ways to implement the "not-serving" mode as 239 // outlined in gRFC A36. 240 l.mu.RUnlock() 241 conn.Close() 242 continue 243 } 244 fc, err := l.filterChains.Lookup(xdsclient.FilterChainLookupParams{ 245 IsUnspecifiedListener: l.isUnspecifiedAddr, 246 DestAddr: destAddr.IP, 247 SourceAddr: srcAddr.IP, 248 SourcePort: srcAddr.Port, 249 }) 250 l.mu.RUnlock() 251 if err != nil { 252 // When a matching filter chain is not found, we close the 253 // connection right away, but do not return an error back to 254 // `grpc.Serve()` from where this Accept() was invoked. Returning an 255 // error to `grpc.Serve()` causes the server to shutdown. If we want 256 // to avoid the server from shutting down, we would need to return 257 // an error type which implements the `Temporary() bool` method, 258 // which is invoked by `grpc.Serve()` to see if the returned error 259 // represents a temporary condition. In the case of a temporary 260 // error, `grpc.Serve()` method sleeps for a small duration and 261 // therefore ends up blocking all connection attempts during that 262 // time frame, which is also not ideal for an error like this. 263 l.logger.Warningf("connection from %s to %s failed to find any matching filter chain", conn.RemoteAddr().String(), conn.LocalAddr().String()) 264 conn.Close() 265 continue 266 } 267 return &connWrapper{Conn: conn, filterChain: fc, parent: l}, nil 268 } 269} 270 271// Close closes the underlying listener. It also cancels the xDS watch 272// registered in Serve() and closes any certificate provider instances created 273// based on security configuration received in the LDS response. 274func (l *listenerWrapper) Close() error { 275 l.closed.Fire() 276 l.Listener.Close() 277 if l.cancelWatch != nil { 278 l.cancelWatch() 279 } 280 return nil 281} 282 283func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate, err error) { 284 if l.closed.HasFired() { 285 l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err) 286 return 287 } 288 289 if err != nil { 290 l.logger.Warningf("Received error for resource %q: %+v", l.name, err) 291 if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound { 292 l.switchMode(nil, ServingModeNotServing, err) 293 } 294 // For errors which are anything other than "resource-not-found", we 295 // continue to use the old configuration. 296 return 297 } 298 l.logger.Infof("Received update for resource %q: %+v", l.name, update) 299 300 // Make sure that the socket address on the received Listener resource 301 // matches the address of the net.Listener passed to us by the user. This 302 // check is done here instead of at the xdsClient layer because of the 303 // following couple of reasons: 304 // - xdsClient cannot know the listening address of every listener in the 305 // system, and hence cannot perform this check. 306 // - this is a very context-dependent check and only the server has the 307 // appropriate context to perform this check. 308 // 309 // What this means is that the xdsClient has ACKed a resource which can push 310 // the server into a "not serving" mode. This is not ideal, but this is 311 // what we have decided to do. See gRPC A36 for more details. 312 ilc := update.InboundListenerCfg 313 if ilc.Address != l.addr || ilc.Port != l.port { 314 l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port)) 315 return 316 } 317 318 l.switchMode(ilc.FilterChains, ServingModeServing, nil) 319 l.goodUpdate.Fire() 320} 321 322func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) { 323 l.mu.Lock() 324 defer l.mu.Unlock() 325 326 l.filterChains = fcs 327 l.mode = newMode 328 if l.modeCallback != nil { 329 l.modeCallback(l.Listener.Addr(), newMode, err) 330 } 331 l.logger.Warningf("Listener %q entering mode: %q due to error: %v", l.Addr(), newMode, err) 332} 333