package autonat

import (
	"context"
	"errors"
	"math/rand"
	"sync/atomic"
	"time"

	"github.com/libp2p/go-eventbus"
	"github.com/libp2p/go-libp2p-core/event"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"

	logging "github.com/ipfs/go-log"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
)

// log is the package-level logger, registered under the "autonat" subsystem.
var log = logging.Logger("autonat")

// AmbientAutoNAT is the implementation of ambient NAT autodiscovery.
//
// Apart from status, which is an atomic.Value, the mutable fields below are
// read and written by the background loop and the helpers it calls.
type AmbientAutoNAT struct {
	ctx  context.Context
	host host.Host

	*config

	// inboundConn delivers inbound connections to the background loop, which
	// uses them to refresh lastInbound.
	inboundConn chan network.Conn
	// observations carries the result of each finished probe back to the
	// background loop.
	observations chan autoNATResult
	// status is an autoNATResult reflecting current status.
	status atomic.Value
	// confidence reflects how much the current NAT status is trusted, as a
	// single dialback may fail for reasons unrelated to NAT. It is kept in
	// the range 0..3 by recordObservation.
	// If it is <3, then multiple autoNAT peers may be contacted for dialback.
	// If only a single autoNAT peer is known, then the confidence increases
	// for each failure until it reaches 3.
	confidence int
	// lastInbound is the time of the most recent inbound connection from a
	// public address whose IP is not one of our own.
	lastInbound time.Time
	// lastProbeTry is the time tryProbe was last called, whether or not a
	// probe was actually started.
	lastProbeTry time.Time
	// lastProbe is the time the most recent probe was started.
	lastProbe time.Time
	// recentProbes maps a peer to the time it was last probed; entries older
	// than throttlePeerPeriod are pruned by cleanupRecentProbes.
	recentProbes map[peer.ID]time.Time

	// service is the local autoNAT service; it is nil when New did not
	// create one.
	service *autoNATService

	// emitReachabilityChanged publishes event.EvtLocalReachabilityChanged.
	emitReachabilityChanged event.Emitter
	// subscriber receives event.EvtLocalAddressesUpdated and
	// event.EvtPeerIdentificationCompleted from the host's event bus.
	subscriber event.Subscription
}

