1/*
2 *
3 * Copyright 2021 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package server contains internal server-side functionality used by the public
20// facing xds package.
21package server
22
23import (
24	"fmt"
25	"net"
26	"sync"
27	"time"
28
29	"google.golang.org/grpc/backoff"
30	"google.golang.org/grpc/grpclog"
31	internalbackoff "google.golang.org/grpc/internal/backoff"
32	internalgrpclog "google.golang.org/grpc/internal/grpclog"
33	"google.golang.org/grpc/internal/grpcsync"
34	xdsclient "google.golang.org/grpc/xds/internal/client"
35	"google.golang.org/grpc/xds/internal/client/bootstrap"
36)
37
38var (
39	logger = grpclog.Component("xds")
40
41	// Backoff strategy for temporary errors received from Accept(). If this
42	// needs to be configurable, we can inject it through ListenerWrapperParams.
43	bs = internalbackoff.Exponential{Config: backoff.Config{
44		BaseDelay:  5 * time.Millisecond,
45		Multiplier: 2.0,
46		MaxDelay:   1 * time.Second,
47	}}
48	backoffFunc = bs.Backoff
49)
50
51// ServingMode indicates the current mode of operation of the server.
52//
53// This API exactly mirrors the one in the public xds package. We have to
54// redefine it here to avoid a cyclic dependency.
55type ServingMode int
56
57const (
58	// ServingModeStarting indicates that the serving is starting up.
59	ServingModeStarting ServingMode = iota
60	// ServingModeServing indicates the the server contains all required xDS
61	// configuration is serving RPCs.
62	ServingModeServing
63	// ServingModeNotServing indicates that the server is not accepting new
64	// connections. Existing connections will be closed gracefully, allowing
65	// in-progress RPCs to complete. A server enters this mode when it does not
66	// contain the required xDS configuration to serve RPCs.
67	ServingModeNotServing
68)
69
70func (s ServingMode) String() string {
71	switch s {
72	case ServingModeNotServing:
73		return "not-serving"
74	case ServingModeServing:
75		return "serving"
76	default:
77		return "starting"
78	}
79}
80
81// ServingModeCallback is the callback that users can register to get notified
82// about the server's serving mode changes. The callback is invoked with the
83// address of the listener and its new mode. The err parameter is set to a
84// non-nil error if the server has transitioned into not-serving mode.
85type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)
86
87func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger {
88	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", p))
89}
90
91// XDSClientInterface wraps the methods on the xdsClient which are required by
92// the listenerWrapper.
93type XDSClientInterface interface {
94	WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
95	BootstrapConfig() *bootstrap.Config
96}
97
98// ListenerWrapperParams wraps parameters required to create a listenerWrapper.
99type ListenerWrapperParams struct {
100	// Listener is the net.Listener passed by the user that is to be wrapped.
101	Listener net.Listener
102	// ListenerResourceName is the xDS Listener resource to request.
103	ListenerResourceName string
104	// XDSCredsInUse specifies whether or not the user expressed interest to
105	// receive security configuration from the control plane.
106	XDSCredsInUse bool
107	// XDSClient provides the functionality from the xdsClient required here.
108	XDSClient XDSClientInterface
109	// ModeCallback is the callback to invoke when the serving mode changes.
110	ModeCallback ServingModeCallback
111}
112
113// NewListenerWrapper creates a new listenerWrapper with params. It returns a
114// net.Listener and a channel which is written to, indicating that the former is
115// ready to be passed to grpc.Serve().
116//
117// Only TCP listeners are supported.
118func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan struct{}) {
119	lw := &listenerWrapper{
120		Listener:          params.Listener,
121		name:              params.ListenerResourceName,
122		xdsCredsInUse:     params.XDSCredsInUse,
123		xdsC:              params.XDSClient,
124		modeCallback:      params.ModeCallback,
125		isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),
126
127		closed:     grpcsync.NewEvent(),
128		goodUpdate: grpcsync.NewEvent(),
129	}
130	lw.logger = prefixLogger(lw)
131
132	// Serve() verifies that Addr() returns a valid TCPAddr. So, it is safe to
133	// ignore the error from SplitHostPort().
134	lisAddr := lw.Listener.Addr().String()
135	lw.addr, lw.port, _ = net.SplitHostPort(lisAddr)
136
137	cancelWatch := lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate)
138	lw.logger.Infof("Watch started on resource name %v", lw.name)
139	lw.cancelWatch = func() {
140		cancelWatch()
141		lw.logger.Infof("Watch cancelled on resource name %v", lw.name)
142	}
143	return lw, lw.goodUpdate.Done()
144}
145
146// listenerWrapper wraps the net.Listener associated with the listening address
147// passed to Serve(). It also contains all other state associated with this
148// particular invocation of Serve().
149type listenerWrapper struct {
150	net.Listener
151	logger *internalgrpclog.PrefixLogger
152
153	name          string
154	xdsCredsInUse bool
155	xdsC          XDSClientInterface
156	cancelWatch   func()
157	modeCallback  ServingModeCallback
158
159	// Set to true if the listener is bound to the IP_ANY address (which is
160	// "0.0.0.0" for IPv4 and "::" for IPv6).
161	isUnspecifiedAddr bool
162	// Listening address and port. Used to validate the socket address in the
163	// Listener resource received from the control plane.
164	addr, port string
165
166	// This is used to notify that a good update has been received and that
167	// Serve() can be invoked on the underlying gRPC server. Using an event
168	// instead of a vanilla channel simplifies the update handler as it need not
169	// keep track of whether the received update is the first one or not.
170	goodUpdate *grpcsync.Event
171	// A small race exists in the xdsClient code between the receipt of an xDS
172	// response and the user cancelling the associated watch. In this window,
173	// the registered callback may be invoked after the watch is canceled, and
174	// the user is expected to work around this. This event signifies that the
175	// listener is closed (and hence the watch is cancelled), and we drop any
176	// updates received in the callback if this event has fired.
177	closed *grpcsync.Event
178
179	// mu guards access to the current serving mode and the filter chains. The
180	// reason for using an rw lock here is that these fields are read in
181	// Accept() for all incoming connections, but writes happen rarely (when we
182	// get a Listener resource update).
183	mu sync.RWMutex
184	// Current serving mode.
185	mode ServingMode
186	// Filter chains received as part of the last good update.
187	filterChains *xdsclient.FilterChainManager
188}
189
190// Accept blocks on an Accept() on the underlying listener, and wraps the
191// returned net.connWrapper with the configured certificate providers.
192func (l *listenerWrapper) Accept() (net.Conn, error) {
193	var retries int
194	for {
195		conn, err := l.Listener.Accept()
196		if err != nil {
197			// Temporary() method is implemented by certain error types returned
198			// from the net package, and it is useful for us to not shutdown the
199			// server in these conditions. The listen queue being full is one
200			// such case.
201			if ne, ok := err.(interface{ Temporary() bool }); !ok || !ne.Temporary() {
202				return nil, err
203			}
204			retries++
205			timer := time.NewTimer(backoffFunc(retries))
206			select {
207			case <-timer.C:
208			case <-l.closed.Done():
209				timer.Stop()
210				// Continuing here will cause us to call Accept() again
211				// which will return a non-temporary error.
212				continue
213			}
214			continue
215		}
216		// Reset retries after a successful Accept().
217		retries = 0
218
219		// Since the net.Conn represents an incoming connection, the source and
220		// destination address can be retrieved from the local address and
221		// remote address of the net.Conn respectively.
222		destAddr, ok1 := conn.LocalAddr().(*net.TCPAddr)
223		srcAddr, ok2 := conn.RemoteAddr().(*net.TCPAddr)
224		if !ok1 || !ok2 {
225			// If the incoming connection is not a TCP connection, which is
226			// really unexpected since we check whether the provided listener is
227			// a TCP listener in Serve(), we return an error which would cause
228			// us to stop serving.
229			return nil, fmt.Errorf("received connection with non-TCP address (local: %T, remote %T)", conn.LocalAddr(), conn.RemoteAddr())
230		}
231
232		l.mu.RLock()
233		if l.mode == ServingModeNotServing {
234			// Close connections as soon as we accept them when we are in
235			// "not-serving" mode. Since we accept a net.Listener from the user
236			// in Serve(), we cannot close the listener when we move to
237			// "not-serving". Closing the connection immediately upon accepting
238			// is one of the other ways to implement the "not-serving" mode as
239			// outlined in gRFC A36.
240			l.mu.RUnlock()
241			conn.Close()
242			continue
243		}
244		fc, err := l.filterChains.Lookup(xdsclient.FilterChainLookupParams{
245			IsUnspecifiedListener: l.isUnspecifiedAddr,
246			DestAddr:              destAddr.IP,
247			SourceAddr:            srcAddr.IP,
248			SourcePort:            srcAddr.Port,
249		})
250		l.mu.RUnlock()
251		if err != nil {
252			// When a matching filter chain is not found, we close the
253			// connection right away, but do not return an error back to
254			// `grpc.Serve()` from where this Accept() was invoked. Returning an
255			// error to `grpc.Serve()` causes the server to shutdown. If we want
256			// to avoid the server from shutting down, we would need to return
257			// an error type which implements the `Temporary() bool` method,
258			// which is invoked by `grpc.Serve()` to see if the returned error
259			// represents a temporary condition. In the case of a temporary
260			// error, `grpc.Serve()` method sleeps for a small duration and
261			// therefore ends up blocking all connection attempts during that
262			// time frame, which is also not ideal for an error like this.
263			l.logger.Warningf("connection from %s to %s failed to find any matching filter chain", conn.RemoteAddr().String(), conn.LocalAddr().String())
264			conn.Close()
265			continue
266		}
267		return &connWrapper{Conn: conn, filterChain: fc, parent: l}, nil
268	}
269}
270
271// Close closes the underlying listener. It also cancels the xDS watch
272// registered in Serve() and closes any certificate provider instances created
273// based on security configuration received in the LDS response.
274func (l *listenerWrapper) Close() error {
275	l.closed.Fire()
276	l.Listener.Close()
277	if l.cancelWatch != nil {
278		l.cancelWatch()
279	}
280	return nil
281}
282
283func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate, err error) {
284	if l.closed.HasFired() {
285		l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err)
286		return
287	}
288
289	if err != nil {
290		l.logger.Warningf("Received error for resource %q: %+v", l.name, err)
291		if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
292			l.switchMode(nil, ServingModeNotServing, err)
293		}
294		// For errors which are anything other than "resource-not-found", we
295		// continue to use the old configuration.
296		return
297	}
298	l.logger.Infof("Received update for resource %q: %+v", l.name, update)
299
300	// Make sure that the socket address on the received Listener resource
301	// matches the address of the net.Listener passed to us by the user. This
302	// check is done here instead of at the xdsClient layer because of the
303	// following couple of reasons:
304	// - xdsClient cannot know the listening address of every listener in the
305	//   system, and hence cannot perform this check.
306	// - this is a very context-dependent check and only the server has the
307	//   appropriate context to perform this check.
308	//
309	// What this means is that the xdsClient has ACKed a resource which can push
310	// the server into a "not serving" mode. This is not ideal, but this is
311	// what we have decided to do. See gRPC A36 for more details.
312	ilc := update.InboundListenerCfg
313	if ilc.Address != l.addr || ilc.Port != l.port {
314		l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
315		return
316	}
317
318	l.switchMode(ilc.FilterChains, ServingModeServing, nil)
319	l.goodUpdate.Fire()
320}
321
322func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) {
323	l.mu.Lock()
324	defer l.mu.Unlock()
325
326	l.filterChains = fcs
327	l.mode = newMode
328	if l.modeCallback != nil {
329		l.modeCallback(l.Listener.Addr(), newMode, err)
330	}
331	l.logger.Warningf("Listener %q entering mode: %q due to error: %v", l.Addr(), newMode, err)
332}
333