1package autonat
2
3import (
4	"context"
5	"errors"
6	"math/rand"
7	"sync/atomic"
8	"time"
9
10	"github.com/libp2p/go-eventbus"
11	"github.com/libp2p/go-libp2p-core/event"
12	"github.com/libp2p/go-libp2p-core/host"
13	"github.com/libp2p/go-libp2p-core/network"
14	"github.com/libp2p/go-libp2p-core/peer"
15
16	logging "github.com/ipfs/go-log"
17	ma "github.com/multiformats/go-multiaddr"
18	manet "github.com/multiformats/go-multiaddr/net"
19)
20
// log is the package-level logger, created under the "autonat" name.
var log = logging.Logger("autonat")
22
// AmbientAutoNAT is the implementation of ambient NAT autodiscovery.
//
// status is an atomic.Value that Status and PublicAddr load from.
// NOTE(review): the bookkeeping fields (confidence, lastInbound, lastProbeTry,
// lastProbe, recentProbes) carry no lock; in this file they are only accessed
// from the background loop and the helpers it calls synchronously — confirm
// that no other file touches them.
type AmbientAutoNAT struct {
	// ctx bounds the background loop and every probe request; host is the
	// libp2p host whose reachability is being determined.
	ctx  context.Context
	host host.Host

	// config is embedded, so its fields (for example throttlePeerPeriod) are
	// accessible directly on the AmbientAutoNAT.
	*config

	// inboundConn delivers connections to the background loop, which uses
	// them to update lastInbound. observations carries the result of each
	// probe back to the background loop.
	inboundConn  chan network.Conn
	observations chan autoNATResult
	// status is an autoNATResult reflecting current status.
	status atomic.Value
	// confidence reflects how much the current NAT status is trusted, as a
	// single dialback may fail for reasons unrelated to NAT.
	// If it is <3, then multiple autoNAT peers may be contacted for dialback.
	// If only a single autoNAT peer is known, then the confidence increases
	// for each failure until it reaches 3.
	//
	// The fields that follow it are probe bookkeeping:
	//   - lastInbound: time of the last inbound connection from a public
	//     address whose IP is not one of our own.
	//   - lastProbeTry: time of the last call to tryProbe, whether or not a
	//     probe was launched.
	//   - lastProbe: time the last probe was actually launched.
	//   - recentProbes: per-peer launch time of the last probe, used to
	//     throttle repeated probes of the same peer.
	confidence   int
	lastInbound  time.Time
	lastProbeTry time.Time
	lastProbe    time.Time
	recentProbes map[peer.ID]time.Time

	// service is enabled or disabled as the observed status changes; it may
	// be nil, and callers in this file check for that before using it.
	service *autoNATService

	// emitReachabilityChanged publishes EvtLocalReachabilityChanged events;
	// subscriber receives the address-update and identification events that
	// the background loop consumes.
	emitReachabilityChanged event.Emitter
	subscriber              event.Subscription
}
50
// StaticAutoNAT is a simple AutoNAT implementation when a single NAT status is desired.
// Its reachability is set once at construction; no method in this file
// modifies it afterwards. service may be nil.
type StaticAutoNAT struct {
	ctx          context.Context
	host         host.Host
	reachability network.Reachability
	service      *autoNATService
}
58
// autoNATResult pairs a reachability verdict with the address it applies to.
// address is only populated for a successful dialback and is nil otherwise.
// The type is built with unkeyed composite literals elsewhere in this file,
// so the field order must not change.
type autoNATResult struct {
	network.Reachability
	address ma.Multiaddr
}
63
64// New creates a new NAT autodiscovery system attached to a host
65func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
66	var err error
67	conf := new(config)
68	conf.host = h
69	conf.dialPolicy.host = h
70
71	if err = defaults(conf); err != nil {
72		return nil, err
73	}
74	if conf.addressFunc == nil {
75		conf.addressFunc = h.Addrs
76	}
77
78	for _, o := range options {
79		if err = o(conf); err != nil {
80			return nil, err
81		}
82	}
83	emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)
84
85	var service *autoNATService
86	if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil {
87		service, err = newAutoNATService(ctx, conf)
88		if err != nil {
89			return nil, err
90		}
91		service.Enable()
92	}
93
94	if conf.forceReachability {
95		emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability})
96
97		return &StaticAutoNAT{
98			ctx:          ctx,
99			host:         h,
100			reachability: conf.reachability,
101			service:      service,
102		}, nil
103	}
104
105	as := &AmbientAutoNAT{
106		ctx:          ctx,
107		host:         h,
108		config:       conf,
109		inboundConn:  make(chan network.Conn, 5),
110		observations: make(chan autoNATResult, 1),
111
112		emitReachabilityChanged: emitReachabilityChanged,
113		service:                 service,
114		recentProbes:            make(map[peer.ID]time.Time),
115	}
116	as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
117
118	subscriber, err := as.host.EventBus().Subscribe([]interface{}{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)})
119	if err != nil {
120		return nil, err
121	}
122	as.subscriber = subscriber
123
124	h.Network().Notify(as)
125	go as.background()
126
127	return as, nil
128}
129
130// Status returns the AutoNAT observed reachability status.
131func (as *AmbientAutoNAT) Status() network.Reachability {
132	s := as.status.Load().(autoNATResult)
133	return s.Reachability
134}
135
136func (as *AmbientAutoNAT) emitStatus() {
137	status := as.status.Load().(autoNATResult)
138	as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability})
139}
140
141// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
142func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
143	s := as.status.Load().(autoNATResult)
144	if s.Reachability != network.ReachabilityPublic {
145		return nil, errors.New("NAT status is not public")
146	}
147
148	return s.address, nil
149}
150
151func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool {
152	candidateIP, _ := manet.ToIP(candidate)
153	for _, i := range list {
154		if ip, err := manet.ToIP(i); err == nil && ip.Equal(candidateIP) {
155			return true
156		}
157	}
158	return false
159}
160
161func (as *AmbientAutoNAT) background() {
162	// wait a bit for the node to come online and establish some connections
163	// before starting autodetection
164	delay := as.config.bootDelay
165
166	var lastAddrUpdated time.Time
167	subChan := as.subscriber.Out()
168	defer as.subscriber.Close()
169	defer as.emitReachabilityChanged.Close()
170
171	timer := time.NewTimer(delay)
172	defer timer.Stop()
173	timerRunning := true
174	for {
175		select {
176		// new inbound connection.
177		case conn := <-as.inboundConn:
178			localAddrs := as.host.Addrs()
179			ca := as.status.Load().(autoNATResult)
180			if ca.address != nil {
181				localAddrs = append(localAddrs, ca.address)
182			}
183			if manet.IsPublicAddr(conn.RemoteMultiaddr()) &&
184				!ipInList(conn.RemoteMultiaddr(), localAddrs) {
185				as.lastInbound = time.Now()
186			}
187
188		case e := <-subChan:
189			switch e := e.(type) {
190			case event.EvtLocalAddressesUpdated:
191				if !lastAddrUpdated.Add(time.Second).After(time.Now()) {
192					lastAddrUpdated = time.Now()
193					if as.confidence > 1 {
194						as.confidence--
195					}
196				}
197			case event.EvtPeerIdentificationCompleted:
198				if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 {
199					currentStatus := as.status.Load().(autoNATResult)
200					if currentStatus.Reachability == network.ReachabilityUnknown {
201						as.tryProbe(e.Peer)
202					}
203				}
204			default:
205				log.Errorf("unknown event type: %T", e)
206			}
207
208		// probe finished.
209		case result, ok := <-as.observations:
210			if !ok {
211				return
212			}
213			as.recordObservation(result)
214		case <-timer.C:
215			peer := as.getPeerToProbe()
216			as.tryProbe(peer)
217			timerRunning = false
218		case <-as.ctx.Done():
219			return
220		}
221
222		// Drain the timer channel if it hasn't fired in preparation for Resetting it.
223		if timerRunning && !timer.Stop() {
224			<-timer.C
225		}
226		timer.Reset(as.scheduleProbe())
227		timerRunning = true
228	}
229}
230
231func (as *AmbientAutoNAT) cleanupRecentProbes() {
232	fixedNow := time.Now()
233	for k, v := range as.recentProbes {
234		if fixedNow.Sub(v) > as.throttlePeerPeriod {
235			delete(as.recentProbes, k)
236		}
237	}
238}
239
240// scheduleProbe calculates when the next probe should be scheduled for.
241func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
242	// Our baseline is a probe every 'AutoNATRefreshInterval'
243	// This is modulated by:
244	// * if we are in an unknown state, or have low confidence, that should drop to 'AutoNATRetryInterval'
245	// * recent inbound connections (implying continued connectivity) should decrease the retry when public
246	// * recent inbound connections when not public mean we should try more actively to see if we're public.
247	fixedNow := time.Now()
248	currentStatus := as.status.Load().(autoNATResult)
249
250	nextProbe := fixedNow
251	// Don't look for peers in the peer store more than once per second.
252	if !as.lastProbeTry.IsZero() {
253		backoff := as.lastProbeTry.Add(time.Second)
254		if backoff.After(nextProbe) {
255			nextProbe = backoff
256		}
257	}
258	if !as.lastProbe.IsZero() {
259		untilNext := as.config.refreshInterval
260		if currentStatus.Reachability == network.ReachabilityUnknown {
261			untilNext = as.config.retryInterval
262		} else if as.confidence < 3 {
263			untilNext = as.config.retryInterval
264		} else if currentStatus.Reachability == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
265			untilNext *= 2
266		} else if currentStatus.Reachability != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
267			untilNext /= 5
268		}
269
270		if as.lastProbe.Add(untilNext).After(nextProbe) {
271			nextProbe = as.lastProbe.Add(untilNext)
272		}
273	}
274
275	return nextProbe.Sub(fixedNow)
276}
277
278// Update the current status based on an observed result.
279func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
280	currentStatus := as.status.Load().(autoNATResult)
281	if observation.Reachability == network.ReachabilityPublic {
282		log.Debugf("NAT status is public")
283		changed := false
284		if currentStatus.Reachability != network.ReachabilityPublic {
285			// we are flipping our NATStatus, so confidence drops to 0
286			as.confidence = 0
287			if as.service != nil {
288				as.service.Enable()
289			}
290			changed = true
291		} else if as.confidence < 3 {
292			as.confidence++
293		}
294		if observation.address != nil {
295			if !changed && currentStatus.address != nil && !observation.address.Equal(currentStatus.address) {
296				as.confidence--
297			}
298			if currentStatus.address == nil || !observation.address.Equal(currentStatus.address) {
299				changed = true
300			}
301			as.status.Store(observation)
302		}
303		if observation.address != nil && changed {
304			as.emitStatus()
305		}
306	} else if observation.Reachability == network.ReachabilityPrivate {
307		log.Debugf("NAT status is private")
308		if currentStatus.Reachability == network.ReachabilityPublic {
309			if as.confidence > 0 {
310				as.confidence--
311			} else {
312				// we are flipping our NATStatus, so confidence drops to 0
313				as.confidence = 0
314				as.status.Store(observation)
315				if as.service != nil {
316					as.service.Disable()
317				}
318				as.emitStatus()
319			}
320		} else if as.confidence < 3 {
321			as.confidence++
322			as.status.Store(observation)
323			if currentStatus.Reachability != network.ReachabilityPrivate {
324				as.emitStatus()
325			}
326		}
327	} else if as.confidence > 0 {
328		// don't just flip to unknown, reduce confidence first
329		as.confidence--
330	} else {
331		log.Debugf("NAT status is unknown")
332		as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
333		if currentStatus.Reachability != network.ReachabilityUnknown {
334			if as.service != nil {
335				as.service.Enable()
336			}
337			as.emitStatus()
338		}
339	}
340}
341
342func (as *AmbientAutoNAT) tryProbe(p peer.ID) bool {
343	as.lastProbeTry = time.Now()
344	if p.Validate() != nil {
345		return false
346	}
347
348	if lastTime, ok := as.recentProbes[p]; ok {
349		if time.Since(lastTime) < as.throttlePeerPeriod {
350			return false
351		}
352	}
353	as.cleanupRecentProbes()
354
355	info := as.host.Peerstore().PeerInfo(p)
356
357	if !as.config.dialPolicy.skipPeer(info.Addrs) {
358		as.recentProbes[p] = time.Now()
359		as.lastProbe = time.Now()
360		go as.probe(&info)
361		return true
362	}
363	return false
364}
365
366func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) {
367	cli := NewAutoNATClient(as.host, as.config.addressFunc)
368	ctx, cancel := context.WithTimeout(as.ctx, as.config.requestTimeout)
369	defer cancel()
370
371	a, err := cli.DialBack(ctx, pi.ID)
372
373	var result autoNATResult
374	switch {
375	case err == nil:
376		log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
377		result.Reachability = network.ReachabilityPublic
378		result.address = a
379	case IsDialError(err):
380		log.Debugf("Dialback through %s failed", pi.ID.Pretty())
381		result.Reachability = network.ReachabilityPrivate
382	default:
383		result.Reachability = network.ReachabilityUnknown
384	}
385
386	select {
387	case as.observations <- result:
388	case <-as.ctx.Done():
389		return
390	}
391}
392
393func (as *AmbientAutoNAT) getPeerToProbe() peer.ID {
394	peers := as.host.Network().Peers()
395	if len(peers) == 0 {
396		return ""
397	}
398
399	candidates := make([]peer.ID, 0, len(peers))
400
401	for _, p := range peers {
402		info := as.host.Peerstore().PeerInfo(p)
403		// Exclude peers which don't support the autonat protocol.
404		if proto, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto); len(proto) == 0 || err != nil {
405			continue
406		}
407
408		// Exclude peers in backoff.
409		if lastTime, ok := as.recentProbes[p]; ok {
410			if time.Since(lastTime) < as.throttlePeerPeriod {
411				continue
412			}
413		}
414
415		if as.config.dialPolicy.skipPeer(info.Addrs) {
416			continue
417		}
418		candidates = append(candidates, p)
419	}
420
421	if len(candidates) == 0 {
422		return ""
423	}
424
425	shufflePeers(candidates)
426	return candidates[0]
427}
428
429func shufflePeers(peers []peer.ID) {
430	for i := range peers {
431		j := rand.Intn(i + 1)
432		peers[i], peers[j] = peers[j], peers[i]
433	}
434}
435
// Status returns the fixed reachability stored in this StaticAutoNAT; no
// method in this file modifies it after construction.
func (s *StaticAutoNAT) Status() network.Reachability {
	return s.reachability
}
440
441// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
442func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) {
443	if s.reachability != network.ReachabilityPublic {
444		return nil, errors.New("NAT status is not public")
445	}
446	return nil, errors.New("No available address")
447}
448