// StaticAutoNAT is a simple AutoNAT implementation when a single NAT status is desired.
52type StaticAutoNAT struct { 53 ctx context.Context 54 host host.Host 55 reachability network.Reachability 56 service *autoNATService 57} 58 59type autoNATResult struct { 60 network.Reachability 61 address ma.Multiaddr 62} 63 64// New creates a new NAT autodiscovery system attached to a host 65func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) { 66 var err error 67 conf := new(config) 68 conf.host = h 69 conf.dialPolicy.host = h 70 71 if err = defaults(conf); err != nil { 72 return nil, err 73 } 74 if conf.addressFunc == nil { 75 conf.addressFunc = h.Addrs 76 } 77 78 for _, o := range options { 79 if err = o(conf); err != nil { 80 return nil, err 81 } 82 } 83 emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful) 84 85 var service *autoNATService 86 if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil { 87 service, err = newAutoNATService(ctx, conf) 88 if err != nil { 89 return nil, err 90 } 91 service.Enable() 92 } 93 94 if conf.forceReachability { 95 emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability}) 96 97 return &StaticAutoNAT{ 98 ctx: ctx, 99 host: h, 100 reachability: conf.reachability, 101 service: service, 102 }, nil 103 } 104 105 as := &AmbientAutoNAT{ 106 ctx: ctx, 107 host: h, 108 config: conf, 109 inboundConn: make(chan network.Conn, 5), 110 observations: make(chan autoNATResult, 1), 111 112 emitReachabilityChanged: emitReachabilityChanged, 113 service: service, 114 recentProbes: make(map[peer.ID]time.Time), 115 } 116 as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) 117 118 subscriber, err := as.host.EventBus().Subscribe([]interface{}{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)}) 119 if err != nil { 120 return nil, err 121 } 122 as.subscriber = subscriber 123 124 h.Network().Notify(as) 125 go as.background() 126 127 return as, 
nil 128} 129 130// Status returns the AutoNAT observed reachability status. 131func (as *AmbientAutoNAT) Status() network.Reachability { 132 s := as.status.Load().(autoNATResult) 133 return s.Reachability 134} 135 136func (as *AmbientAutoNAT) emitStatus() { 137 status := as.status.Load().(autoNATResult) 138 as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability}) 139} 140 141// PublicAddr returns the publicly connectable Multiaddr of this node if one is known. 142func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) { 143 s := as.status.Load().(autoNATResult) 144 if s.Reachability != network.ReachabilityPublic { 145 return nil, errors.New("NAT status is not public") 146 } 147 148 return s.address, nil 149} 150 151func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool { 152 candidateIP, _ := manet.ToIP(candidate) 153 for _, i := range list { 154 if ip, err := manet.ToIP(i); err == nil && ip.Equal(candidateIP) { 155 return true 156 } 157 } 158 return false 159} 160 161func (as *AmbientAutoNAT) background() { 162 // wait a bit for the node to come online and establish some connections 163 // before starting autodetection 164 delay := as.config.bootDelay 165 166 var lastAddrUpdated time.Time 167 subChan := as.subscriber.Out() 168 defer as.subscriber.Close() 169 defer as.emitReachabilityChanged.Close() 170 171 timer := time.NewTimer(delay) 172 defer timer.Stop() 173 timerRunning := true 174 for { 175 select { 176 // new inbound connection. 
177 case conn := <-as.inboundConn: 178 localAddrs := as.host.Addrs() 179 ca := as.status.Load().(autoNATResult) 180 if ca.address != nil { 181 localAddrs = append(localAddrs, ca.address) 182 } 183 if manet.IsPublicAddr(conn.RemoteMultiaddr()) && 184 !ipInList(conn.RemoteMultiaddr(), localAddrs) { 185 as.lastInbound = time.Now() 186 } 187 188 case e := <-subChan: 189 switch e := e.(type) { 190 case event.EvtLocalAddressesUpdated: 191 if !lastAddrUpdated.Add(time.Second).After(time.Now()) { 192 lastAddrUpdated = time.Now() 193 if as.confidence > 1 { 194 as.confidence-- 195 } 196 } 197 case event.EvtPeerIdentificationCompleted: 198 if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 { 199 currentStatus := as.status.Load().(autoNATResult) 200 if currentStatus.Reachability == network.ReachabilityUnknown { 201 as.tryProbe(e.Peer) 202 } 203 } 204 default: 205 log.Errorf("unknown event type: %T", e) 206 } 207 208 // probe finished. 209 case result, ok := <-as.observations: 210 if !ok { 211 return 212 } 213 as.recordObservation(result) 214 case <-timer.C: 215 peer := as.getPeerToProbe() 216 as.tryProbe(peer) 217 timerRunning = false 218 case <-as.ctx.Done(): 219 return 220 } 221 222 // Drain the timer channel if it hasn't fired in preparation for Resetting it. 223 if timerRunning && !timer.Stop() { 224 <-timer.C 225 } 226 timer.Reset(as.scheduleProbe()) 227 timerRunning = true 228 } 229} 230 231func (as *AmbientAutoNAT) cleanupRecentProbes() { 232 fixedNow := time.Now() 233 for k, v := range as.recentProbes { 234 if fixedNow.Sub(v) > as.throttlePeerPeriod { 235 delete(as.recentProbes, k) 236 } 237 } 238} 239 240// scheduleProbe calculates when the next probe should be scheduled for. 
241func (as *AmbientAutoNAT) scheduleProbe() time.Duration { 242 // Our baseline is a probe every 'AutoNATRefreshInterval' 243 // This is modulated by: 244 // * if we are in an unknown state, or have low confidence, that should drop to 'AutoNATRetryInterval' 245 // * recent inbound connections (implying continued connectivity) should decrease the retry when public 246 // * recent inbound connections when not public mean we should try more actively to see if we're public. 247 fixedNow := time.Now() 248 currentStatus := as.status.Load().(autoNATResult) 249 250 nextProbe := fixedNow 251 // Don't look for peers in the peer store more than once per second. 252 if !as.lastProbeTry.IsZero() { 253 backoff := as.lastProbeTry.Add(time.Second) 254 if backoff.After(nextProbe) { 255 nextProbe = backoff 256 } 257 } 258 if !as.lastProbe.IsZero() { 259 untilNext := as.config.refreshInterval 260 if currentStatus.Reachability == network.ReachabilityUnknown { 261 untilNext = as.config.retryInterval 262 } else if as.confidence < 3 { 263 untilNext = as.config.retryInterval 264 } else if currentStatus.Reachability == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) { 265 untilNext *= 2 266 } else if currentStatus.Reachability != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) { 267 untilNext /= 5 268 } 269 270 if as.lastProbe.Add(untilNext).After(nextProbe) { 271 nextProbe = as.lastProbe.Add(untilNext) 272 } 273 } 274 275 return nextProbe.Sub(fixedNow) 276} 277 278// Update the current status based on an observed result. 
279func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) { 280 currentStatus := as.status.Load().(autoNATResult) 281 if observation.Reachability == network.ReachabilityPublic { 282 log.Debugf("NAT status is public") 283 changed := false 284 if currentStatus.Reachability != network.ReachabilityPublic { 285 // we are flipping our NATStatus, so confidence drops to 0 286 as.confidence = 0 287 if as.service != nil { 288 as.service.Enable() 289 } 290 changed = true 291 } else if as.confidence < 3 { 292 as.confidence++ 293 } 294 if observation.address != nil { 295 if !changed && currentStatus.address != nil && !observation.address.Equal(currentStatus.address) { 296 as.confidence-- 297 } 298 if currentStatus.address == nil || !observation.address.Equal(currentStatus.address) { 299 changed = true 300 } 301 as.status.Store(observation) 302 } 303 if observation.address != nil && changed { 304 as.emitStatus() 305 } 306 } else if observation.Reachability == network.ReachabilityPrivate { 307 log.Debugf("NAT status is private") 308 if currentStatus.Reachability == network.ReachabilityPublic { 309 if as.confidence > 0 { 310 as.confidence-- 311 } else { 312 // we are flipping our NATStatus, so confidence drops to 0 313 as.confidence = 0 314 as.status.Store(observation) 315 if as.service != nil { 316 as.service.Disable() 317 } 318 as.emitStatus() 319 } 320 } else if as.confidence < 3 { 321 as.confidence++ 322 as.status.Store(observation) 323 if currentStatus.Reachability != network.ReachabilityPrivate { 324 as.emitStatus() 325 } 326 } 327 } else if as.confidence > 0 { 328 // don't just flip to unknown, reduce confidence first 329 as.confidence-- 330 } else { 331 log.Debugf("NAT status is unknown") 332 as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) 333 if currentStatus.Reachability != network.ReachabilityUnknown { 334 if as.service != nil { 335 as.service.Enable() 336 } 337 as.emitStatus() 338 } 339 } 340} 341 342func (as *AmbientAutoNAT) tryProbe(p 
peer.ID) bool { 343 as.lastProbeTry = time.Now() 344 if p.Validate() != nil { 345 return false 346 } 347 348 if lastTime, ok := as.recentProbes[p]; ok { 349 if time.Since(lastTime) < as.throttlePeerPeriod { 350 return false 351 } 352 } 353 as.cleanupRecentProbes() 354 355 info := as.host.Peerstore().PeerInfo(p) 356 357 if !as.config.dialPolicy.skipPeer(info.Addrs) { 358 as.recentProbes[p] = time.Now() 359 as.lastProbe = time.Now() 360 go as.probe(&info) 361 return true 362 } 363 return false 364} 365 366func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) { 367 cli := NewAutoNATClient(as.host, as.config.addressFunc) 368 ctx, cancel := context.WithTimeout(as.ctx, as.config.requestTimeout) 369 defer cancel() 370 371 a, err := cli.DialBack(ctx, pi.ID) 372 373 var result autoNATResult 374 switch { 375 case err == nil: 376 log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String()) 377 result.Reachability = network.ReachabilityPublic 378 result.address = a 379 case IsDialError(err): 380 log.Debugf("Dialback through %s failed", pi.ID.Pretty()) 381 result.Reachability = network.ReachabilityPrivate 382 default: 383 result.Reachability = network.ReachabilityUnknown 384 } 385 386 select { 387 case as.observations <- result: 388 case <-as.ctx.Done(): 389 return 390 } 391} 392 393func (as *AmbientAutoNAT) getPeerToProbe() peer.ID { 394 peers := as.host.Network().Peers() 395 if len(peers) == 0 { 396 return "" 397 } 398 399 candidates := make([]peer.ID, 0, len(peers)) 400 401 for _, p := range peers { 402 info := as.host.Peerstore().PeerInfo(p) 403 // Exclude peers which don't support the autonat protocol. 404 if proto, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto); len(proto) == 0 || err != nil { 405 continue 406 } 407 408 // Exclude peers in backoff. 
409 if lastTime, ok := as.recentProbes[p]; ok { 410 if time.Since(lastTime) < as.throttlePeerPeriod { 411 continue 412 } 413 } 414 415 if as.config.dialPolicy.skipPeer(info.Addrs) { 416 continue 417 } 418 candidates = append(candidates, p) 419 } 420 421 if len(candidates) == 0 { 422 return "" 423 } 424 425 shufflePeers(candidates) 426 return candidates[0] 427} 428 429func shufflePeers(peers []peer.ID) { 430 for i := range peers { 431 j := rand.Intn(i + 1) 432 peers[i], peers[j] = peers[j], peers[i] 433 } 434} 435 436// Status returns the AutoNAT observed reachability status. 437func (s *StaticAutoNAT) Status() network.Reachability { 438 return s.reachability 439} 440 441// PublicAddr returns the publicly connectable Multiaddr of this node if one is known. 442func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) { 443 if s.reachability != network.ReachabilityPublic { 444 return nil, errors.New("NAT status is not public") 445 } 446 return nil, errors.New("No available address") 447} 448