1package identify 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "runtime/debug" 8 "sync" 9 "time" 10 11 ic "github.com/libp2p/go-libp2p-core/crypto" 12 "github.com/libp2p/go-libp2p-core/event" 13 "github.com/libp2p/go-libp2p-core/host" 14 "github.com/libp2p/go-libp2p-core/network" 15 "github.com/libp2p/go-libp2p-core/peer" 16 "github.com/libp2p/go-libp2p-core/peerstore" 17 "github.com/libp2p/go-libp2p-core/record" 18 19 "github.com/libp2p/go-eventbus" 20 "github.com/libp2p/go-msgio/protoio" 21 22 pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" 23 24 ma "github.com/multiformats/go-multiaddr" 25 manet "github.com/multiformats/go-multiaddr/net" 26 msmux "github.com/multiformats/go-multistream" 27 28 "github.com/gogo/protobuf/proto" 29 logging "github.com/ipfs/go-log" 30) 31 32var log = logging.Logger("net/identify") 33 34// ID is the protocol.ID of version 1.0.0 of the identify 35// service. 36const ID = "/ipfs/id/1.0.0" 37 38// LibP2PVersion holds the current protocol version for a client running this code 39// TODO(jbenet): fix the versioning mess. 40// XXX: Don't change this till 2020. You'll break all go-ipfs versions prior to 41// 0.4.17 which asserted an exact version match. 42const LibP2PVersion = "ipfs/0.1.0" 43 44// ClientVersion is the default user agent. 45// 46// Deprecated: Set this with the UserAgent option. 47var ClientVersion = "github.com/libp2p/go-libp2p" 48 49var ( 50 legacyIDSize = 2 * 1024 // 2k Bytes 51 signedIDSize = 8 * 1024 // 8K 52 maxMessages = 10 53) 54 55func init() { 56 bi, ok := debug.ReadBuildInfo() 57 if !ok { 58 return 59 } 60 version := bi.Main.Version 61 if version == "(devel)" { 62 ClientVersion = bi.Main.Path 63 } else { 64 ClientVersion = fmt.Sprintf("%s@%s", bi.Main.Path, bi.Main.Version) 65 } 66} 67 68type addPeerHandlerReq struct { 69 rp peer.ID 70 resp chan *peerHandler 71} 72 73type rmPeerHandlerReq struct { 74 p peer.ID 75} 76 77// IDService is a structure that implements ProtocolIdentify. 78// It is a trivial service that gives the other peer some 79// useful information about the local peer. A sort of hello. 80// 81// The IDService sends: 82// * Our IPFS Protocol Version 83// * Our IPFS Agent Version 84// * Our public Listen Addresses 85type IDService struct { 86 Host host.Host 87 UserAgent string 88 89 ctx context.Context 90 ctxCancel context.CancelFunc 91 // ensure we shutdown ONLY once 92 closeSync sync.Once 93 // track resources that need to be shut down before we shut down 94 refCount sync.WaitGroup 95 96 disableSignedPeerRecord bool 97 98 // Identified connections (finished and in progress). 99 connsMu sync.RWMutex 100 conns map[network.Conn]chan struct{} 101 102 addrMu sync.Mutex 103 104 // our own observed addresses. 105 observedAddrs *ObservedAddrManager 106 107 emitters struct { 108 evtPeerProtocolsUpdated event.Emitter 109 evtPeerIdentificationCompleted event.Emitter 110 evtPeerIdentificationFailed event.Emitter 111 } 112 113 addPeerHandlerCh chan addPeerHandlerReq 114 rmPeerHandlerCh chan rmPeerHandlerReq 115} 116 117// NewIDService constructs a new *IDService and activates it by 118// attaching its stream handler to the given host.Host. 119func NewIDService(h host.Host, opts ...Option) *IDService { 120 var cfg config 121 for _, opt := range opts { 122 opt(&cfg) 123 } 124 125 userAgent := ClientVersion 126 if cfg.userAgent != "" { 127 userAgent = cfg.userAgent 128 } 129 130 hostCtx, cancel := context.WithCancel(context.Background()) 131 s := &IDService{ 132 Host: h, 133 UserAgent: userAgent, 134 135 ctx: hostCtx, 136 ctxCancel: cancel, 137 conns: make(map[network.Conn]chan struct{}), 138 observedAddrs: NewObservedAddrManager(hostCtx, h), 139 140 disableSignedPeerRecord: cfg.disableSignedPeerRecord, 141 142 addPeerHandlerCh: make(chan addPeerHandlerReq), 143 rmPeerHandlerCh: make(chan rmPeerHandlerReq), 144 } 145 146 // handle local protocol handler updates, and push deltas to peers. 147 var err error 148 149 s.refCount.Add(1) 150 go s.loop() 151 152 s.emitters.evtPeerProtocolsUpdated, err = h.EventBus().Emitter(&event.EvtPeerProtocolsUpdated{}) 153 if err != nil { 154 log.Warnf("identify service not emitting peer protocol updates; err: %s", err) 155 } 156 s.emitters.evtPeerIdentificationCompleted, err = h.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{}) 157 if err != nil { 158 log.Warnf("identify service not emitting identification completed events; err: %s", err) 159 } 160 s.emitters.evtPeerIdentificationFailed, err = h.EventBus().Emitter(&event.EvtPeerIdentificationFailed{}) 161 if err != nil { 162 log.Warnf("identify service not emitting identification failed events; err: %s", err) 163 } 164 165 // register protocols that do not depend on peer records. 166 h.SetStreamHandler(IDDelta, s.deltaHandler) 167 h.SetStreamHandler(ID, s.sendIdentifyResp) 168 h.SetStreamHandler(IDPush, s.pushHandler) 169 170 h.Network().Notify((*netNotifiee)(s)) 171 return s 172} 173 174func (ids *IDService) loop() { 175 defer ids.refCount.Done() 176 177 phs := make(map[peer.ID]*peerHandler) 178 sub, err := ids.Host.EventBus().Subscribe([]interface{}{&event.EvtLocalProtocolsUpdated{}, 179 &event.EvtLocalAddressesUpdated{}}, eventbus.BufSize(256)) 180 if err != nil { 181 log.Errorf("failed to subscribe to events on the bus, err=%s", err) 182 return 183 } 184 185 phClosedCh := make(chan peer.ID) 186 187 defer func() { 188 sub.Close() 189 // The context will cancel the workers. Now, wait for them to 190 // exit. 191 for range phs { 192 <-phClosedCh 193 } 194 }() 195 196 // Use a fresh context for the handlers. Otherwise, they'll get canceled 197 // before we're ready to shutdown and they'll have "stopped" without us 198 // _calling_ stop. 199 handlerCtx, cancel := context.WithCancel(context.Background()) 200 defer cancel() 201 202 for { 203 select { 204 case addReq := <-ids.addPeerHandlerCh: 205 rp := addReq.rp 206 ph, ok := phs[rp] 207 if !ok && ids.Host.Network().Connectedness(rp) == network.Connected { 208 ph = newPeerHandler(rp, ids) 209 ph.start(handlerCtx, func() { phClosedCh <- rp }) 210 phs[rp] = ph 211 } 212 addReq.resp <- ph 213 case rmReq := <-ids.rmPeerHandlerCh: 214 rp := rmReq.p 215 if ids.Host.Network().Connectedness(rp) != network.Connected { 216 // before we remove the peerhandler, we should ensure that it will not send any 217 // more messages. Otherwise, we might create a new handler and the Identify response 218 // synchronized with the new handler might be overwritten by a message sent by this "old" handler. 219 ph, ok := phs[rp] 220 if !ok { 221 // move on, move on, there's nothing to see here. 222 continue 223 } 224 // This is idempotent if already stopped. 225 ph.stop() 226 } 227 228 case rp := <-phClosedCh: 229 ph := phs[rp] 230 231 // If we are connected to the peer, it means that we got a connection from the peer 232 // before we could finish removing it's handler on the previous disconnection. 233 // If we delete the handler, we wont be able to push updates to it 234 // till we see a new connection. So, we should restart the handler. 235 // The fact that we got the handler on this channel means that it's context and handler 236 // have completed because we write the handler to this chanel only after it closed. 237 if ids.Host.Network().Connectedness(rp) == network.Connected { 238 ph.start(handlerCtx, func() { phClosedCh <- rp }) 239 } else { 240 delete(phs, rp) 241 } 242 243 case e, more := <-sub.Out(): 244 if !more { 245 return 246 } 247 switch e.(type) { 248 case event.EvtLocalAddressesUpdated: 249 for pid := range phs { 250 select { 251 case phs[pid].pushCh <- struct{}{}: 252 default: 253 log.Debugf("dropping addr updated message for %s as buffer full", pid.Pretty()) 254 } 255 } 256 257 case event.EvtLocalProtocolsUpdated: 258 for pid := range phs { 259 select { 260 case phs[pid].deltaCh <- struct{}{}: 261 default: 262 log.Debugf("dropping protocol updated message for %s as buffer full", pid.Pretty()) 263 } 264 } 265 } 266 267 case <-ids.ctx.Done(): 268 return 269 } 270 } 271} 272 273// Close shuts down the IDService 274func (ids *IDService) Close() error { 275 ids.closeSync.Do(func() { 276 ids.ctxCancel() 277 ids.refCount.Wait() 278 }) 279 return nil 280} 281 282// OwnObservedAddrs returns the addresses peers have reported we've dialed from 283func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr { 284 return ids.observedAddrs.Addrs() 285} 286 287func (ids *IDService) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr { 288 return ids.observedAddrs.AddrsFor(local) 289} 290 291// IdentifyConn synchronously triggers an identify request on the connection and 292// waits for it to complete. If the connection is being identified by another 293// caller, this call will wait. If the connection has already been identified, 294// it will return immediately. 295func (ids *IDService) IdentifyConn(c network.Conn) { 296 <-ids.IdentifyWait(c) 297} 298 299// IdentifyWait triggers an identify (if the connection has not already been 300// identified) and returns a channel that is closed when the identify protocol 301// completes. 302func (ids *IDService) IdentifyWait(c network.Conn) <-chan struct{} { 303 ids.connsMu.RLock() 304 wait, found := ids.conns[c] 305 ids.connsMu.RUnlock() 306 307 if found { 308 return wait 309 } 310 311 ids.connsMu.Lock() 312 defer ids.connsMu.Unlock() 313 314 wait, found = ids.conns[c] 315 316 if !found { 317 wait = make(chan struct{}) 318 ids.conns[c] = wait 319 320 // Spawn an identify. The connection may actually be closed 321 // already, but that doesn't really matter. We'll fail to open a 322 // stream then forget the connection. 323 go ids.identifyConn(c, wait) 324 } 325 326 return wait 327} 328 329func (ids *IDService) removeConn(c network.Conn) { 330 ids.connsMu.Lock() 331 delete(ids.conns, c) 332 ids.connsMu.Unlock() 333} 334 335func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) { 336 var ( 337 s network.Stream 338 err error 339 ) 340 341 defer func() { 342 close(signal) 343 344 // emit the appropriate event. 345 if p := c.RemotePeer(); err == nil { 346 ids.emitters.evtPeerIdentificationCompleted.Emit(event.EvtPeerIdentificationCompleted{Peer: p}) 347 } else { 348 ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: p, Reason: err}) 349 } 350 }() 351 352 s, err = c.NewStream(context.TODO()) 353 if err != nil { 354 log.Debugw("error opening identify stream", "error", err) 355 // the connection is probably already closed if we hit this. 356 // TODO: Remove this? 357 c.Close() 358 359 // We usually do this on disconnect, but we may have already 360 // processed the disconnect event. 361 ids.removeConn(c) 362 return 363 } 364 s.SetProtocol(ID) 365 366 // ok give the response to our handler. 367 if err = msmux.SelectProtoOrFail(ID, s); err != nil { 368 log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer(), logging.Metadata{"error": err}) 369 s.Reset() 370 return 371 } 372 ids.handleIdentifyResponse(s) 373} 374 375func (ids *IDService) sendIdentifyResp(s network.Stream) { 376 var ph *peerHandler 377 378 defer func() { 379 _ = s.Close() 380 if ph != nil { 381 ph.snapshotMu.RUnlock() 382 } 383 }() 384 385 c := s.Conn() 386 387 phCh := make(chan *peerHandler, 1) 388 select { 389 case ids.addPeerHandlerCh <- addPeerHandlerReq{c.RemotePeer(), phCh}: 390 case <-ids.ctx.Done(): 391 return 392 } 393 394 select { 395 case ph = <-phCh: 396 case <-ids.ctx.Done(): 397 return 398 } 399 400 if ph == nil { 401 // Peer disconnected, abort. 402 s.Reset() 403 return 404 } 405 406 ph.snapshotMu.RLock() 407 ids.writeChunkedIdentifyMsg(c, ph.snapshot, s) 408 log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr()) 409} 410 411func (ids *IDService) handleIdentifyResponse(s network.Stream) { 412 c := s.Conn() 413 414 r := protoio.NewDelimitedReader(s, signedIDSize) 415 mes := &pb.Identify{} 416 417 if err := readAllIDMessages(r, mes); err != nil { 418 log.Warning("error reading identify message: ", err) 419 s.Reset() 420 return 421 } 422 423 defer s.Close() 424 425 log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr()) 426 427 ids.consumeMessage(mes, c) 428} 429 430func readAllIDMessages(r protoio.Reader, finalMsg proto.Message) error { 431 mes := &pb.Identify{} 432 for i := 0; i < maxMessages; i++ { 433 switch err := r.ReadMsg(mes); err { 434 case io.EOF: 435 return nil 436 case nil: 437 proto.Merge(finalMsg, mes) 438 default: 439 return err 440 } 441 } 442 443 return fmt.Errorf("too many parts") 444} 445 446func (ids *IDService) getSnapshot() *identifySnapshot { 447 snapshot := new(identifySnapshot) 448 if !ids.disableSignedPeerRecord { 449 if cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore()); ok { 450 snapshot.record = cab.GetPeerRecord(ids.Host.ID()) 451 } 452 } 453 snapshot.addrs = ids.Host.Addrs() 454 snapshot.protocols = ids.Host.Mux().Protocols() 455 return snapshot 456} 457 458func (ids *IDService) writeChunkedIdentifyMsg(c network.Conn, snapshot *identifySnapshot, s network.Stream) error { 459 mes := ids.createBaseIdentifyResponse(c, snapshot) 460 sr := ids.getSignedRecord(snapshot) 461 mes.SignedPeerRecord = sr 462 writer := protoio.NewDelimitedWriter(s) 463 464 if sr == nil || proto.Size(mes) <= legacyIDSize { 465 return writer.WriteMsg(mes) 466 } 467 mes.SignedPeerRecord = nil 468 if err := writer.WriteMsg(mes); err != nil { 469 return err 470 } 471 472 // then write just the signed record 473 m := &pb.Identify{SignedPeerRecord: sr} 474 err := writer.WriteMsg(m) 475 return err 476 477} 478 479func (ids *IDService) createBaseIdentifyResponse( 480 conn network.Conn, 481 snapshot *identifySnapshot, 482) *pb.Identify { 483 mes := &pb.Identify{} 484 485 remoteAddr := conn.RemoteMultiaddr() 486 localAddr := conn.LocalMultiaddr() 487 488 // set protocols this node is currently handling 489 mes.Protocols = snapshot.protocols 490 491 // observed address so other side is informed of their 492 // "public" address, at least in relation to us. 493 mes.ObservedAddr = remoteAddr.Bytes() 494 495 // populate unsigned addresses. 496 // peers that do not yet support signed addresses will need this. 497 // Note: LocalMultiaddr is sometimes 0.0.0.0 498 viaLoopback := manet.IsIPLoopback(localAddr) || manet.IsIPLoopback(remoteAddr) 499 mes.ListenAddrs = make([][]byte, 0, len(snapshot.addrs)) 500 for _, addr := range snapshot.addrs { 501 if !viaLoopback && manet.IsIPLoopback(addr) { 502 continue 503 } 504 mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes()) 505 } 506 // set our public key 507 ownKey := ids.Host.Peerstore().PubKey(ids.Host.ID()) 508 509 // check if we even have a public key. 510 if ownKey == nil { 511 // public key is nil. We are either using insecure transport or something erratic happened. 512 // check if we're even operating in "secure mode" 513 if ids.Host.Peerstore().PrivKey(ids.Host.ID()) != nil { 514 // private key is present. But NO public key. Something bad happened. 515 log.Errorf("did not have own public key in Peerstore") 516 } 517 // if neither of the key is present it is safe to assume that we are using an insecure transport. 518 } else { 519 // public key is present. Safe to proceed. 520 if kb, err := ownKey.Bytes(); err != nil { 521 log.Errorf("failed to convert key to bytes") 522 } else { 523 mes.PublicKey = kb 524 } 525 } 526 527 // set protocol versions 528 pv := LibP2PVersion 529 av := ids.UserAgent 530 mes.ProtocolVersion = &pv 531 mes.AgentVersion = &av 532 533 return mes 534} 535 536func (ids *IDService) getSignedRecord(snapshot *identifySnapshot) []byte { 537 if ids.disableSignedPeerRecord || snapshot.record == nil { 538 return nil 539 } 540 541 recBytes, err := snapshot.record.Marshal() 542 if err != nil { 543 log.Errorw("failed to marshal signed record", "err", err) 544 return nil 545 } 546 547 return recBytes 548} 549 550func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn) { 551 p := c.RemotePeer() 552 553 // mes.Protocols 554 ids.Host.Peerstore().SetProtocols(p, mes.Protocols...) 555 556 // mes.ObservedAddr 557 ids.consumeObservedAddress(mes.GetObservedAddr(), c) 558 559 // mes.ListenAddrs 560 laddrs := mes.GetListenAddrs() 561 lmaddrs := make([]ma.Multiaddr, 0, len(laddrs)) 562 for _, addr := range laddrs { 563 maddr, err := ma.NewMultiaddrBytes(addr) 564 if err != nil { 565 log.Debugf("%s failed to parse multiaddr from %s %s", ID, 566 p, c.RemoteMultiaddr()) 567 continue 568 } 569 lmaddrs = append(lmaddrs, maddr) 570 } 571 572 // NOTE: Do not add `c.RemoteMultiaddr()` to the peerstore if the remote 573 // peer doesn't tell us to do so. Otherwise, we'll advertise it. 574 // 575 // This can cause an "addr-splosion" issue where the network will slowly 576 // gossip and collect observed but unadvertised addresses. Given a NAT 577 // that picks random source ports, this can cause DHT nodes to collect 578 // many undialable addresses for other peers. 579 580 // add certified addresses for the peer, if they sent us a signed peer record 581 // otherwise use the unsigned addresses. 582 var signedPeerRecord *record.Envelope 583 signedPeerRecord, err := signedPeerRecordFromMessage(mes) 584 if err != nil { 585 log.Errorf("error getting peer record from Identify message: %v", err) 586 } 587 588 // Extend the TTLs on the known (probably) good addresses. 589 // Taking the lock ensures that we don't concurrently process a disconnect. 590 ids.addrMu.Lock() 591 ttl := peerstore.RecentlyConnectedAddrTTL 592 if ids.Host.Network().Connectedness(p) == network.Connected { 593 ttl = peerstore.ConnectedAddrTTL 594 } 595 596 // Downgrade connected and recently connected addrs to a temporary TTL. 597 for _, ttl := range []time.Duration{ 598 peerstore.RecentlyConnectedAddrTTL, 599 peerstore.ConnectedAddrTTL, 600 } { 601 ids.Host.Peerstore().UpdateAddrs(p, ttl, peerstore.TempAddrTTL) 602 } 603 604 // add signed addrs if we have them and the peerstore supports them 605 cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore()) 606 if ok && signedPeerRecord != nil { 607 _, addErr := cab.ConsumePeerRecord(signedPeerRecord, ttl) 608 if addErr != nil { 609 log.Debugf("error adding signed addrs to peerstore: %v", addErr) 610 } 611 } else { 612 ids.Host.Peerstore().AddAddrs(p, lmaddrs, ttl) 613 } 614 615 // Finally, expire all temporary addrs. 616 ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0) 617 ids.addrMu.Unlock() 618 619 log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs) 620 621 // get protocol versions 622 pv := mes.GetProtocolVersion() 623 av := mes.GetAgentVersion() 624 625 ids.Host.Peerstore().Put(p, "ProtocolVersion", pv) 626 ids.Host.Peerstore().Put(p, "AgentVersion", av) 627 628 // get the key from the other side. we may not have it (no-auth transport) 629 ids.consumeReceivedPubKey(c, mes.PublicKey) 630} 631 632func (ids *IDService) consumeReceivedPubKey(c network.Conn, kb []byte) { 633 lp := c.LocalPeer() 634 rp := c.RemotePeer() 635 636 if kb == nil { 637 log.Debugf("%s did not receive public key for remote peer: %s", lp, rp) 638 return 639 } 640 641 newKey, err := ic.UnmarshalPublicKey(kb) 642 if err != nil { 643 log.Warningf("%s cannot unmarshal key from remote peer: %s, %s", lp, rp, err) 644 return 645 } 646 647 // verify key matches peer.ID 648 np, err := peer.IDFromPublicKey(newKey) 649 if err != nil { 650 log.Debugf("%s cannot get peer.ID from key of remote peer: %s, %s", lp, rp, err) 651 return 652 } 653 654 if np != rp { 655 // if the newKey's peer.ID does not match known peer.ID... 656 657 if rp == "" && np != "" { 658 // if local peerid is empty, then use the new, sent key. 659 err := ids.Host.Peerstore().AddPubKey(rp, newKey) 660 if err != nil { 661 log.Debugf("%s could not add key for %s to peerstore: %s", lp, rp, err) 662 } 663 664 } else { 665 // we have a local peer.ID and it does not match the sent key... error. 666 log.Errorf("%s received key for remote peer %s mismatch: %s", lp, rp, np) 667 } 668 return 669 } 670 671 currKey := ids.Host.Peerstore().PubKey(rp) 672 if currKey == nil { 673 // no key? no auth transport. set this one. 674 err := ids.Host.Peerstore().AddPubKey(rp, newKey) 675 if err != nil { 676 log.Debugf("%s could not add key for %s to peerstore: %s", lp, rp, err) 677 } 678 return 679 } 680 681 // ok, we have a local key, we should verify they match. 682 if currKey.Equals(newKey) { 683 return // ok great. we're done. 684 } 685 686 // weird, got a different key... but the different key MATCHES the peer.ID. 687 // this odd. let's log error and investigate. this should basically never happen 688 // and it means we have something funky going on and possibly a bug. 689 log.Errorf("%s identify got a different key for: %s", lp, rp) 690 691 // okay... does ours NOT match the remote peer.ID? 692 cp, err := peer.IDFromPublicKey(currKey) 693 if err != nil { 694 log.Errorf("%s cannot get peer.ID from local key of remote peer: %s, %s", lp, rp, err) 695 return 696 } 697 if cp != rp { 698 log.Errorf("%s local key for remote peer %s yields different peer.ID: %s", lp, rp, cp) 699 return 700 } 701 702 // okay... curr key DOES NOT match new key. both match peer.ID. wat? 703 log.Errorf("%s local key and received key for %s do not match, but match peer.ID", lp, rp) 704} 705 706// HasConsistentTransport returns true if the address 'a' shares a 707// protocol set with any address in the green set. This is used 708// to check if a given address might be one of the addresses a peer is 709// listening on. 710func HasConsistentTransport(a ma.Multiaddr, green []ma.Multiaddr) bool { 711 protosMatch := func(a, b []ma.Protocol) bool { 712 if len(a) != len(b) { 713 return false 714 } 715 716 for i, p := range a { 717 if b[i].Code != p.Code { 718 return false 719 } 720 } 721 return true 722 } 723 724 protos := a.Protocols() 725 726 for _, ga := range green { 727 if protosMatch(protos, ga.Protocols()) { 728 return true 729 } 730 } 731 732 return false 733} 734 735func (ids *IDService) consumeObservedAddress(observed []byte, c network.Conn) { 736 if observed == nil { 737 return 738 } 739 740 maddr, err := ma.NewMultiaddrBytes(observed) 741 if err != nil { 742 log.Debugf("error parsing received observed addr for %s: %s", c, err) 743 return 744 } 745 746 ids.observedAddrs.Record(c, maddr) 747} 748 749func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool { 750 for _, b := range as { 751 if a.Equal(b) { 752 return true 753 } 754 } 755 return false 756} 757 758func signedPeerRecordFromMessage(msg *pb.Identify) (*record.Envelope, error) { 759 if msg.SignedPeerRecord == nil || len(msg.SignedPeerRecord) == 0 { 760 return nil, nil 761 } 762 env, _, err := record.ConsumeEnvelope(msg.SignedPeerRecord, peer.PeerRecordEnvelopeDomain) 763 return env, err 764} 765 766// netNotifiee defines methods to be used with the IpfsDHT 767type netNotifiee IDService 768 769func (nn *netNotifiee) IDService() *IDService { 770 return (*IDService)(nn) 771} 772 773func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { 774 nn.IDService().IdentifyWait(v) 775} 776 777func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) { 778 ids := nn.IDService() 779 780 // Stop tracking the connection. 781 ids.removeConn(v) 782 783 // undo the setting of addresses to peer.ConnectedAddrTTL we did 784 ids.addrMu.Lock() 785 defer ids.addrMu.Unlock() 786 787 if ids.Host.Network().Connectedness(v.RemotePeer()) != network.Connected { 788 // consider removing the peer handler for this 789 select { 790 case ids.rmPeerHandlerCh <- rmPeerHandlerReq{v.RemotePeer()}: 791 case <-ids.ctx.Done(): 792 return 793 } 794 795 // Last disconnect. 796 ps := ids.Host.Peerstore() 797 ps.UpdateAddrs(v.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL) 798 } 799} 800 801func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {} 802func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {} 803func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} 804func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} 805