package identify

import (
	"context"
	"fmt"
	"io"
	"runtime/debug"
	"sync"
	"time"

	ic "github.com/libp2p/go-libp2p-core/crypto"
	"github.com/libp2p/go-libp2p-core/event"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/record"

	"github.com/libp2p/go-eventbus"
	"github.com/libp2p/go-msgio/protoio"

	pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"

	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
	msmux "github.com/multiformats/go-multistream"

	"github.com/gogo/protobuf/proto"
	logging "github.com/ipfs/go-log"
)

var log = logging.Logger("net/identify")

// ID is the protocol.ID of version 1.0.0 of the identify service.
const ID = "/ipfs/id/1.0.0"

// LibP2PVersion holds the current protocol version for a client running this code.
// TODO(jbenet): fix the versioning mess.
// XXX: Don't change this till 2020. You'll break all go-ipfs versions prior to
// 0.4.17 which asserted an exact version match.
const LibP2PVersion = "ipfs/0.1.0"

// ClientVersion is the default user agent.
//
// Deprecated: Set this with the UserAgent option.
var ClientVersion = "github.com/libp2p/go-libp2p"

var (
	legacyIDSize = 2 * 1024 // 2 KiB
	signedIDSize = 8 * 1024 // 8 KiB
	maxMessages  = 10
)

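// In module-built binaries, derive the default user agent from the main
// module's path and version (e.g. "github.com/libp2p/go-libp2p@v0.9.0").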
func init() {
	bi, ok := debug.ReadBuildInfo()
	if !ok {
		return
	}
	version := bi.Main.Version
	if version == "(devel)" {
		ClientVersion = bi.Main.Path
	} else {
		ClientVersion = fmt.Sprintf("%s@%s", bi.Main.Path, bi.Main.Version)
	}
}

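// addPeerHandlerReq asks the identify loop for the peerHandler tracking rp,
// creating one if the peer is still connected; the handler (or nil) is sent
// back on resp.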
type addPeerHandlerReq struct {
	rp   peer.ID
	resp chan *peerHandler
}

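// rmPeerHandlerReq asks the identify loop to stop and drop the peerHandler
// for p once the peer has fully disconnected.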
type rmPeerHandlerReq struct {
	p peer.ID
}

// IDService is a structure that implements ProtocolIdentify.
// It is a trivial service that gives the other peer some
// useful information about the local peer. A sort of hello.
//
// The IDService sends:
//  * Our IPFS Protocol Version
//  * Our IPFS Agent Version
//  * Our public Listen Addresses
type IDService struct {
	Host      host.Host
	UserAgent string

	ctx       context.Context
	ctxCancel context.CancelFunc
	// ensure we shut down ONLY once
	closeSync sync.Once
	// track resources that need to be shut down before we shut down
	refCount sync.WaitGroup

	disableSignedPeerRecord bool

	// Identified connections (finished and in progress).
	connsMu sync.RWMutex
	conns   map[network.Conn]chan struct{}

	addrMu sync.Mutex

	// our own observed addresses.
	observedAddrs *ObservedAddrManager

	emitters struct {
		evtPeerProtocolsUpdated        event.Emitter
		evtPeerIdentificationCompleted event.Emitter
		evtPeerIdentificationFailed    event.Emitter
	}

	addPeerHandlerCh chan addPeerHandlerReq
	rmPeerHandlerCh  chan rmPeerHandlerReq
}

// NewIDService constructs a new *IDService and activates it by
// attaching its stream handler to the given host.Host.
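//
// A minimal usage sketch (assuming h was constructed elsewhere, e.g. with
// libp2p.New; note that the default libp2p host already runs its own
// identify service):
//
//	ids := NewIDService(h)
//	defer ids.Close()
//	ids.IdentifyConn(conn) // block until conn has been identified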
func NewIDService(h host.Host, opts ...Option) *IDService {
	var cfg config
	for _, opt := range opts {
		opt(&cfg)
	}

	userAgent := ClientVersion
	if cfg.userAgent != "" {
		userAgent = cfg.userAgent
	}

	hostCtx, cancel := context.WithCancel(context.Background())
	s := &IDService{
		Host:      h,
		UserAgent: userAgent,

		ctx:           hostCtx,
		ctxCancel:     cancel,
		conns:         make(map[network.Conn]chan struct{}),
		observedAddrs: NewObservedAddrManager(hostCtx, h),

		disableSignedPeerRecord: cfg.disableSignedPeerRecord,

		addPeerHandlerCh: make(chan addPeerHandlerReq),
		rmPeerHandlerCh:  make(chan rmPeerHandlerReq),
	}

	// handle local protocol handler updates, and push deltas to peers.
	var err error

	s.refCount.Add(1)
	go s.loop()

	s.emitters.evtPeerProtocolsUpdated, err = h.EventBus().Emitter(&event.EvtPeerProtocolsUpdated{})
	if err != nil {
		log.Warnf("identify service not emitting peer protocol updates; err: %s", err)
	}
	s.emitters.evtPeerIdentificationCompleted, err = h.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{})
	if err != nil {
		log.Warnf("identify service not emitting identification completed events; err: %s", err)
	}
	s.emitters.evtPeerIdentificationFailed, err = h.EventBus().Emitter(&event.EvtPeerIdentificationFailed{})
	if err != nil {
		log.Warnf("identify service not emitting identification failed events; err: %s", err)
	}

	// register protocols that do not depend on peer records.
	h.SetStreamHandler(IDDelta, s.deltaHandler)
	h.SetStreamHandler(ID, s.sendIdentifyResp)
	h.SetStreamHandler(IDPush, s.pushHandler)

	h.Network().Notify((*netNotifiee)(s))
	return s
}

func (ids *IDService) loop() {
	defer ids.refCount.Done()

	phs := make(map[peer.ID]*peerHandler)
	sub, err := ids.Host.EventBus().Subscribe([]interface{}{&event.EvtLocalProtocolsUpdated{},
		&event.EvtLocalAddressesUpdated{}}, eventbus.BufSize(256))
	if err != nil {
		log.Errorf("failed to subscribe to events on the bus, err=%s", err)
		return
	}

	phClosedCh := make(chan peer.ID)

	defer func() {
		sub.Close()
		// The context will cancel the workers. Now, wait for them to
		// exit.
		for range phs {
			<-phClosedCh
		}
	}()

	// Use a fresh context for the handlers. Otherwise, they'll get canceled
	// before we're ready to shut down and they'll have "stopped" without us
	// _calling_ stop.
	handlerCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		select {
		case addReq := <-ids.addPeerHandlerCh:
			rp := addReq.rp
			ph, ok := phs[rp]
			if !ok && ids.Host.Network().Connectedness(rp) == network.Connected {
				ph = newPeerHandler(rp, ids)
				ph.start(handlerCtx, func() { phClosedCh <- rp })
				phs[rp] = ph
			}
			addReq.resp <- ph
		case rmReq := <-ids.rmPeerHandlerCh:
			rp := rmReq.p
			if ids.Host.Network().Connectedness(rp) != network.Connected {
				// Before we remove the peerHandler, we should ensure that it will not
				// send any more messages. Otherwise, we might create a new handler, and
				// the Identify response synchronized with the new handler could be
				// overwritten by a message sent by this "old" handler.
				ph, ok := phs[rp]
				if !ok {
					// move on, move on, there's nothing to see here.
					continue
				}
				// This is idempotent if already stopped.
				ph.stop()
			}

		case rp := <-phClosedCh:
			ph := phs[rp]

			// If we are connected to the peer, it means that we got a connection from the peer
			// before we could finish removing its handler on the previous disconnection.
			// If we delete the handler, we won't be able to push updates to it
			// until we see a new connection. So, we should restart the handler.
			// The fact that we got the handler on this channel means that its context and handler
			// have completed, because we write the handler to this channel only after it has closed.
			if ids.Host.Network().Connectedness(rp) == network.Connected {
				ph.start(handlerCtx, func() { phClosedCh <- rp })
			} else {
				delete(phs, rp)
			}

		case e, more := <-sub.Out():
			if !more {
				return
			}
			switch e.(type) {
			case event.EvtLocalAddressesUpdated:
				for pid := range phs {
					select {
					case phs[pid].pushCh <- struct{}{}:
					default:
						log.Debugf("dropping addr updated message for %s as buffer full", pid.Pretty())
					}
				}

			case event.EvtLocalProtocolsUpdated:
				for pid := range phs {
					select {
					case phs[pid].deltaCh <- struct{}{}:
					default:
						log.Debugf("dropping protocol updated message for %s as buffer full", pid.Pretty())
					}
				}
			}

		case <-ids.ctx.Done():
			return
		}
	}
}

// Close shuts down the IDService.
func (ids *IDService) Close() error {
	ids.closeSync.Do(func() {
		ids.ctxCancel()
		ids.refCount.Wait()
	})
	return nil
}

// OwnObservedAddrs returns the addresses peers have reported we've dialed from.
func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr {
	return ids.observedAddrs.Addrs()
}

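// ObservedAddrsFor returns the addresses peers have reported we've dialed from,
// filtered to those observed on the given local address.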
func (ids *IDService) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr {
	return ids.observedAddrs.AddrsFor(local)
}

// IdentifyConn synchronously triggers an identify request on the connection and
// waits for it to complete. If the connection is being identified by another
// caller, this call will wait. If the connection has already been identified,
// it will return immediately.
func (ids *IDService) IdentifyConn(c network.Conn) {
	<-ids.IdentifyWait(c)
}

// IdentifyWait triggers an identify (if the connection has not already been
// identified) and returns a channel that is closed when the identify protocol
// completes.
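//
// A hedged sketch of non-blocking use (c is assumed to be an open
// network.Conn, ctx a caller-supplied context):
//
//	select {
//	case <-ids.IdentifyWait(c):
//		// identify finished (or failed); the peerstore is now populated.
//	case <-ctx.Done():
//	}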
func (ids *IDService) IdentifyWait(c network.Conn) <-chan struct{} {
	ids.connsMu.RLock()
	wait, found := ids.conns[c]
	ids.connsMu.RUnlock()

	if found {
		return wait
	}

	ids.connsMu.Lock()
	defer ids.connsMu.Unlock()

	wait, found = ids.conns[c]

	if !found {
		wait = make(chan struct{})
		ids.conns[c] = wait

		// Spawn an identify. The connection may actually be closed
		// already, but that doesn't really matter. We'll fail to open a
		// stream then forget the connection.
		go ids.identifyConn(c, wait)
	}

	return wait
}

func (ids *IDService) removeConn(c network.Conn) {
	ids.connsMu.Lock()
	delete(ids.conns, c)
	ids.connsMu.Unlock()
}

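// identifyConn runs the identify protocol as the initiator on c, closes signal
// when done, and emits EvtPeerIdentificationCompleted or
// EvtPeerIdentificationFailed depending on the outcome.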
func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) {
	var (
		s   network.Stream
		err error
	)

	defer func() {
		close(signal)

		// emit the appropriate event.
		if p := c.RemotePeer(); err == nil {
			ids.emitters.evtPeerIdentificationCompleted.Emit(event.EvtPeerIdentificationCompleted{Peer: p})
		} else {
			ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: p, Reason: err})
		}
	}()

	s, err = c.NewStream(context.TODO())
	if err != nil {
		log.Debugw("error opening identify stream", "error", err)
		// the connection is probably already closed if we hit this.
		// TODO: Remove this?
		c.Close()

		// We usually do this on disconnect, but we may have already
		// processed the disconnect event.
		ids.removeConn(c)
		return
	}
	s.SetProtocol(ID)

	// negotiate the protocol, then hand the response to our handler.
	if err = msmux.SelectProtoOrFail(ID, s); err != nil {
		log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer(), logging.Metadata{"error": err})
		s.Reset()
		return
	}
	ids.handleIdentifyResponse(s)
}

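// sendIdentifyResp serves the responder side of identify: it looks up (or
// creates) the peerHandler for the remote peer and writes our current identify
// snapshot on the stream.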
func (ids *IDService) sendIdentifyResp(s network.Stream) {
	var ph *peerHandler

	defer func() {
		_ = s.Close()
		if ph != nil {
			ph.snapshotMu.RUnlock()
		}
	}()

	c := s.Conn()

	phCh := make(chan *peerHandler, 1)
	select {
	case ids.addPeerHandlerCh <- addPeerHandlerReq{c.RemotePeer(), phCh}:
	case <-ids.ctx.Done():
		return
	}

	select {
	case ph = <-phCh:
	case <-ids.ctx.Done():
		return
	}

	if ph == nil {
		// Peer disconnected, abort.
		s.Reset()
		return
	}

	ph.snapshotMu.RLock()
	ids.writeChunkedIdentifyMsg(c, ph.snapshot, s)
	log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
}

func (ids *IDService) handleIdentifyResponse(s network.Stream) {
	c := s.Conn()

	r := protoio.NewDelimitedReader(s, signedIDSize)
	mes := &pb.Identify{}

	if err := readAllIDMessages(r, mes); err != nil {
		log.Warning("error reading identify message: ", err)
		s.Reset()
		return
	}

	defer s.Close()

	log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())

	ids.consumeMessage(mes, c)
}

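// readAllIDMessages reads up to maxMessages length-delimited Identify messages
// from r and merges them into finalMsg. Responses may arrive in multiple parts
// when a signed peer record pushes a single message past the legacy size limit
// (see writeChunkedIdentifyMsg).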
func readAllIDMessages(r protoio.Reader, finalMsg proto.Message) error {
	mes := &pb.Identify{}
	for i := 0; i < maxMessages; i++ {
		switch err := r.ReadMsg(mes); err {
		case io.EOF:
			return nil
		case nil:
			proto.Merge(finalMsg, mes)
		default:
			return err
		}
	}

	return fmt.Errorf("too many parts")
}

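// getSnapshot captures our current identify state: the signed peer record
// (when enabled and supported by the peerstore), our listen addresses, and the
// protocols we currently handle.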
func (ids *IDService) getSnapshot() *identifySnapshot {
	snapshot := new(identifySnapshot)
	if !ids.disableSignedPeerRecord {
		if cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore()); ok {
			snapshot.record = cab.GetPeerRecord(ids.Host.ID())
		}
	}
	snapshot.addrs = ids.Host.Addrs()
	snapshot.protocols = ids.Host.Mux().Protocols()
	return snapshot
}

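// writeChunkedIdentifyMsg writes the identify response on s. If the message,
// including the signed peer record, fits within legacyIDSize, it is sent as a
// single message. Otherwise the record is stripped from the first message and
// sent as a second, record-only message, so legacy peers with a 2 KiB read
// limit can still parse the first part.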
func (ids *IDService) writeChunkedIdentifyMsg(c network.Conn, snapshot *identifySnapshot, s network.Stream) error {
	mes := ids.createBaseIdentifyResponse(c, snapshot)
	sr := ids.getSignedRecord(snapshot)
	mes.SignedPeerRecord = sr
	writer := protoio.NewDelimitedWriter(s)

	if sr == nil || proto.Size(mes) <= legacyIDSize {
		return writer.WriteMsg(mes)
	}
	mes.SignedPeerRecord = nil
	if err := writer.WriteMsg(mes); err != nil {
		return err
	}

	// then write just the signed record
	m := &pb.Identify{SignedPeerRecord: sr}
	return writer.WriteMsg(m)
}

func (ids *IDService) createBaseIdentifyResponse(
	conn network.Conn,
	snapshot *identifySnapshot,
) *pb.Identify {
	mes := &pb.Identify{}

	remoteAddr := conn.RemoteMultiaddr()
	localAddr := conn.LocalMultiaddr()

	// set protocols this node is currently handling
	mes.Protocols = snapshot.protocols

	// observed address so other side is informed of their
	// "public" address, at least in relation to us.
	mes.ObservedAddr = remoteAddr.Bytes()

	// populate unsigned addresses.
	// peers that do not yet support signed addresses will need this.
	// Note: LocalMultiaddr is sometimes 0.0.0.0
	viaLoopback := manet.IsIPLoopback(localAddr) || manet.IsIPLoopback(remoteAddr)
	mes.ListenAddrs = make([][]byte, 0, len(snapshot.addrs))
	for _, addr := range snapshot.addrs {
		if !viaLoopback && manet.IsIPLoopback(addr) {
			continue
		}
		mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes())
	}
	// set our public key
	ownKey := ids.Host.Peerstore().PubKey(ids.Host.ID())

	// check if we even have a public key.
	if ownKey == nil {
		// public key is nil. We are either using an insecure transport or something unexpected happened.
		// check if we're even operating in "secure mode"
		if ids.Host.Peerstore().PrivKey(ids.Host.ID()) != nil {
			// private key is present, but NO public key. Something bad happened.
			log.Errorf("did not have own public key in Peerstore")
		}
		// if neither key is present, it is safe to assume that we are using an insecure transport.
	} else {
		// public key is present. Safe to proceed.
		if kb, err := ownKey.Bytes(); err != nil {
			log.Errorf("failed to convert key to bytes")
		} else {
			mes.PublicKey = kb
		}
	}

	// set protocol versions
	pv := LibP2PVersion
	av := ids.UserAgent
	mes.ProtocolVersion = &pv
	mes.AgentVersion = &av

	return mes
}

func (ids *IDService) getSignedRecord(snapshot *identifySnapshot) []byte {
	if ids.disableSignedPeerRecord || snapshot.record == nil {
		return nil
	}

	recBytes, err := snapshot.record.Marshal()
	if err != nil {
		log.Errorw("failed to marshal signed record", "err", err)
		return nil
	}

	return recBytes
}

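// consumeMessage applies a received identify message to our view of the remote
// peer: supported protocols, the address it observed for us, its listen
// addresses (preferring a signed peer record over unsigned addresses), its
// protocol and agent versions, and its public key.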
func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn) {
	p := c.RemotePeer()

	// mes.Protocols
	ids.Host.Peerstore().SetProtocols(p, mes.Protocols...)

	// mes.ObservedAddr
	ids.consumeObservedAddress(mes.GetObservedAddr(), c)

	// mes.ListenAddrs
	laddrs := mes.GetListenAddrs()
	lmaddrs := make([]ma.Multiaddr, 0, len(laddrs))
	for _, addr := range laddrs {
		maddr, err := ma.NewMultiaddrBytes(addr)
		if err != nil {
			log.Debugf("%s failed to parse multiaddr from %s %s", ID,
				p, c.RemoteMultiaddr())
			continue
		}
		lmaddrs = append(lmaddrs, maddr)
	}

	// NOTE: Do not add `c.RemoteMultiaddr()` to the peerstore if the remote
	// peer doesn't tell us to do so. Otherwise, we'll advertise it.
	//
	// This can cause an "addr-splosion" issue where the network will slowly
	// gossip and collect observed but unadvertised addresses. Given a NAT
	// that picks random source ports, this can cause DHT nodes to collect
	// many undialable addresses for other peers.

	// add certified addresses for the peer if they sent us a signed peer record;
	// otherwise use the unsigned addresses.
	signedPeerRecord, err := signedPeerRecordFromMessage(mes)
	if err != nil {
		log.Errorf("error getting peer record from Identify message: %v", err)
	}

	// Extend the TTLs on the known (probably) good addresses.
	// Taking the lock ensures that we don't concurrently process a disconnect.
	ids.addrMu.Lock()
	ttl := peerstore.RecentlyConnectedAddrTTL
	if ids.Host.Network().Connectedness(p) == network.Connected {
		ttl = peerstore.ConnectedAddrTTL
	}

	// Downgrade connected and recently connected addrs to a temporary TTL.
	for _, ttl := range []time.Duration{
		peerstore.RecentlyConnectedAddrTTL,
		peerstore.ConnectedAddrTTL,
	} {
		ids.Host.Peerstore().UpdateAddrs(p, ttl, peerstore.TempAddrTTL)
	}

	// add signed addrs if we have them and the peerstore supports them
	cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore())
	if ok && signedPeerRecord != nil {
		_, addErr := cab.ConsumePeerRecord(signedPeerRecord, ttl)
		if addErr != nil {
			log.Debugf("error adding signed addrs to peerstore: %v", addErr)
		}
	} else {
		ids.Host.Peerstore().AddAddrs(p, lmaddrs, ttl)
	}

	// Finally, expire all temporary addrs.
	ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0)
	ids.addrMu.Unlock()

	log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs)

	// get protocol versions
	pv := mes.GetProtocolVersion()
	av := mes.GetAgentVersion()

	ids.Host.Peerstore().Put(p, "ProtocolVersion", pv)
	ids.Host.Peerstore().Put(p, "AgentVersion", av)

	// get the key from the other side. we may not have it (no-auth transport)
	ids.consumeReceivedPubKey(c, mes.PublicKey)
}

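// consumeReceivedPubKey records the remote peer's public key after verifying
// that it matches the peer.ID on the connection and any key we already have.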
func (ids *IDService) consumeReceivedPubKey(c network.Conn, kb []byte) {
	lp := c.LocalPeer()
	rp := c.RemotePeer()

	if kb == nil {
		log.Debugf("%s did not receive public key for remote peer: %s", lp, rp)
		return
	}

	newKey, err := ic.UnmarshalPublicKey(kb)
	if err != nil {
		log.Warningf("%s cannot unmarshal key from remote peer: %s, %s", lp, rp, err)
		return
	}

	// verify key matches peer.ID
	np, err := peer.IDFromPublicKey(newKey)
	if err != nil {
		log.Debugf("%s cannot get peer.ID from key of remote peer: %s, %s", lp, rp, err)
		return
	}

	if np != rp {
		// the newKey's peer.ID does not match the known peer.ID...

		if rp == "" && np != "" {
			// the remote peer.ID is empty, so use the new, sent key.
			err := ids.Host.Peerstore().AddPubKey(rp, newKey)
			if err != nil {
				log.Debugf("%s could not add key for %s to peerstore: %s", lp, rp, err)
			}

		} else {
			// we have a known peer.ID and it does not match the sent key... error.
			log.Errorf("%s received key for remote peer %s mismatch: %s", lp, rp, np)
		}
		return
	}

	currKey := ids.Host.Peerstore().PubKey(rp)
	if currKey == nil {
		// no key? no auth transport. set this one.
		err := ids.Host.Peerstore().AddPubKey(rp, newKey)
		if err != nil {
			log.Debugf("%s could not add key for %s to peerstore: %s", lp, rp, err)
		}
		return
	}

	// ok, we have a local key, we should verify they match.
	if currKey.Equals(newKey) {
		return // ok great. we're done.
	}

	// weird, got a different key... but the different key MATCHES the peer.ID.
	// this is odd. let's log an error and investigate. this should basically never
	// happen, and it means we have something funky going on and possibly a bug.
	log.Errorf("%s identify got a different key for: %s", lp, rp)

	// okay... does ours NOT match the remote peer.ID?
	cp, err := peer.IDFromPublicKey(currKey)
	if err != nil {
		log.Errorf("%s cannot get peer.ID from local key of remote peer: %s, %s", lp, rp, err)
		return
	}
	if cp != rp {
		log.Errorf("%s local key for remote peer %s yields different peer.ID: %s", lp, rp, cp)
		return
	}

	// okay... curr key DOES NOT match new key. both match peer.ID. wat?
	log.Errorf("%s local key and received key for %s do not match, but match peer.ID", lp, rp)
}

// HasConsistentTransport returns true if the address 'a' shares a
// protocol set with any address in the green set. This is used
// to check if a given address might be one of the addresses a peer is
// listening on.
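//
// For example, /ip4/1.2.3.4/tcp/4001 is consistent with /ip4/8.8.8.8/tcp/80
// (both use the ip4/tcp protocol stack), but not with /ip4/1.2.3.4/udp/4001.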
func HasConsistentTransport(a ma.Multiaddr, green []ma.Multiaddr) bool {
	protosMatch := func(a, b []ma.Protocol) bool {
		if len(a) != len(b) {
			return false
		}

		for i, p := range a {
			if b[i].Code != p.Code {
				return false
			}
		}
		return true
	}

	protos := a.Protocols()

	for _, ga := range green {
		if protosMatch(protos, ga.Protocols()) {
			return true
		}
	}

	return false
}

func (ids *IDService) consumeObservedAddress(observed []byte, c network.Conn) {
	if observed == nil {
		return
	}

	maddr, err := ma.NewMultiaddrBytes(observed)
	if err != nil {
		log.Debugf("error parsing received observed addr for %s: %s", c, err)
		return
	}

	ids.observedAddrs.Record(c, maddr)
}

func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {
	for _, b := range as {
		if a.Equal(b) {
			return true
		}
	}
	return false
}

func signedPeerRecordFromMessage(msg *pb.Identify) (*record.Envelope, error) {
	if len(msg.SignedPeerRecord) == 0 {
		return nil, nil
	}
	env, _, err := record.ConsumeEnvelope(msg.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
	return env, err
}

// netNotifiee wraps an IDService to receive notifications from the network.
type netNotifiee IDService

func (nn *netNotifiee) IDService() *IDService {
	return (*IDService)(nn)
}

func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
	nn.IDService().IdentifyWait(v)
}

func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
	ids := nn.IDService()

	// Stop tracking the connection.
	ids.removeConn(v)

	// undo the setting of addresses to peerstore.ConnectedAddrTTL we did
	ids.addrMu.Lock()
	defer ids.addrMu.Unlock()

	if ids.Host.Network().Connectedness(v.RemotePeer()) != network.Connected {
		// ask the loop to remove the peer handler for this peer.
		select {
		case ids.rmPeerHandlerCh <- rmPeerHandlerReq{v.RemotePeer()}:
		case <-ids.ctx.Done():
			return
		}

		// Last disconnect.
		ps := ids.Host.Peerstore()
		ps.UpdateAddrs(v.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
	}
}

func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr)         {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr)    {}