1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package server
5
6import (
7	"context"
8	"crypto/tls"
9	"errors"
10	"net"
11	"os"
12	"runtime"
13	"sync"
14	"syscall"
15
16	"github.com/zeebo/errs"
17	"go.uber.org/zap"
18	"golang.org/x/sync/errgroup"
19
20	"storj.io/common/identity"
21	"storj.io/common/peertls/tlsopts"
22	"storj.io/common/rpc"
23	"storj.io/common/rpc/quic"
24	"storj.io/common/rpc/rpctracing"
25	"storj.io/drpc/drpcmigrate"
26	"storj.io/drpc/drpcmux"
27	"storj.io/drpc/drpcserver"
28	jaeger "storj.io/monkit-jaeger"
29)
30
// Config holds server specific configuration parameters.
type Config struct {
	// Embedded TLS settings. New does not read them directly; presumably the
	// caller turns them into the *tlsopts.Options passed to New — verify.
	tlsopts.Config
	// Address is the public address. New opens the public TCP listener and/or
	// the UDP (QUIC) socket on it. With port 0 the OS picks the port, and when
	// both transports are enabled the same port number is used for TCP and UDP.
	Address        string `user:"true" help:"public address to listen on" default:":7777"`
	// PrivateAddress is where New opens the private TCP listener, which Run
	// serves without TLS.
	PrivateAddress string `user:"true" help:"private address to listen on" default:"127.0.0.1:7778"`
	// DisableQUIC skips opening the public UDP socket, so Run creates no QUIC
	// listener.
	DisableQUIC    bool   `help:"disable QUIC listener on a server" hidden:"true" default:"false"`

	// DisableTCPTLS skips opening the public TCP listener, so Run serves no
	// TCP/TLS traffic on the public address.
	DisableTCPTLS   bool `help:"disable TCP/TLS listener on a server" internal:"true"`
	// DebugLogTraffic is not read by New or Run; presumably it is kept only so
	// that existing configurations still load.
	//
	// Deprecated: this option is not used by New or Run.
	DebugLogTraffic bool `hidden:"true" default:"false"`
}
41
// public bundles everything belonging to the server's public endpoint: the
// transport listeners opened by newPublic, the QUIC listener built by Run, and
// the dRPC server/mux that serve all of them.
type public struct {
	// tcpListener accepts TCP connections; Run only uses it when non-nil.
	tcpListener   net.Listener
	// udpConn is the UDP socket Run builds the QUIC listener on; nil when
	// QUIC is disabled.
	udpConn       *net.UDPConn
	// quicListener is created by Run over udpConn and closed by Close.
	quicListener  net.Listener
	// addr is the address reported by Server.Addr.
	addr          net.Addr
	// disableTCPTLS and disableQUIC record the flags the listeners were
	// opened with.
	disableTCPTLS bool
	disableQUIC   bool

	// drpc serves requests from every public transport; handlers are
	// registered on mux (see Server.DRPC).
	drpc *drpcserver.Server
	mux  *drpcmux.Mux
}
53
// private bundles the server's private endpoint, which Run serves over plain
// TCP without TLS.
type private struct {
	// listener is the private TCP listener opened by New.
	listener net.Listener
	// drpc serves requests accepted on listener; handlers are registered on
	// mux (see Server.PrivateDRPC).
	drpc     *drpcserver.Server
	mux      *drpcmux.Mux
}
59
// Server represents a bundle of services defined by a specific ID.
// Examples of servers are the satellite, the storagenode, and the uplink.
//
// A Server owns a public endpoint (TCP/TLS and/or QUIC) and a private
// endpoint, each with its own dRPC mux. New opens the listeners, Run serves
// them, and Close stops every Run and releases the listeners.
type Server struct {
	log        *zap.Logger
	public     public
	private    private
	tlsOptions *tlsopts.Options

	// mu makes Run's "not closed yet? then register in wg" step atomic with
	// respect to Close, so no Run can register after Close starts waiting.
	mu   sync.Mutex
	// wg counts the Run calls currently in progress.
	wg   sync.WaitGroup
	// once ensures done is closed only once, even across repeated Close calls.
	once sync.Once
	// done is closed by Close to tell running Run calls to stop.
	done chan struct{}
}
73
74// New creates a Server out of an Identity, a net.Listener,
75// and interceptors.
76func New(log *zap.Logger, tlsOptions *tlsopts.Options, config Config) (_ *Server, err error) {
77	server := &Server{
78		log:        log,
79		tlsOptions: tlsOptions,
80		done:       make(chan struct{}),
81	}
82
83	server.public, err = newPublic(config.Address, config.DisableTCPTLS, config.DisableQUIC)
84	if err != nil {
85		return nil, Error.Wrap(err)
86	}
87
88	serverOptions := drpcserver.Options{
89		Manager: rpc.NewDefaultManagerOptions(),
90	}
91	privateListener, err := net.Listen("tcp", config.PrivateAddress)
92	if err != nil {
93		return nil, errs.Combine(err, server.public.Close())
94	}
95	privateMux := drpcmux.New()
96	privateTracingHandler := rpctracing.NewHandler(privateMux, jaeger.RemoteTraceHandler)
97	server.private = private{
98		listener: wrapListener(privateListener),
99		drpc:     drpcserver.NewWithOptions(privateTracingHandler, serverOptions),
100		mux:      privateMux,
101	}
102
103	return server, nil
104}
105
106// Identity returns the server's identity.
107func (p *Server) Identity() *identity.FullIdentity { return p.tlsOptions.Ident }
108
109// Addr returns the server's public listener address.
110func (p *Server) Addr() net.Addr { return p.public.addr }
111
112// PrivateAddr returns the server's private listener address.
113func (p *Server) PrivateAddr() net.Addr { return p.private.listener.Addr() }
114
115// DRPC returns the server's dRPC mux for registration purposes.
116func (p *Server) DRPC() *drpcmux.Mux { return p.public.mux }
117
118// PrivateDRPC returns the server's dRPC mux for registration purposes.
119func (p *Server) PrivateDRPC() *drpcmux.Mux { return p.private.mux }
120
121// Close shuts down the server.
122func (p *Server) Close() error {
123	p.mu.Lock()
124	defer p.mu.Unlock()
125
126	// Close done and wait for any Runs to exit.
127	p.once.Do(func() { close(p.done) })
128	p.wg.Wait()
129
130	// Ensure the listeners are closed in case Run was never called.
131	// We ignore these errors because there's not really anything to do
132	// even if they happen, and they'll just be errors due to duplicate
133	// closes anyway.
134	_ = p.public.Close()
135	_ = p.private.listener.Close()
136	return nil
137}
138
// Run will run the server and all of its services.
//
// It serves dRPC on each endpoint that exists:
//   - the public TCP listener, wrapped in TLS, for connections routed by the
//     dRPC header;
//   - a QUIC listener that Run creates over the public UDP socket;
//   - the private listener, without TLS.
//
// Run blocks until ctx is canceled, Close is called, or one of the servers
// returns. The remaining servers are then canceled and awaited, and the
// connection muxes are stopped. The servers' error is combined with the muxes'
// error. If the server has already been closed, Run fails immediately.
func (p *Server) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Make sure the server isn't already closed. If it isn't, register
	// ourselves in the wait group so that Close can wait on it. Doing the
	// check and the Add under mu means Close (which holds mu while waiting on
	// wg) cannot miss a Run that is in the middle of registering.
	p.mu.Lock()
	select {
	case <-p.done:
		p.mu.Unlock()
		return errs.New("server closed")
	default:
		p.wg.Add(1)
		defer p.wg.Done()
	}
	p.mu.Unlock()

	// We want to launch the muxes in a different group so that they are
	// only closed after we're sure that p.Close is called. The reason why
	// is so that we don't get "listener closed" errors because the
	// Run call exits and closes the listeners before the servers have had
	// a chance to be notified that they're done running.

	var (
		publicMux          *drpcmigrate.ListenMux
		publicDRPCListener net.Listener
	)
	// Public TCP: route connections that start with the dRPC header to a
	// sub-listener and terminate TLS on it. Skipped when there is no public
	// TCP listener.
	if p.public.tcpListener != nil {
		publicMux = drpcmigrate.NewListenMux(p.public.tcpListener, len(drpcmigrate.DRPCHeader))
		publicDRPCListener = tls.NewListener(publicMux.Route(drpcmigrate.DRPCHeader), p.tlsOptions.ServerTLSConfig())
	}

	// Public QUIC: build the listener on the UDP socket opened by New and
	// store it on p.public so that Close can close it later.
	// NOTE(review): this field is written without holding mu, so concurrent
	// Run calls would race on it. The error is also returned without the
	// Error class that New uses. Confirm both are intended.
	if p.public.udpConn != nil {
		p.public.quicListener, err = quic.NewListener(p.public.udpConn, p.tlsOptions.ServerTLSConfig(), nil)
		if err != nil {
			return err
		}
	}

	// Private endpoint: the same dRPC-header routing, but without TLS.
	privateMux := drpcmigrate.NewListenMux(p.private.listener, len(drpcmigrate.DRPCHeader))
	privateDRPCListener := privateMux.Route(drpcmigrate.DRPCHeader)

	// We need a new context chain because we require this context to be
	// canceled only after all of the upcoming drpc servers have
	// fully exited. The reason why is because Run closes listener for
	// the mux when it exits, and we can only do that after all of the
	// Servers are no longer accepting.
	muxCtx, muxCancel := context.WithCancel(context.Background())
	defer muxCancel()

	var muxGroup errgroup.Group
	if publicMux != nil {
		muxGroup.Go(func() error {
			return publicMux.Run(muxCtx)
		})
	}
	muxGroup.Go(func() error {
		return privateMux.Run(muxCtx)
	})

	// Now we launch all the stuff that uses the listeners.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var group errgroup.Group
	// Turn Close (closing p.done) into cancellation of ctx. This goroutine
	// also exits when ctx is canceled for any other reason, so it never
	// outlives the servers.
	group.Go(func() error {
		select {
		case <-p.done:
			cancel()
		case <-ctx.Done():
		}

		return nil
	})

	// Each server cancels ctx when it returns, so one server stopping for any
	// reason makes all of them stop.
	if publicDRPCListener != nil {
		group.Go(func() error {
			defer cancel()
			return p.public.drpc.Serve(ctx, publicDRPCListener)
		})
	}

	if p.public.quicListener != nil {
		group.Go(func() error {
			defer cancel()
			return p.public.drpc.Serve(ctx, wrapListener(p.public.quicListener))
		})
	}

	group.Go(func() error {
		defer cancel()
		return p.private.drpc.Serve(ctx, privateDRPCListener)
	})

	// Now we wait for all the stuff using the listeners to exit.
	err = group.Wait()

	// Now we close down our listeners.
	muxCancel()
	return errs.Combine(err, muxGroup.Wait())
}
240
241func newPublic(publicAddr string, disableTCPTLS, disableQUIC bool) (public, error) {
242	var (
243		err               error
244		publicTCPListener net.Listener
245		publicUDPConn     *net.UDPConn
246	)
247
248	for retry := 0; ; retry++ {
249		addr := publicAddr
250		if !disableTCPTLS {
251			publicTCPListener, err = net.Listen("tcp", addr)
252			if err != nil {
253				return public{}, err
254			}
255
256			addr = publicTCPListener.Addr().String()
257		}
258
259		if !disableQUIC {
260			udpAddr, err := net.ResolveUDPAddr("udp", addr)
261			if err != nil {
262				return public{}, err
263			}
264
265			publicUDPConn, err = net.ListenUDP("udp", udpAddr)
266			if err != nil {
267				_, port, _ := net.SplitHostPort(publicAddr)
268				if port == "0" && retry < 10 && isErrorAddressAlreadyInUse(err) {
269					// from here, we know for sure that the tcp port chosen by the
270					// os is available, but we don't know if the same port number
271					// for udp is also available.
272					// if a udp port is already in use, we will close the tcp port and retry
273					// to find one that is available for both udp and tcp.
274					_ = publicTCPListener.Close()
275					continue
276				}
277				return public{}, errs.Combine(err, publicTCPListener.Close())
278			}
279		}
280
281		break
282	}
283
284	publicMux := drpcmux.New()
285	publicTracingHandler := rpctracing.NewHandler(publicMux, jaeger.RemoteTraceHandler)
286	serverOptions := drpcserver.Options{
287		Manager: rpc.NewDefaultManagerOptions(),
288	}
289
290	var netAddr net.Addr
291	if publicTCPListener != nil {
292		netAddr = publicTCPListener.Addr()
293	}
294
295	if publicUDPConn != nil && netAddr == nil {
296		netAddr = publicUDPConn.LocalAddr()
297	}
298
299	return public{
300		tcpListener:   wrapListener(publicTCPListener),
301		udpConn:       publicUDPConn,
302		addr:          netAddr,
303		drpc:          drpcserver.NewWithOptions(publicTracingHandler, serverOptions),
304		mux:           publicMux,
305		disableTCPTLS: disableTCPTLS,
306		disableQUIC:   disableQUIC,
307	}, nil
308}
309
310func (p public) Close() (err error) {
311	if p.quicListener != nil {
312		err = p.quicListener.Close()
313	}
314	if p.udpConn != nil {
315		err = errs.Combine(err, p.udpConn.Close())
316	}
317	if p.tcpListener != nil {
318		err = errs.Combine(err, p.tcpListener.Close())
319	}
320
321	return err
322}
323
// isErrorAddressAlreadyInUse reports whether err contains an *os.SyscallError
// whose underlying errno is EADDRINUSE. On Windows it also accepts
// WSAEADDRINUSE (errno 10048). A bare errno that is not inside an
// *os.SyscallError is not recognized.
// Adapted from https://stackoverflow.com/a/65865898.
func isErrorAddressAlreadyInUse(err error) bool {
	// WSAEADDRINUSE, the Winsock "address already in use" code.
	const wsaeAddrInUse = 10048

	var (
		sysErr *os.SyscallError
		errno  syscall.Errno
	)
	// Short-circuit: sysErr.Err is only read once sysErr has been found.
	if !errors.As(err, &sysErr) || !errors.As(sysErr.Err, &errno) {
		return false
	}

	switch {
	case errno == syscall.EADDRINUSE:
		return true
	case runtime.GOOS == "windows":
		return errno == wsaeAddrInUse
	default:
		return false
	}
}
344