1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package server 5 6import ( 7 "context" 8 "crypto/tls" 9 "errors" 10 "net" 11 "os" 12 "runtime" 13 "sync" 14 "syscall" 15 16 "github.com/zeebo/errs" 17 "go.uber.org/zap" 18 "golang.org/x/sync/errgroup" 19 20 "storj.io/common/identity" 21 "storj.io/common/peertls/tlsopts" 22 "storj.io/common/rpc" 23 "storj.io/common/rpc/quic" 24 "storj.io/common/rpc/rpctracing" 25 "storj.io/drpc/drpcmigrate" 26 "storj.io/drpc/drpcmux" 27 "storj.io/drpc/drpcserver" 28 jaeger "storj.io/monkit-jaeger" 29) 30 31// Config holds server specific configuration parameters. 32type Config struct { 33 tlsopts.Config 34 Address string `user:"true" help:"public address to listen on" default:":7777"` 35 PrivateAddress string `user:"true" help:"private address to listen on" default:"127.0.0.1:7778"` 36 DisableQUIC bool `help:"disable QUIC listener on a server" hidden:"true" default:"false"` 37 38 DisableTCPTLS bool `help:"disable TCP/TLS listener on a server" internal:"true"` 39 DebugLogTraffic bool `hidden:"true" default:"false"` // Deprecated 40} 41 42type public struct { 43 tcpListener net.Listener 44 udpConn *net.UDPConn 45 quicListener net.Listener 46 addr net.Addr 47 disableTCPTLS bool 48 disableQUIC bool 49 50 drpc *drpcserver.Server 51 mux *drpcmux.Mux 52} 53 54type private struct { 55 listener net.Listener 56 drpc *drpcserver.Server 57 mux *drpcmux.Mux 58} 59 60// Server represents a bundle of services defined by a specific ID. 61// Examples of servers are the satellite, the storagenode, and the uplink. 62type Server struct { 63 log *zap.Logger 64 public public 65 private private 66 tlsOptions *tlsopts.Options 67 68 mu sync.Mutex 69 wg sync.WaitGroup 70 once sync.Once 71 done chan struct{} 72} 73 74// New creates a Server out of an Identity, a net.Listener, 75// and interceptors. 76func New(log *zap.Logger, tlsOptions *tlsopts.Options, config Config) (_ *Server, err error) { 77 server := &Server{ 78 log: log, 79 tlsOptions: tlsOptions, 80 done: make(chan struct{}), 81 } 82 83 server.public, err = newPublic(config.Address, config.DisableTCPTLS, config.DisableQUIC) 84 if err != nil { 85 return nil, Error.Wrap(err) 86 } 87 88 serverOptions := drpcserver.Options{ 89 Manager: rpc.NewDefaultManagerOptions(), 90 } 91 privateListener, err := net.Listen("tcp", config.PrivateAddress) 92 if err != nil { 93 return nil, errs.Combine(err, server.public.Close()) 94 } 95 privateMux := drpcmux.New() 96 privateTracingHandler := rpctracing.NewHandler(privateMux, jaeger.RemoteTraceHandler) 97 server.private = private{ 98 listener: wrapListener(privateListener), 99 drpc: drpcserver.NewWithOptions(privateTracingHandler, serverOptions), 100 mux: privateMux, 101 } 102 103 return server, nil 104} 105 106// Identity returns the server's identity. 107func (p *Server) Identity() *identity.FullIdentity { return p.tlsOptions.Ident } 108 109// Addr returns the server's public listener address. 110func (p *Server) Addr() net.Addr { return p.public.addr } 111 112// PrivateAddr returns the server's private listener address. 113func (p *Server) PrivateAddr() net.Addr { return p.private.listener.Addr() } 114 115// DRPC returns the server's dRPC mux for registration purposes. 116func (p *Server) DRPC() *drpcmux.Mux { return p.public.mux } 117 118// PrivateDRPC returns the server's dRPC mux for registration purposes. 119func (p *Server) PrivateDRPC() *drpcmux.Mux { return p.private.mux } 120 121// Close shuts down the server. 122func (p *Server) Close() error { 123 p.mu.Lock() 124 defer p.mu.Unlock() 125 126 // Close done and wait for any Runs to exit. 127 p.once.Do(func() { close(p.done) }) 128 p.wg.Wait() 129 130 // Ensure the listeners are closed in case Run was never called. 131 // We ignore these errors because there's not really anything to do 132 // even if they happen, and they'll just be errors due to duplicate 133 // closes anyway. 134 _ = p.public.Close() 135 _ = p.private.listener.Close() 136 return nil 137} 138 139// Run will run the server and all of its services. 140func (p *Server) Run(ctx context.Context) (err error) { 141 defer mon.Task()(&ctx)(&err) 142 143 // Make sure the server isn't already closed. If it is, register 144 // ourselves in the wait group so that Close can wait on it. 145 p.mu.Lock() 146 select { 147 case <-p.done: 148 p.mu.Unlock() 149 return errs.New("server closed") 150 default: 151 p.wg.Add(1) 152 defer p.wg.Done() 153 } 154 p.mu.Unlock() 155 156 // We want to launch the muxes in a different group so that they are 157 // only closed after we're sure that p.Close is called. The reason why 158 // is so that we don't get "listener closed" errors because the 159 // Run call exits and closes the listeners before the servers have had 160 // a chance to be notified that they're done running. 161 162 var ( 163 publicMux *drpcmigrate.ListenMux 164 publicDRPCListener net.Listener 165 ) 166 if p.public.tcpListener != nil { 167 publicMux = drpcmigrate.NewListenMux(p.public.tcpListener, len(drpcmigrate.DRPCHeader)) 168 publicDRPCListener = tls.NewListener(publicMux.Route(drpcmigrate.DRPCHeader), p.tlsOptions.ServerTLSConfig()) 169 } 170 171 if p.public.udpConn != nil { 172 p.public.quicListener, err = quic.NewListener(p.public.udpConn, p.tlsOptions.ServerTLSConfig(), nil) 173 if err != nil { 174 return err 175 } 176 } 177 178 privateMux := drpcmigrate.NewListenMux(p.private.listener, len(drpcmigrate.DRPCHeader)) 179 privateDRPCListener := privateMux.Route(drpcmigrate.DRPCHeader) 180 181 // We need a new context chain because we require this context to be 182 // canceled only after all of the upcoming drpc servers have 183 // fully exited. The reason why is because Run closes listener for 184 // the mux when it exits, and we can only do that after all of the 185 // Servers are no longer accepting. 186 muxCtx, muxCancel := context.WithCancel(context.Background()) 187 defer muxCancel() 188 189 var muxGroup errgroup.Group 190 if publicMux != nil { 191 muxGroup.Go(func() error { 192 return publicMux.Run(muxCtx) 193 }) 194 } 195 muxGroup.Go(func() error { 196 return privateMux.Run(muxCtx) 197 }) 198 199 // Now we launch all the stuff that uses the listeners. 200 ctx, cancel := context.WithCancel(ctx) 201 defer cancel() 202 203 var group errgroup.Group 204 group.Go(func() error { 205 select { 206 case <-p.done: 207 cancel() 208 case <-ctx.Done(): 209 } 210 211 return nil 212 }) 213 214 if publicDRPCListener != nil { 215 group.Go(func() error { 216 defer cancel() 217 return p.public.drpc.Serve(ctx, publicDRPCListener) 218 }) 219 } 220 221 if p.public.quicListener != nil { 222 group.Go(func() error { 223 defer cancel() 224 return p.public.drpc.Serve(ctx, wrapListener(p.public.quicListener)) 225 }) 226 } 227 228 group.Go(func() error { 229 defer cancel() 230 return p.private.drpc.Serve(ctx, privateDRPCListener) 231 }) 232 233 // Now we wait for all the stuff using the listeners to exit. 234 err = group.Wait() 235 236 // Now we close down our listeners. 237 muxCancel() 238 return errs.Combine(err, muxGroup.Wait()) 239} 240 241func newPublic(publicAddr string, disableTCPTLS, disableQUIC bool) (public, error) { 242 var ( 243 err error 244 publicTCPListener net.Listener 245 publicUDPConn *net.UDPConn 246 ) 247 248 for retry := 0; ; retry++ { 249 addr := publicAddr 250 if !disableTCPTLS { 251 publicTCPListener, err = net.Listen("tcp", addr) 252 if err != nil { 253 return public{}, err 254 } 255 256 addr = publicTCPListener.Addr().String() 257 } 258 259 if !disableQUIC { 260 udpAddr, err := net.ResolveUDPAddr("udp", addr) 261 if err != nil { 262 return public{}, err 263 } 264 265 publicUDPConn, err = net.ListenUDP("udp", udpAddr) 266 if err != nil { 267 _, port, _ := net.SplitHostPort(publicAddr) 268 if port == "0" && retry < 10 && isErrorAddressAlreadyInUse(err) { 269 // from here, we know for sure that the tcp port chosen by the 270 // os is available, but we don't know if the same port number 271 // for udp is also available. 272 // if a udp port is already in use, we will close the tcp port and retry 273 // to find one that is available for both udp and tcp. 274 _ = publicTCPListener.Close() 275 continue 276 } 277 return public{}, errs.Combine(err, publicTCPListener.Close()) 278 } 279 } 280 281 break 282 } 283 284 publicMux := drpcmux.New() 285 publicTracingHandler := rpctracing.NewHandler(publicMux, jaeger.RemoteTraceHandler) 286 serverOptions := drpcserver.Options{ 287 Manager: rpc.NewDefaultManagerOptions(), 288 } 289 290 var netAddr net.Addr 291 if publicTCPListener != nil { 292 netAddr = publicTCPListener.Addr() 293 } 294 295 if publicUDPConn != nil && netAddr == nil { 296 netAddr = publicUDPConn.LocalAddr() 297 } 298 299 return public{ 300 tcpListener: wrapListener(publicTCPListener), 301 udpConn: publicUDPConn, 302 addr: netAddr, 303 drpc: drpcserver.NewWithOptions(publicTracingHandler, serverOptions), 304 mux: publicMux, 305 disableTCPTLS: disableTCPTLS, 306 disableQUIC: disableQUIC, 307 }, nil 308} 309 310func (p public) Close() (err error) { 311 if p.quicListener != nil { 312 err = p.quicListener.Close() 313 } 314 if p.udpConn != nil { 315 err = errs.Combine(err, p.udpConn.Close()) 316 } 317 if p.tcpListener != nil { 318 err = errs.Combine(err, p.tcpListener.Close()) 319 } 320 321 return err 322} 323 324// isErrorAddressAlreadyInUse checks whether the error is corresponding to 325// EADDRINUSE. Taken from https://stackoverflow.com/a/65865898. 326func isErrorAddressAlreadyInUse(err error) bool { 327 var eOsSyscall *os.SyscallError 328 if !errors.As(err, &eOsSyscall) { 329 return false 330 } 331 var errErrno syscall.Errno 332 if !errors.As(eOsSyscall.Err, &errErrno) { 333 return false 334 } 335 if errErrno == syscall.EADDRINUSE { 336 return true 337 } 338 const WSAEADDRINUSE = 10048 339 if runtime.GOOS == "windows" && errErrno == WSAEADDRINUSE { 340 return true 341 } 342 return false 343} 344