package pubsub

import (
	"context"
	"fmt"
	"math/rand"
	"sort"
	"time"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/record"
)

const (
	// GossipSubID_v10 is the protocol ID for version 1.0.0 of the GossipSub protocol.
	// It is advertised along with GossipSubID_v11 for backwards compatibility.
	GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")

	// GossipSubID_v11 is the protocol ID for version 1.1.0 of the GossipSub protocol.
	// See the spec for details about how v1.1.0 compares to v1.0.0:
	// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
	GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
)

var (
	// overlay parameters.

	// GossipSubD sets the optimal degree for a GossipSub topic mesh. For example, if GossipSubD == 6,
	// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
	// GossipSubD should be set somewhere between GossipSubDlo and GossipSubDhi.
	GossipSubD = 6

	// GossipSubDlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
	// If we have fewer than GossipSubDlo peers, we will attempt to graft some more into the mesh at
	// the next heartbeat.
	GossipSubDlo = 5

	// GossipSubDhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
	// If we have more than GossipSubDhi peers, we will select some to prune from the mesh at the next heartbeat.
	GossipSubDhi = 12

	// GossipSubDscore affects how peers are selected when pruning a mesh due to over subscription.
	// At least GossipSubDscore of the retained peers will be high-scoring, while the remainder are
	// chosen randomly.
	GossipSubDscore = 4

	// GossipSubDout sets the quota for the number of outbound connections to maintain in a topic mesh.
	// When the mesh is pruned due to over subscription, we make sure that we have outbound connections
	// to at least GossipSubDout of the survivor peers. This prevents sybil attackers from overwhelming
	// our mesh with incoming connections.
	//
	// GossipSubDout must be set below GossipSubDlo, and must not exceed GossipSubD / 2.
	GossipSubDout = 2

	// gossip parameters

	// GossipSubHistoryLength controls the size of the message cache used for gossip.
	// The message cache will remember messages for GossipSubHistoryLength heartbeats.
	GossipSubHistoryLength = 5

	// GossipSubHistoryGossip controls how many cached message ids we will advertise in
	// IHAVE gossip messages. When asked for our seen message IDs, we will return
	// only those from the most recent GossipSubHistoryGossip heartbeats. The slack between
	// GossipSubHistoryGossip and GossipSubHistoryLength allows us to avoid advertising messages
	// that will be expired by the time they're requested.
	//
	// GossipSubHistoryGossip must be less than or equal to GossipSubHistoryLength to
	// avoid a runtime panic.
	GossipSubHistoryGossip = 3

	// GossipSubDlazy affects how many peers we will emit gossip to at each heartbeat.
	// We will send gossip to at least GossipSubDlazy peers outside our mesh. The actual
	// number may be more, depending on GossipSubGossipFactor and how many peers we're
	// connected to.
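	// For example (an illustrative calculation, not a measured figure): with
	// GossipSubGossipFactor = 0.25 and 100 non-mesh peers in a topic, gossip is
	// emitted to 25 peers per heartbeat, since 0.25*100 = 25 exceeds the
	// GossipSubDlazy minimum of 6.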
	GossipSubDlazy = 6

	// GossipSubGossipFactor affects how many peers we will emit gossip to at each heartbeat.
	// We will send gossip to GossipSubGossipFactor * (total number of non-mesh peers), or
	// GossipSubDlazy, whichever is greater.
	GossipSubGossipFactor = 0.25

	// GossipSubGossipRetransmission controls how many times we will allow a peer to request
	// the same message id through IWANT gossip before we start ignoring them. This is designed
	// to prevent peers from spamming us with requests and wasting our resources.
	GossipSubGossipRetransmission = 3

	// heartbeat interval

	// GossipSubHeartbeatInitialDelay is the short delay before the heartbeat timer begins
	// after the router is initialized.
	GossipSubHeartbeatInitialDelay = 100 * time.Millisecond

	// GossipSubHeartbeatInterval controls the time between heartbeats.
	GossipSubHeartbeatInterval = 1 * time.Second

	// GossipSubFanoutTTL controls how long we keep track of the fanout state. If it's been
	// GossipSubFanoutTTL since we've published to a topic that we're not subscribed to,
	// we'll delete the fanout map for that topic.
	GossipSubFanoutTTL = 60 * time.Second

	// GossipSubPrunePeers controls the number of peers to include in prune Peer eXchange.
	// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
	// send them signed peer records for up to GossipSubPrunePeers other peers that we
	// know of.
	GossipSubPrunePeers = 16

	// GossipSubPruneBackoff controls the backoff time for pruned peers. This is how long
	// a peer must wait before attempting to graft into our mesh again after being pruned.
	// When pruning a peer, we send them our value of GossipSubPruneBackoff so they know
	// the minimum time to wait. Peers running older versions may not send a backoff time,
	// so if we receive a prune message without one, we will wait at least GossipSubPruneBackoff
	// before attempting to re-graft.
	GossipSubPruneBackoff = time.Minute

	// GossipSubConnectors controls the number of active connection attempts for peers obtained through PX.
	GossipSubConnectors = 8

	// GossipSubMaxPendingConnections sets the maximum number of pending connections for peers attempted through PX.
	GossipSubMaxPendingConnections = 128

	// GossipSubConnectionTimeout controls the timeout for connection attempts.
	GossipSubConnectionTimeout = 30 * time.Second

	// GossipSubDirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
	// that are not currently connected.
	GossipSubDirectConnectTicks uint64 = 300

	// GossipSubDirectConnectInitialDelay is the initial delay before opening connections to direct peers.
	GossipSubDirectConnectInitialDelay = time.Second

	// GossipSubOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
	// with opportunistic grafting. Every GossipSubOpportunisticGraftTicks we will attempt to select some
	// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
	// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
	GossipSubOpportunisticGraftTicks uint64 = 60

	// GossipSubOpportunisticGraftPeers is the number of peers to opportunistically graft.
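	// Grafting is attempted every GossipSubOpportunisticGraftTicks heartbeats
	// (once a minute with the defaults) when the median mesh score falls below
	// the opportunistic graft threshold.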
	GossipSubOpportunisticGraftPeers = 2

	// If a GRAFT comes before GossipSubGraftFloodThreshold has elapsed since the last PRUNE,
	// then there is an extra score penalty applied to the peer through P7.
	GossipSubGraftFloodThreshold = 10 * time.Second

	// GossipSubMaxIHaveLength is the maximum number of messages to include in an IHAVE message.
	// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
	// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
	// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip heartbeats;
	// with the defaults this is 1666 messages/s.
	GossipSubMaxIHaveLength = 5000

	// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
	GossipSubMaxIHaveMessages = 10

	// Time to wait for a message requested through IWANT following an IHAVE advertisement.
	// If the message is not received within this window, a broken promise is declared and
	// the router may apply behavioural penalties.
	GossipSubIWantFollowupTime = 3 * time.Second
)

// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
	rt := &GossipSubRouter{
		peers:    make(map[peer.ID]protocol.ID),
		mesh:     make(map[string]map[peer.ID]struct{}),
		fanout:   make(map[string]map[peer.ID]struct{}),
		lastpub:  make(map[string]int64),
		gossip:   make(map[peer.ID][]*pb.ControlIHave),
		control:  make(map[peer.ID]*pb.ControlMessage),
		backoff:  make(map[string]map[peer.ID]time.Time),
		peerhave: make(map[peer.ID]int),
		iasked:   make(map[peer.ID]int),
		outbound: make(map[peer.ID]bool),
		connect:  make(chan connectInfo, GossipSubMaxPendingConnections),
		mcache:   NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),

		// these are configured per router to allow variation in tests
		D:      GossipSubD,
		Dlo:    GossipSubDlo,
		Dhi:    GossipSubDhi,
		Dscore: GossipSubDscore,
		Dout:   GossipSubDout,
		Dlazy:  GossipSubDlazy,

		// these must be pulled in to resolve races in tests... sigh.
		directConnectTicks:      GossipSubDirectConnectTicks,
		opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,

		fanoutTTL: GossipSubFanoutTTL,

		tagTracer: newTagTracer(h.ConnManager()),
	}

	// use the withInternalTracer option to hook up the tag tracer
	opts = append(opts, withInternalTracer(rt.tagTracer))
	return NewPubSub(ctx, h, rt, opts...)
}

// WithPeerScore is a gossipsub router option that enables peer scoring.
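//
// A minimal usage sketch (the parameter and threshold values below are
// illustrative placeholders, not recommendations; see PeerScoreParams and
// PeerScoreThresholds for the full set of options):
//
//	ps, err := NewGossipSub(ctx, h,
//		WithPeerScore(
//			&PeerScoreParams{
//				AppSpecificScore: func(p peer.ID) float64 { return 0 },
//				DecayInterval:    time.Second,
//				DecayToZero:      0.01,
//			},
//			&PeerScoreThresholds{
//				GossipThreshold:   -100,
//				PublishThreshold:  -500,
//				GraylistThreshold: -1000,
//			}))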
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		// sanity check: validate the score parameters
		err := params.validate()
		if err != nil {
			return err
		}

		// sanity check: validate the threshold values
		err = thresholds.validate()
		if err != nil {
			return err
		}

		gs.score = newPeerScore(params)
		gs.gossipThreshold = thresholds.GossipThreshold
		gs.publishThreshold = thresholds.PublishThreshold
		gs.graylistThreshold = thresholds.GraylistThreshold
		gs.acceptPXThreshold = thresholds.AcceptPXThreshold
		gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold

		gs.gossipTracer = newGossipTracer()

		// hook the tracer
		if ps.tracer != nil {
			ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer)
		} else {
			ps.tracer = &pubsubTracer{
				internal: []internalTracer{gs.score, gs.gossipTracer},
				pid:      ps.host.ID(),
				msgID:    ps.msgID,
			}
		}

		return nil
	}
}

// WithFloodPublish is a gossipsub router option that enables flood publishing.
// When this is enabled, published messages are forwarded to all peers with
// score >= publishThreshold.
func WithFloodPublish(floodPublish bool) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		gs.floodPublish = floodPublish

		return nil
	}
}

// WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE.
// This should generally be enabled in bootstrappers and well connected/trusted nodes
// used for bootstrapping.
func WithPeerExchange(doPX bool) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		gs.doPX = doPX

		return nil
	}
}

// WithDirectPeers is a gossipsub router option that specifies peers with direct
// peering agreements. These peers are connected outside of the mesh, with all (valid)
// messages unconditionally forwarded to them. The router will maintain open connections
// to these peers. Note that the peering agreement should be reciprocal, with direct peers
// symmetrically configured at both ends.
func WithDirectPeers(pis []peer.AddrInfo) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		direct := make(map[peer.ID]struct{})
		for _, pi := range pis {
			direct[pi.ID] = struct{}{}
			ps.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
		}

		gs.direct = direct

		if gs.tagTracer != nil {
			gs.tagTracer.direct = direct
		}

		return nil
	}
}

// WithDirectConnectTicks is a gossipsub router option that sets the number of
// heartbeat ticks between attempting to reconnect direct peers that are not
// currently connected. A "tick" is based on the heartbeat interval, which is
// 1s by default. The default value for direct connect ticks is 300.
func WithDirectConnectTicks(t uint64) Option {
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}
		gs.directConnectTicks = t
		return nil
	}
}

// GossipSubRouter is a router that implements the gossipsub protocol.
// For each topic we have joined, we maintain an overlay through which
// messages flow; this is the mesh map.
// For each topic we publish to without joining, we maintain a list of peers
// to use for injecting our messages in the overlay with stable routes; this
// is the fanout map. Fanout peer lists are expired if we don't publish any
// messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct {
	p        *PubSub
	peers    map[peer.ID]protocol.ID          // peer protocols
	direct   map[peer.ID]struct{}             // direct peers
	mesh     map[string]map[peer.ID]struct{}  // topic meshes
	fanout   map[string]map[peer.ID]struct{}  // topic fanout
	lastpub  map[string]int64                 // last publish time for fanout topics
	gossip   map[peer.ID][]*pb.ControlIHave   // pending gossip
	control  map[peer.ID]*pb.ControlMessage   // pending control messages
	peerhave map[peer.ID]int                  // number of IHAVEs received from peer in the last heartbeat
	iasked   map[peer.ID]int                  // number of messages we have asked from peer in the last heartbeat
	outbound map[peer.ID]bool                 // connection direction cache, marks peers with outbound connections
	backoff  map[string]map[peer.ID]time.Time // prune backoff
	connect  chan connectInfo                 // px connection requests

	mcache       *MessageCache
	tracer       *pubsubTracer
	score        *peerScore
	gossipTracer *gossipTracer
	tagTracer    *tagTracer
	gate         *peerGater

	// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
	// nodes.
	doPX bool

	// threshold for accepting PX from a peer; this should be positive and limited to scores
	// attainable by bootstrappers and trusted nodes
	acceptPXThreshold float64

	// threshold for peer score to emit/accept gossip.
	// If the peer score is below this threshold, we won't emit or accept gossip from the peer.
	// When there is no score, this value is 0.
	gossipThreshold float64

	// flood publish score threshold; we only publish to peers with score >= the threshold
	// when using flood publishing or the peer is a fanout or floodsub peer.
	publishThreshold float64

	// threshold for peer score before we graylist the peer and silently ignore its RPCs
	graylistThreshold float64

	// threshold for median peer score before triggering opportunistic grafting
	opportunisticGraftThreshold float64

	// whether to use flood publishing
	floodPublish bool

	// number of heartbeats since the beginning of time; this allows us to amortize some resource
	// clean up -- eg backoff clean up.
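	// For example, clearBackoff only sweeps the prune backoff map every 15 ticks.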
	heartbeatTicks uint64

	// overlay parameter "constants"
	// these are pulled from their global value or else the race detector is angry on travis
	// it also allows us to change them per peer in tests, which is a plus
	D, Dlo, Dhi, Dscore, Dout, Dlazy int

	// tick "constants" for triggering direct connect and opportunistic grafting
	// these are pulled from their global value or else the race detector is angry on travis
	directConnectTicks, opportunisticGraftTicks uint64

	// fanout expiry ttl "constant"
	// this is pulled from its global value or else the race detector is angry on travis
	fanoutTTL time.Duration
}

type connectInfo struct {
	p   peer.ID
	spr *record.Envelope
}

func (gs *GossipSubRouter) Protocols() []protocol.ID {
	return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
}

func (gs *GossipSubRouter) Attach(p *PubSub) {
	gs.p = p
	gs.tracer = p.tracer

	// start the scoring
	gs.score.Start(gs)

	// and the gossip tracing
	gs.gossipTracer.Start(gs)

	// and the tracer for connmgr tags
	gs.tagTracer.Start(gs)

	// start using the same msg ID function as PubSub for caching messages.
	gs.mcache.SetMsgIdFn(p.msgID)

	// start the heartbeat
	go gs.heartbeatTimer()

	// start the PX connectors
	for i := 0; i < GossipSubConnectors; i++ {
		go gs.connector()
	}

	// connect to direct peers
	if len(gs.direct) > 0 {
		go func() {
			if GossipSubDirectConnectInitialDelay > 0 {
				time.Sleep(GossipSubDirectConnectInitialDelay)
			}
			for p := range gs.direct {
				gs.connect <- connectInfo{p: p}
			}
		}()
	}
}

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
	log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
	gs.tracer.AddPeer(p, proto)
	gs.peers[p] = proto

	// track the connection direction
	outbound := false
	conns := gs.p.host.Network().ConnsToPeer(p)
loop:
	for _, c := range conns {
		if c.Stat().Direction == network.DirOutbound {
			// only count the connection if it has a pubsub stream
			for _, s := range c.GetStreams() {
				if s.Protocol() == proto {
					outbound = true
					break loop
				}
			}
		}
	}
	gs.outbound[p] = outbound
}

func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
	log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
	gs.tracer.RemovePeer(p)
	delete(gs.peers, p)
	for _, peers := range gs.mesh {
		delete(peers, p)
	}
	for _, peers := range gs.fanout {
		delete(peers, p)
	}
	delete(gs.gossip, p)
	delete(gs.control, p)
	delete(gs.outbound, p)
}

func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
	// check all peers in the topic
	tmap, ok := gs.p.topics[topic]
	if !ok {
		return false
	}

	fsPeers, gsPeers := 0, 0
	// floodsub peers
	for p := range tmap {
		if gs.peers[p] == FloodSubID {
			fsPeers++
		}
	}

	// gossipsub peers
	gsPeers = len(gs.mesh[topic])

	if suggested == 0 {
		suggested = gs.Dlo
	}

	if fsPeers+gsPeers >= suggested || gsPeers >= gs.Dhi {
		return true
	}

	return false
}

func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
	_, direct := gs.direct[p]
	if direct {
		return AcceptAll
	}

	if gs.score.Score(p) < gs.graylistThreshold {
		return AcceptNone
	}

	return gs.gate.AcceptFrom(p)
}
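
// HandleRPC handles the control message portion of an incoming RPC: it answers
// IHAVE with IWANT requests, IWANT with the requested messages (when cached),
// and GRAFT with mesh additions or PRUNE refusals, and processes PRUNE by
// removing the peer from the mesh; any responses are sent back in a single RPC.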
func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
	ctl := rpc.GetControl()
	if ctl == nil {
		return
	}

	iwant := gs.handleIHave(rpc.from, ctl)
	ihave := gs.handleIWant(rpc.from, ctl)
	prune := gs.handleGraft(rpc.from, ctl)
	gs.handlePrune(rpc.from, ctl)

	if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 {
		return
	}

	out := rpcWithControl(ihave, nil, iwant, nil, prune)
	gs.sendRPC(rpc.from, out)
}

func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
	// we ignore IHAVE gossip from any peer whose score is below the gossip threshold
	score := gs.score.Score(p)
	if score < gs.gossipThreshold {
		log.Debugf("IHAVE: ignoring peer %s with score below threshold [score = %f]", p, score)
		return nil
	}

	// IHAVE flood protection
	gs.peerhave[p]++
	if gs.peerhave[p] > GossipSubMaxIHaveMessages {
		log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerhave[p])
		return nil
	}

	if gs.iasked[p] >= GossipSubMaxIHaveLength {
		log.Debugf("IHAVE: peer %s has already advertised too many messages (%d); ignoring", p, gs.iasked[p])
		return nil
	}

	iwant := make(map[string]struct{})
	for _, ihave := range ctl.GetIhave() {
		topic := ihave.GetTopicID()
		_, ok := gs.mesh[topic]
		if !ok {
			continue
		}

		for _, mid := range ihave.GetMessageIDs() {
			if gs.p.seenMessage(mid) {
				continue
			}
			iwant[mid] = struct{}{}
		}
	}

	if len(iwant) == 0 {
		return nil
	}

	iask := len(iwant)
	if iask+gs.iasked[p] > GossipSubMaxIHaveLength {
		iask = GossipSubMaxIHaveLength - gs.iasked[p]
	}

	log.Debugf("IHAVE: Asking for %d out of %d messages from %s", iask, len(iwant), p)

	iwantlst := make([]string, 0, len(iwant))
	for mid := range iwant {
		iwantlst = append(iwantlst, mid)
	}

	// ask in random order
	shuffleStrings(iwantlst)

	// truncate to the messages we are actually asking for and update the iasked counter
	iwantlst = iwantlst[:iask]
	gs.iasked[p] += iask

	gs.gossipTracer.AddPromise(p, iwantlst)

	return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
}

func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
	// we don't respond to IWANT requests from any peer whose score is below the gossip threshold
	score := gs.score.Score(p)
	if score < gs.gossipThreshold {
		log.Debugf("IWANT: ignoring peer %s with score below threshold [score = %f]", p, score)
		return nil
	}

	ihave := make(map[string]*pb.Message)
	for _, iwant := range ctl.GetIwant() {
		for _, mid := range iwant.GetMessageIDs() {
			msg, count, ok := gs.mcache.GetForPeer(mid, p)
			if !ok {
				continue
			}

			if count > GossipSubGossipRetransmission {
				log.Debugf("IWANT: Peer %s has asked for message %s too many times; ignoring request", p, mid)
				continue
			}

			ihave[mid] = msg
		}
	}

	if len(ihave) == 0 {
		return nil
	}

	log.Debugf("IWANT: Sending %d messages to %s", len(ihave), p)

	msgs := make([]*pb.Message, 0, len(ihave))
	for _, msg := range ihave {
		msgs = append(msgs, msg)
	}

	return msgs
}

func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
	var prune []string

	doPX := gs.doPX
	score := gs.score.Score(p)
	now := time.Now()

	for _, graft := range ctl.GetGraft() {
		topic := graft.GetTopicID()
		peers, ok := gs.mesh[topic]
		if !ok {
			// don't do PX when there is an unknown topic to avoid leaking our peers
			doPX = false
			// spam hardening: ignore GRAFTs for unknown topics
			continue
		}

		// check if it is already in the mesh; if so do nothing (we might have concurrent grafting)
		_, inMesh := peers[p]
		if inMesh {
			continue
		}

		// we don't GRAFT to/from direct peers; complain loudly if this happens
		_, direct := gs.direct[p]
		if direct {
			log.Warnf("GRAFT: ignoring request from direct peer %s", p)
			// this is possibly a bug from non-reciprocal configuration; send a PRUNE
			prune = append(prune, topic)
			// but don't PX
			doPX = false
			continue
		}

		// make sure we are not backing off that peer
		expire, backoff := gs.backoff[topic][p]
		if backoff && now.Before(expire) {
			log.Debugf("GRAFT: ignoring backed off peer %s", p)
			// add behavioural penalty
			gs.score.AddPenalty(p, 1)
			// no PX
			doPX = false
			// check the flood cutoff -- is the GRAFT coming too fast?
			floodCutoff := expire.Add(GossipSubGraftFloodThreshold - GossipSubPruneBackoff)
			if now.Before(floodCutoff) {
				// extra penalty
				gs.score.AddPenalty(p, 1)
			}
			// refresh the backoff
			gs.addBackoff(p, topic)
			prune = append(prune, topic)
			continue
		}

		// check the score
		if score < 0 {
			// we don't GRAFT peers with negative score
			log.Debugf("GRAFT: ignoring peer %s with negative score [score = %f, topic = %s]", p, score, topic)
			// we do send them PRUNE however, because it's a matter of protocol correctness
			prune = append(prune, topic)
			// but we won't PX to them
			doPX = false
			// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
			gs.addBackoff(p, topic)
			continue
		}

		// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
		// from peers with outbound connections; this is a defensive check to restrict potential
		// mesh takeover attacks combined with love bombing
		if len(peers) >= gs.Dhi && !gs.outbound[p] {
			prune = append(prune, topic)
			gs.addBackoff(p, topic)
			continue
		}

		log.Debugf("GRAFT: add mesh link from %s in %s", p, topic)
		gs.tracer.Graft(p, topic)
		peers[p] = struct{}{}
	}

	if len(prune) == 0 {
		return nil
	}

	cprune := make([]*pb.ControlPrune, 0, len(prune))
	for _, topic := range prune {
		cprune = append(cprune, gs.makePrune(p, topic, doPX))
	}

	return cprune
}

func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
	score := gs.score.Score(p)

	for _, prune := range ctl.GetPrune() {
		topic := prune.GetTopicID()
		peers, ok := gs.mesh[topic]
		if !ok {
			continue
		}

		log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
		gs.tracer.Prune(p, topic)
		delete(peers, p)
		// is there a backoff specified by the peer? if so obey it.
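		// (the wire value is expressed in seconds)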
		backoff := prune.GetBackoff()
		if backoff > 0 {
			gs.doAddBackoff(p, topic, time.Duration(backoff)*time.Second)
		} else {
			gs.addBackoff(p, topic)
		}

		px := prune.GetPeers()
		if len(px) > 0 {
			// we ignore PX from peers with insufficient score
			if score < gs.acceptPXThreshold {
				log.Debugf("PRUNE: ignoring PX from peer %s with insufficient score [score = %f, topic = %s]", p, score, topic)
				continue
			}

			gs.pxConnect(px)
		}
	}
}

func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
	gs.doAddBackoff(p, topic, GossipSubPruneBackoff)
}

func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
	backoff, ok := gs.backoff[topic]
	if !ok {
		backoff = make(map[peer.ID]time.Time)
		gs.backoff[topic] = backoff
	}
	expire := time.Now().Add(interval)
	if backoff[p].Before(expire) {
		backoff[p] = expire
	}
}

func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
	if len(peers) > GossipSubPrunePeers {
		shufflePeerInfo(peers)
		peers = peers[:GossipSubPrunePeers]
	}

	toconnect := make([]connectInfo, 0, len(peers))

	for _, pi := range peers {
		p := peer.ID(pi.PeerID)

		_, connected := gs.peers[p]
		if connected {
			continue
		}

		var spr *record.Envelope
		if pi.SignedPeerRecord != nil {
			// the peer sent us a signed record; ensure that it is valid
			envelope, r, err := record.ConsumeEnvelope(pi.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
			if err != nil {
				log.Warnf("error unmarshalling peer record obtained through px: %s", err)
				continue
			}
			rec, ok := r.(*peer.PeerRecord)
			if !ok {
				log.Warnf("bogus peer record obtained through px: envelope payload is not PeerRecord")
				continue
			}
			if rec.PeerID != p {
				log.Warnf("bogus peer record obtained through px: peer ID %s doesn't match expected peer %s", rec.PeerID, p)
				continue
			}
			spr = envelope
		}

		toconnect = append(toconnect, connectInfo{p, spr})
	}

	if len(toconnect) == 0 {
		return
	}

	for _, ci := range toconnect {
		select {
		case gs.connect <- ci:
		default:
			log.Debugf("ignoring peer connection attempt; too many pending connections")
		}
	}
}

func (gs *GossipSubRouter) connector() {
	for {
		select {
		case ci := <-gs.connect:
			if gs.p.host.Network().Connectedness(ci.p) == network.Connected {
				continue
			}

			log.Debugf("connecting to %s", ci.p)
			cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
			if ok && ci.spr != nil {
				_, err := cab.ConsumePeerRecord(ci.spr, peerstore.TempAddrTTL)
				if err != nil {
					log.Debugf("error processing peer record: %s", err)
				}
			}

			ctx, cancel := context.WithTimeout(gs.p.ctx, GossipSubConnectionTimeout)
			err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p})
			cancel()
			if err != nil {
				log.Debugf("error connecting to %s: %s", ci.p, err)
			}

		case <-gs.p.ctx.Done():
			return
		}
	}
}

func (gs *GossipSubRouter) Publish(msg *Message) {
	gs.mcache.Put(msg.Message)

	from := msg.ReceivedFrom
	topic := msg.GetTopic()

	tosend := make(map[peer.ID]struct{})

	// any peers in the topic?
	tmap, ok := gs.p.topics[topic]
	if !ok {
		return
	}

	if gs.floodPublish && from == gs.p.host.ID() {
		for p := range tmap {
			_, direct := gs.direct[p]
			if direct || gs.score.Score(p) >= gs.publishThreshold {
				tosend[p] = struct{}{}
			}
		}
	} else {
		// direct peers
		for p := range gs.direct {
			_, inTopic := tmap[p]
			if inTopic {
				tosend[p] = struct{}{}
			}
		}

		// floodsub peers
		for p := range tmap {
			if gs.peers[p] == FloodSubID && gs.score.Score(p) >= gs.publishThreshold {
				tosend[p] = struct{}{}
			}
		}

		// gossipsub peers
		gmap, ok := gs.mesh[topic]
		if !ok {
			// we are not in the mesh for topic, use fanout peers
			gmap, ok = gs.fanout[topic]
			if !ok || len(gmap) == 0 {
				// we don't have any, pick some with score above the publish threshold
				peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
					_, direct := gs.direct[p]
					return !direct && gs.score.Score(p) >= gs.publishThreshold
				})

				if len(peers) > 0 {
					gmap = peerListToMap(peers)
					gs.fanout[topic] = gmap
				}
			}
			gs.lastpub[topic] = time.Now().UnixNano()
		}

		for p := range gmap {
			tosend[p] = struct{}{}
		}
	}

	out := rpcWithMessages(msg.Message)
	for pid := range tosend {
		if pid == from || pid == peer.ID(msg.GetFrom()) {
			continue
		}

		gs.sendRPC(pid, out)
	}
}

func (gs *GossipSubRouter) Join(topic string) {
	gmap, ok := gs.mesh[topic]
	if ok {
		return
	}

	log.Debugf("JOIN %s", topic)
	gs.tracer.Join(topic)

	gmap, ok = gs.fanout[topic]
	if ok {
		// these peers have a score above the publish threshold, which may be negative
		// so drop the ones with a negative score
		for p := range gmap {
			if gs.score.Score(p) < 0 {
				delete(gmap, p)
			}
		}

		if len(gmap) < gs.D {
			// we need more peers; eager, as this would get fixed in the next heartbeat
			more := gs.getPeers(topic, gs.D-len(gmap), func(p peer.ID) bool {
				// filter our current peers, direct peers, and peers with negative scores
				_, inMesh := gmap[p]
				_, direct := gs.direct[p]
				return !inMesh && !direct && gs.score.Score(p) >= 0
			})
			for _, p := range more {
				gmap[p] = struct{}{}
			}
		}
		gs.mesh[topic] = gmap
		delete(gs.fanout, topic)
		delete(gs.lastpub, topic)
	} else {
		peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
			// filter direct peers and peers with negative score
			_, direct := gs.direct[p]
			return !direct && gs.score.Score(p) >= 0
		})
		gmap = peerListToMap(peers)
		gs.mesh[topic] = gmap
	}

	for p := range gmap {
		log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
		gs.tracer.Graft(p, topic)
		gs.sendGraft(p, topic)
	}
}

func (gs *GossipSubRouter) Leave(topic string) {
	gmap, ok := gs.mesh[topic]
	if !ok {
		return
	}

	log.Debugf("LEAVE %s", topic)
	gs.tracer.Leave(topic)

	delete(gs.mesh, topic)

	for p := range gmap {
		log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
		gs.tracer.Prune(p, topic)
		gs.sendPrune(p, topic)
	}
}

func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
	graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
	out := rpcWithControl(nil, nil, nil, graft, nil)
	gs.sendRPC(p, out)
}

func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
	prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX)}
	out := rpcWithControl(nil, nil, nil, nil, prune)
	gs.sendRPC(p, out)
}

func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
	// do we own the RPC?
	own := false

	// piggyback control message retries
	ctl, ok := gs.control[p]
	if ok {
		out = copyRPC(out)
		own = true
		gs.piggybackControl(p, out, ctl)
		delete(gs.control, p)
	}

	// piggyback gossip
	ihave, ok := gs.gossip[p]
	if ok {
		if !own {
			out = copyRPC(out)
			own = true
		}
		gs.piggybackGossip(p, out, ihave)
		delete(gs.gossip, p)
	}

	mch, ok := gs.p.peers[p]
	if !ok {
		return
	}

	// If we're below the max message size, go ahead and send
	if out.Size() < gs.p.maxMessageSize {
		gs.doSendRPC(out, p, mch)
		return
	}

	// If we're too big, fragment into multiple RPCs and send each sequentially
	outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize)
	if err != nil {
		gs.doDropRPC(out, p, fmt.Sprintf("unable to fragment RPC: %s", err))
		return
	}

	for _, rpc := range outRPCs {
		gs.doSendRPC(rpc, p, mch)
	}
}

func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
	log.Debugf("dropping message to peer %s: %s", p.Pretty(), reason)
	gs.tracer.DropRPC(rpc, p)
	// push control messages that need to be retried
	ctl := rpc.GetControl()
	if ctl != nil {
		gs.pushControl(p, ctl)
	}
}

func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) {
	select {
	case mch <- rpc:
		gs.tracer.SendRPC(rpc, p)
	default:
		gs.doDropRPC(rpc, p, "queue full")
	}
}

func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
	if rpc.Size() < limit {
		return []*RPC{rpc}, nil
	}

	c := (rpc.Size() / limit) + 1
	rpcs := make([]*RPC, 1, c)
	rpcs[0] = &RPC{RPC: pb.RPC{}, from: rpc.from}

	// outRPC returns the current RPC message if it will fit sizeToAdd more bytes;
	// otherwise, it will create a new RPC message and add it to the list.
	// if withCtl is true, the returned message will have a non-nil empty Control message.
	outRPC := func(sizeToAdd int, withCtl bool) *RPC {
		current := rpcs[len(rpcs)-1]
		// check if we can fit the new data, plus an extra byte for the protobuf field tag
		if current.Size()+sizeToAdd+1 < limit {
			if withCtl && current.Control == nil {
				current.Control = &pb.ControlMessage{}
			}
			return current
		}
		var ctl *pb.ControlMessage
		if withCtl {
			ctl = &pb.ControlMessage{}
		}
		next := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
		rpcs = append(rpcs, next)
		return next
	}

	for _, msg := range rpc.GetPublish() {
		s := msg.Size()
		// if an individual message is too large, we can't fragment it and have to fail entirely
		if s > limit {
			return nil, fmt.Errorf("message with len=%d exceeds limit %d", s, limit)
		}
		out := outRPC(s, false)
		out.Publish = append(out.Publish, msg)
	}

	for _, sub := range rpc.GetSubscriptions() {
		out := outRPC(sub.Size(), false)
		out.Subscriptions = append(out.Subscriptions, sub)
	}

	ctl := rpc.GetControl()
	if ctl == nil {
		// if there were no control messages, we're done
		return rpcs, nil
	}

	// if all the control messages fit into one RPC, we just add it to the end and return
	ctlOut := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
	if ctlOut.Size() < limit {
		rpcs = append(rpcs, ctlOut)
		return rpcs, nil
	}

	// we need to split up the control messages into multiple RPCs
	for _, graft := range ctl.Graft {
		out := outRPC(graft.Size(), true)
		out.Control.Graft = append(out.Control.Graft, graft)
	}
	for _, prune := range ctl.Prune {
		out := outRPC(prune.Size(), true)
		out.Control.Prune = append(out.Control.Prune, prune)
	}

	// An individual IWANT or IHAVE message could be larger than the limit if we have
	// a lot of message IDs. fragmentMessageIds will split them into buckets that
	// fit within the limit, with some overhead for the control messages themselves.
	for _, iwant := range ctl.Iwant {
		const protobufOverhead = 6
		idBuckets := fragmentMessageIds(iwant.MessageIDs, limit-protobufOverhead)
		for _, ids := range idBuckets {
			iwant := &pb.ControlIWant{MessageIDs: ids}
			out := outRPC(iwant.Size(), true)
			out.Control.Iwant = append(out.Control.Iwant, iwant)
		}
	}
	for _, ihave := range ctl.Ihave {
		const protobufOverhead = 6
		idBuckets := fragmentMessageIds(ihave.MessageIDs, limit-protobufOverhead)
		for _, ids := range idBuckets {
			ihave := &pb.ControlIHave{MessageIDs: ids}
			out := outRPC(ihave.Size(), true)
			out.Control.Ihave = append(out.Control.Ihave, ihave)
		}
	}
	return rpcs, nil
}

func fragmentMessageIds(msgIds []string, limit int) [][]string {
	// account for two bytes of protobuf overhead per array element
	const protobufOverhead = 2

	out := [][]string{{}}
	var currentBucket int
	var bucketLen int
	for i := 0; i < len(msgIds); i++ {
		size := len(msgIds[i]) + protobufOverhead
		if size > limit {
			// pathological case where a single message ID exceeds the limit.
			log.Warnf("message ID length %d exceeds limit %d, removing from outgoing gossip", size, limit)
			continue
		}
		bucketLen += size
		if bucketLen > limit {
			out = append(out, []string{})
			currentBucket++
			bucketLen = size
		}
		out[currentBucket] = append(out[currentBucket], msgIds[i])
	}
	return out
}

func (gs *GossipSubRouter) heartbeatTimer() {
	time.Sleep(GossipSubHeartbeatInitialDelay)
	select {
	case gs.p.eval <- gs.heartbeat:
	case <-gs.p.ctx.Done():
		return
	}

	ticker := time.NewTicker(GossipSubHeartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			select {
			case gs.p.eval <- gs.heartbeat:
			case <-gs.p.ctx.Done():
				return
			}
		case <-gs.p.ctx.Done():
			return
		}
	}
}

func (gs *GossipSubRouter) heartbeat() {
	defer log.EventBegin(gs.p.ctx, "heartbeat").Done()

	gs.heartbeatTicks++

	tograft := make(map[peer.ID][]string)
	toprune := make(map[peer.ID][]string)
	noPX := make(map[peer.ID]bool)

	// clean up expired backoffs
	gs.clearBackoff()

	// clean up iasked counters
	gs.clearIHaveCounters()

	// apply IWANT request penalties
	gs.applyIwantPenalties()

	// ensure direct peers are connected
	gs.directConnect()

	// cache scores throughout the heartbeat
	scores := make(map[peer.ID]float64)
	score := func(p peer.ID) float64 {
		s, ok := scores[p]
		if !ok {
			s = gs.score.Score(p)
			scores[p] = s
		}
		return s
	}

	// maintain the mesh for topics we have joined
	for topic, peers := range gs.mesh {
		prunePeer := func(p peer.ID) {
			gs.tracer.Prune(p, topic)
			delete(peers, p)
			gs.addBackoff(p, topic)
			topics := toprune[p]
			toprune[p] = append(topics, topic)
		}

		graftPeer := func(p peer.ID) {
			log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
			gs.tracer.Graft(p, topic)
			peers[p] = struct{}{}
			topics := tograft[p]
			tograft[p] = append(topics, topic)
		}

		// drop all peers with negative score, without PX
		for p := range peers {
			if score(p) < 0 {
				log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, score(p), topic)
				prunePeer(p)
				noPX[p] = true
			}
		}

		// do we have enough peers?
		if l := len(peers); l < gs.Dlo {
			backoff := gs.backoff[topic]
			ineed := gs.D - l
			plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
				// filter our current and direct peers, peers we are backing off, and peers with negative score
				_, inMesh := peers[p]
				_, doBackoff := backoff[p]
				_, direct := gs.direct[p]
				return !inMesh && !doBackoff && !direct && score(p) >= 0
			})

			for _, p := range plst {
				graftPeer(p)
			}
		}

		// do we have too many peers?
		if len(peers) > gs.Dhi {
			plst := peerMapToList(peers)

			// sort by score (but shuffle first for the case we don't use the score)
			shufflePeers(plst)
			sort.Slice(plst, func(i, j int) bool {
				return score(plst[i]) > score(plst[j])
			})

			// We keep the first D_score peers by score and the remaining up to D randomly
			// under the constraint that we keep D_out peers in the mesh (if we have that many)
			shufflePeers(plst[gs.Dscore:])

			// count the outbound peers we are keeping
			outbound := 0
			for _, p := range plst[:gs.D] {
				if gs.outbound[p] {
					outbound++
				}
			}

			// if it's less than D_out, bubble up some outbound peers from the random selection
			if outbound < gs.Dout {
				rotate := func(i int) {
					// rotate the plst to the right and put the ith peer in the front
					p := plst[i]
					for j := i; j > 0; j-- {
						plst[j] = plst[j-1]
					}
					plst[0] = p
				}

				// first bubble up all outbound peers already in the selection to the front
				if outbound > 0 {
					ihave := outbound
					for i := 1; i < gs.D && ihave > 0; i++ {
						p := plst[i]
						if gs.outbound[p] {
							rotate(i)
							ihave--
						}
					}
				}

				// now bubble up enough outbound peers outside the selection to the front
				ineed := gs.Dout - outbound
				for i := gs.D; i < len(plst) && ineed > 0; i++ {
					p := plst[i]
					if gs.outbound[p] {
						rotate(i)
						ineed--
					}
				}
			}

			// prune the excess peers
			for _, p := range plst[gs.D:] {
				log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
				prunePeer(p)
			}
		}

		// do we have enough outbound peers?
		if len(peers) >= gs.Dlo {
			// count the outbound peers we have
			outbound := 0
			for p := range peers {
				if gs.outbound[p] {
					outbound++
				}
			}

			// if it's less than D_out, select some peers with outbound connections and graft them
			if outbound < gs.Dout {
				ineed := gs.Dout - outbound
				backoff := gs.backoff[topic]
				plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
					// filter our current and direct peers, peers we are backing off, and peers with negative score
					_, inMesh := peers[p]
					_, doBackoff := backoff[p]
					_, direct := gs.direct[p]
					return !inMesh && !doBackoff && !direct && gs.outbound[p] && score(p) >= 0
				})

				for _, p := range plst {
					graftPeer(p)
				}
			}
		}

		// should we try to improve the mesh with opportunistic grafting?
		if gs.heartbeatTicks%gs.opportunisticGraftTicks == 0 && len(peers) > 1 {
			// Opportunistic grafting works as follows: we check the median score of peers in the
			// mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
			// random with score over the median.
			// The intention is to (slowly) improve an underperforming mesh by introducing good
			// scoring peers that may have been gossiping at us. This allows us to get out of sticky
			// situations where we are stuck with poor peers and also recover from churn of good peers.

			// now compute the median peer score in the mesh
			plst := peerMapToList(peers)
			sort.Slice(plst, func(i, j int) bool {
				return score(plst[i]) < score(plst[j])
			})
			medianIndex := len(peers) / 2
			medianScore := scores[plst[medianIndex]]

			// if the median score is below the threshold, select a better peer (if any) and GRAFT
			if medianScore < gs.opportunisticGraftThreshold {
				backoff := gs.backoff[topic]
				plst = gs.getPeers(topic, GossipSubOpportunisticGraftPeers, func(p peer.ID) bool {
					_, inMesh := peers[p]
					_, doBackoff := backoff[p]
					_, direct := gs.direct[p]
					return !inMesh && !doBackoff && !direct && score(p) > medianScore
				})

				for _, p := range plst {
					log.Debugf("HEARTBEAT: Opportunistically graft peer %s on topic %s", p, topic)
					graftPeer(p)
				}
			}
		}

		// 2nd arg are mesh peers excluded from gossip. We already push
		// messages to them, so it's redundant to gossip IHAVEs.
		gs.emitGossip(topic, peers)
	}

	// expire fanout for topics we haven't published to in a while
	now := time.Now().UnixNano()
	for topic, lastpub := range gs.lastpub {
		if lastpub+int64(gs.fanoutTTL) < now {
			delete(gs.fanout, topic)
			delete(gs.lastpub, topic)
		}
	}

	// maintain our fanout for topics we are publishing to but have not joined
	for topic, peers := range gs.fanout {
		// check whether our peers are still in the topic and have a score above the publish threshold
		for p := range peers {
			_, ok := gs.p.topics[topic][p]
			if !ok || score(p) < gs.publishThreshold {
				delete(peers, p)
			}
		}

		// do we need more peers?
		if len(peers) < gs.D {
			ineed := gs.D - len(peers)
			plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
				// filter out our current and direct peers, keeping peers with score above the publish threshold
				_, inFanout := peers[p]
				_, direct := gs.direct[p]
				return !inFanout && !direct && score(p) >= gs.publishThreshold
			})

			for _, p := range plst {
				peers[p] = struct{}{}
			}
		}

		// 2nd arg are fanout peers excluded from gossip. We already push
		// messages to them, so it's redundant to gossip IHAVEs.
		gs.emitGossip(topic, peers)
	}

	// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
	gs.sendGraftPrune(tograft, toprune, noPX)

	// flush all pending gossip that wasn't piggybacked above
	gs.flush()

	// advance the message history window
	gs.mcache.Shift()
}

func (gs *GossipSubRouter) clearIHaveCounters() {
	if len(gs.peerhave) > 0 {
		// throw away the old map and make a new one
		gs.peerhave = make(map[peer.ID]int)
	}

	if len(gs.iasked) > 0 {
		// throw away the old map and make a new one
		gs.iasked = make(map[peer.ID]int)
	}
}

func (gs *GossipSubRouter) applyIwantPenalties() {
	for p, count := range gs.gossipTracer.GetBrokenPromises() {
		log.Infof("peer %s didn't follow up in %d IWANT requests; adding penalty", p, count)
		gs.score.AddPenalty(p, count)
	}
}

func (gs *GossipSubRouter) clearBackoff() {
	// we only clear once every 15 ticks to avoid iterating over the map(s) too much
	if gs.heartbeatTicks%15 != 0 {
		return
	}

	now := time.Now()
	for topic, backoff := range gs.backoff {
		for p, expire := range backoff {
			// add some slack time to the expiration
			// https://github.com/libp2p/specs/pull/289
			if expire.Add(2 * GossipSubHeartbeatInterval).Before(now) {
				delete(backoff, p)
			}
		}
		if len(backoff) == 0 {
			delete(gs.backoff, topic)
		}
	}
}

func (gs *GossipSubRouter) directConnect() {
	// we only do this every directConnectTicks to allow pending connections to complete and account
	// for restarts/downtime
	if gs.heartbeatTicks%gs.directConnectTicks != 0 {
		return
	}

	var toconnect []peer.ID
	for p := range gs.direct {
		_, connected := gs.peers[p]
		if !connected {
			toconnect = append(toconnect, p)
		}
	}

	if len(toconnect) > 0 {
		go func() {
			for _, p := range toconnect {
				gs.connect <- connectInfo{p: p}
			}
		}()
	}
}

func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) {
	for p, topics := range tograft {
		graft := make([]*pb.ControlGraft, 0, len(topics))
		for _, topic := range topics {
			// copy topic string here since
			// the reference to the string
			// topic here changes with every
			// iteration of the slice.
			copiedID := topic
			graft = append(graft, &pb.ControlGraft{TopicID: &copiedID})
		}

		var prune []*pb.ControlPrune
		pruning, ok := toprune[p]
		if ok {
			delete(toprune, p)
			prune = make([]*pb.ControlPrune, 0, len(pruning))
			for _, topic := range pruning {
				prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
			}
		}

		out := rpcWithControl(nil, nil, nil, graft, prune)
		gs.sendRPC(p, out)
	}

	for p, topics := range toprune {
		prune := make([]*pb.ControlPrune, 0, len(topics))
		for _, topic := range topics {
			prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
		}

		out := rpcWithControl(nil, nil, nil, nil, prune)
		gs.sendRPC(p, out)
	}
}

// emitGossip emits IHAVE gossip advertising items in the message cache window
// of this topic.
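// Peers in the exclude set (mesh or fanout members we already push messages to)
// and direct peers are skipped; of the remaining gossipsub peers, only those
// with score at or above gossipThreshold are considered.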
func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}) {
	mids := gs.mcache.GetGossipIDs(topic)
	if len(mids) == 0 {
		return
	}

	// shuffle to emit in random order
	shuffleStrings(mids)

	// if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list
	if len(mids) > GossipSubMaxIHaveLength {
		// we do the truncation (with shuffling) per peer below
		log.Debugf("too many messages for gossip; will truncate IHAVE list (%d messages)", len(mids))
	}

	// Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy.
	// First we collect the peers above gossipThreshold that are not in the exclude set
	// and then randomly select from that set.
	// We also exclude direct peers, as there is no reason to emit gossip to them.
	peers := make([]peer.ID, 0, len(gs.p.topics[topic]))
	for p := range gs.p.topics[topic] {
		_, inExclude := exclude[p]
		_, direct := gs.direct[p]
		if !inExclude && !direct && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold {
			peers = append(peers, p)
		}
	}

	target := gs.Dlazy
	factor := int(GossipSubGossipFactor * float64(len(peers)))
	if factor > target {
		target = factor
	}

	if target > len(peers) {
		target = len(peers)
	} else {
		shufflePeers(peers)
	}
	peers = peers[:target]

	// Emit the IHAVE gossip to the selected peers.
	for _, p := range peers {
		peerMids := mids
		if len(mids) > GossipSubMaxIHaveLength {
			// we do this per peer so that we emit a different set for each peer.
			// we have enough redundancy in the system that this will significantly increase the message
			// coverage when we do truncate.
			peerMids = make([]string, GossipSubMaxIHaveLength)
			shuffleStrings(mids)
			copy(peerMids, mids)
		}
		gs.enqueueGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: peerMids})
	}
}

func (gs *GossipSubRouter) flush() {
	// send gossip first, which will also piggyback pending control
	for p, ihave := range gs.gossip {
		delete(gs.gossip, p)
		out := rpcWithControl(nil, ihave, nil, nil, nil)
		gs.sendRPC(p, out)
	}

	// send the remaining control messages that weren't merged with gossip
	for p, ctl := range gs.control {
		delete(gs.control, p)
		out := rpcWithControl(nil, nil, nil, ctl.Graft, ctl.Prune)
		gs.sendRPC(p, out)
	}
}

func (gs *GossipSubRouter) enqueueGossip(p peer.ID, ihave *pb.ControlIHave) {
	gossip := gs.gossip[p]
	gossip = append(gossip, ihave)
	gs.gossip[p] = gossip
}

func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) {
	ctl := out.GetControl()
	if ctl == nil {
		ctl = &pb.ControlMessage{}
		out.Control = ctl
	}

	ctl.Ihave = ihave
}

func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
	// remove IHAVE/IWANT from control message, gossip is not retried
	ctl.Ihave = nil
	ctl.Iwant = nil
	if ctl.Graft != nil || ctl.Prune != nil {
		gs.control[p] = ctl
	}
}

func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) {
	// check control message for staleness first
	var tograft []*pb.ControlGraft
	var toprune []*pb.ControlPrune

	for _, graft := range ctl.GetGraft() {
		topic := graft.GetTopicID()
		peers, ok := gs.mesh[topic]
		if !ok {
			continue
		}
		_, ok = peers[p]
		if ok {
			tograft = append(tograft, graft)
		}
	}

	for _, prune := range ctl.GetPrune() {
		topic := prune.GetTopicID()
		peers, ok := gs.mesh[topic]
		if !ok {
			toprune = append(toprune, prune)
			continue
		}
		_, ok = peers[p]
		if !ok {
			toprune = append(toprune, prune)
		}
	}

	if len(tograft) == 0 && len(toprune) == 0 {
		return
	}

	xctl := out.Control
	if xctl == nil {
		xctl = &pb.ControlMessage{}
		out.Control = xctl
	}

	if len(tograft) > 0 {
		xctl.Graft = append(xctl.Graft, tograft...)
	}
	if len(toprune) > 0 {
		xctl.Prune = append(xctl.Prune, toprune...)
	}
}

func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
	if gs.peers[p] == GossipSubID_v10 {
		// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
		return &pb.ControlPrune{TopicID: &topic}
	}

	backoff := uint64(GossipSubPruneBackoff / time.Second)
	var px []*pb.PeerInfo
	if doPX {
		// select peers for Peer eXchange
		peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool {
			return p != xp && gs.score.Score(xp) >= 0
		})

		cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
		px = make([]*pb.PeerInfo, 0, len(peers))
		for _, p := range peers {
			// see if we have a signed peer record to send back; if we don't, just send
			// the peer ID and let the pruned peer find them in the DHT -- we can't trust
			// unsigned address records through px anyway.
			var recordBytes []byte
			if ok {
				spr := cab.GetPeerRecord(p)
				var err error
				if spr != nil {
					recordBytes, err = spr.Marshal()
					if err != nil {
						log.Warnf("error marshaling signed peer record for %s: %s", p, err)
					}
				}
			}
			px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes})
		}
	}

	return &pb.ControlPrune{TopicID: &topic, Peers: px, Backoff: &backoff}
}

func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
	tmap, ok := gs.p.topics[topic]
	if !ok {
		return nil
	}

	peers := make([]peer.ID, 0, len(tmap))
	for p := range tmap {
		if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) {
			peers = append(peers, p)
		}
	}

	shufflePeers(peers)

	if count > 0 && len(peers) > count {
		peers = peers[:count]
	}

	return peers
}

func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
	pmap := make(map[peer.ID]struct{})
	for _, p := range peers {
		pmap[p] = struct{}{}
	}
	return pmap
}

func peerMapToList(peers map[peer.ID]struct{}) []peer.ID {
	plst := make([]peer.ID, 0, len(peers))
	for p := range peers {
		plst = append(plst, p)
	}
	return plst
}

func shufflePeers(peers []peer.ID) {
	for i := range peers {
		j := rand.Intn(i + 1)
		peers[i], peers[j] = peers[j], peers[i]
	}
}

func shufflePeerInfo(peers []*pb.PeerInfo) {
	for i := range peers {
		j := rand.Intn(i + 1)
		peers[i], peers[j] = peers[j], peers[i]
	}
}

func shuffleStrings(lst []string) {
	for i := range lst {
		j := rand.Intn(i + 1)
		lst[i], lst[j] = lst[j], lst[i]
	}
}
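
// Example (an illustrative sketch, not part of the API): wiring up a gossipsub
// router with the options defined in this file. The peer info values and the
// particular option choices are placeholders.
//
//	partner := peer.AddrInfo{ID: partnerID, Addrs: partnerAddrs}
//	ps, err := NewGossipSub(ctx, h,
//		WithPeerExchange(true),
//		WithFloodPublish(true),
//		WithDirectPeers([]peer.AddrInfo{partner}),
//		WithDirectConnectTicks(300),
//	)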