1package pubsub
2
3import (
4	"context"
5	"fmt"
6	"math/rand"
7	"sort"
8	"time"
9
10	pb "github.com/libp2p/go-libp2p-pubsub/pb"
11
12	"github.com/libp2p/go-libp2p-core/host"
13	"github.com/libp2p/go-libp2p-core/network"
14	"github.com/libp2p/go-libp2p-core/peer"
15	"github.com/libp2p/go-libp2p-core/peerstore"
16	"github.com/libp2p/go-libp2p-core/protocol"
17	"github.com/libp2p/go-libp2p-core/record"
18)
19
// Protocol IDs under which the GossipSub router can be reached.
const (
	// GossipSubID_v10 is the protocol ID for version 1.0.0 of the GossipSub protocol.
	// It is advertised along with GossipSubID_v11 for backwards compatibility.
	GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")

	// GossipSubID_v11 is the protocol ID for version 1.1.0 of the GossipSub protocol.
	// See the spec for details about how v1.1.0 compares to v1.0.0:
	// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
	GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
)
30
// Tunable GossipSub parameters. These are package-level variables rather than constants so
// that they can be adjusted; NewGossipSub copies several of them into the router it creates.
var (
	// overlay parameters.

	// GossipSubD sets the optimal degree for a GossipSub topic mesh. For example, if GossipSubD == 6,
	// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
	// GossipSubD should be set somewhere between GossipSubDlo and GossipSubDhi.
	GossipSubD = 6

	// GossipSubDlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
	// If we have fewer than GossipSubDlo peers, we will attempt to graft some more into the mesh at
	// the next heartbeat.
	GossipSubDlo = 5

	// GossipSubDhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
	// If we have more than GossipSubDhi peers, we will select some to prune from the mesh at the next heartbeat.
	GossipSubDhi = 12

	// GossipSubDscore affects how peers are selected when pruning a mesh due to over subscription.
	// At least GossipSubDscore of the retained peers will be high-scoring, while the remainder are
	// chosen randomly.
	GossipSubDscore = 4

	// GossipSubDout sets the quota for the number of outbound connections to maintain in a topic mesh.
	// When the mesh is pruned due to over subscription, we make sure that we have outbound connections
	// to at least GossipSubDout of the survivor peers. This prevents sybil attackers from overwhelming
	// our mesh with incoming connections.
	//
	// GossipSubDout must be set below GossipSubDlo, and must not exceed GossipSubD / 2.
	GossipSubDout = 2

	// gossip parameters

	// GossipSubHistoryLength controls the size of the message cache used for gossip.
	// The message cache will remember messages for GossipSubHistoryLength heartbeats.
	GossipSubHistoryLength = 5

	// GossipSubHistoryGossip controls how many cached message ids we will advertise in
	// IHAVE gossip messages. When asked for our seen message IDs, we will return
	// only those from the most recent GossipSubHistoryGossip heartbeats. The slack between
	// GossipSubHistoryGossip and GossipSubHistoryLength allows us to avoid advertising messages
	// that will be expired by the time they're requested.
	//
	// GossipSubHistoryGossip must be less than or equal to GossipSubHistoryLength to
	// avoid a runtime panic.
	GossipSubHistoryGossip = 3

	// GossipSubDlazy affects how many peers we will emit gossip to at each heartbeat.
	// We will send gossip to at least GossipSubDlazy peers outside our mesh. The actual
	// number may be more, depending on GossipSubGossipFactor and how many peers we're
	// connected to.
	GossipSubDlazy = 6

	// GossipSubGossipFactor affects how many peers we will emit gossip to at each heartbeat.
	// We will send gossip to GossipSubGossipFactor * (total number of non-mesh peers), or
	// GossipSubDlazy, whichever is greater.
	GossipSubGossipFactor = 0.25

	// GossipSubGossipRetransmission controls how many times we will allow a peer to request
	// the same message id through IWANT gossip before we start ignoring them. This is designed
	// to prevent peers from spamming us with requests and wasting our resources.
	GossipSubGossipRetransmission = 3

	// heartbeat interval

	// GossipSubHeartbeatInitialDelay is the short delay before the heartbeat timer begins
	// after the router is initialized.
	GossipSubHeartbeatInitialDelay = 100 * time.Millisecond

	// GossipSubHeartbeatInterval controls the time between heartbeats.
	GossipSubHeartbeatInterval = 1 * time.Second

	// GossipSubFanoutTTL controls how long we keep track of the fanout state. If it's been
	// GossipSubFanoutTTL since we've published to a topic that we're not subscribed to,
	// we'll delete the fanout map for that topic.
	GossipSubFanoutTTL = 60 * time.Second

	// GossipSubPrunePeers controls the number of peers to include in prune Peer eXchange.
	// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
	// send them signed peer records for up to GossipSubPrunePeers other peers that we
	// know of.
	GossipSubPrunePeers = 16

	// GossipSubPruneBackoff controls the backoff time for pruned peers. This is how long
	// a peer must wait before attempting to graft into our mesh again after being pruned.
	// When pruning a peer, we send them our value of GossipSubPruneBackoff so they know
	// the minimum time to wait. Peers running older versions may not send a backoff time,
	// so if we receive a prune message without one, we will wait at least GossipSubPruneBackoff
	// before attempting to re-graft.
	GossipSubPruneBackoff = time.Minute

	// GossipSubConnectors controls the number of active connection attempts for peers obtained through PX.
	GossipSubConnectors = 8

	// GossipSubMaxPendingConnections sets the maximum number of pending connections for peers attempted through px.
	GossipSubMaxPendingConnections = 128

	// GossipSubConnectionTimeout controls the timeout for connection attempts.
	GossipSubConnectionTimeout = 30 * time.Second

	// GossipSubDirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
	// that are not currently connected.
	GossipSubDirectConnectTicks uint64 = 300

	// GossipSubDirectConnectInitialDelay is the initial delay before opening connections to direct peers
	GossipSubDirectConnectInitialDelay = time.Second

	// GossipSubOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
	// with opportunistic grafting. Every GossipSubOpportunisticGraftTicks we will attempt to select some
	// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
	// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
	GossipSubOpportunisticGraftTicks uint64 = 60

	// GossipSubOpportunisticGraftPeers is the number of peers to opportunistically graft.
	GossipSubOpportunisticGraftPeers = 2

	// GossipSubGraftFloodThreshold is the GRAFT flood window that follows a PRUNE.
	// If a GRAFT comes before GossipSubGraftFloodThreshold has elapsed since the last PRUNE,
	// then there is an extra score penalty applied to the peer through P7.
	GossipSubGraftFloodThreshold = 10 * time.Second

	// GossipSubMaxIHaveLength is the maximum number of messages to include in an IHAVE message.
	// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
	// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
	// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip heartbeats;
	// with the defaults this is 1666 messages/s.
	GossipSubMaxIHaveLength = 5000

	// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
	GossipSubMaxIHaveMessages = 10

	// GossipSubIWantFollowupTime is the time to wait for a message requested through IWANT
	// following an IHAVE advertisement. If the message is not received within this window,
	// a broken promise is declared and the router may apply behavioural penalties.
	GossipSubIWantFollowupTime = 3 * time.Second
)
165
166// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
167func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
168	rt := &GossipSubRouter{
169		peers:    make(map[peer.ID]protocol.ID),
170		mesh:     make(map[string]map[peer.ID]struct{}),
171		fanout:   make(map[string]map[peer.ID]struct{}),
172		lastpub:  make(map[string]int64),
173		gossip:   make(map[peer.ID][]*pb.ControlIHave),
174		control:  make(map[peer.ID]*pb.ControlMessage),
175		backoff:  make(map[string]map[peer.ID]time.Time),
176		peerhave: make(map[peer.ID]int),
177		iasked:   make(map[peer.ID]int),
178		outbound: make(map[peer.ID]bool),
179		connect:  make(chan connectInfo, GossipSubMaxPendingConnections),
180		mcache:   NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
181
182		// these are configured per router to allow variation in tests
183		D:      GossipSubD,
184		Dlo:    GossipSubDlo,
185		Dhi:    GossipSubDhi,
186		Dscore: GossipSubDscore,
187		Dout:   GossipSubDout,
188		Dlazy:  GossipSubDlazy,
189
190		// these must be pulled in to resolve races in tests... sigh.
191		directConnectTicks:      GossipSubDirectConnectTicks,
192		opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,
193
194		fanoutTTL: GossipSubFanoutTTL,
195
196		tagTracer: newTagTracer(h.ConnManager()),
197	}
198
199	// use the withInternalTracer option to hook up the tag tracer
200	opts = append(opts, withInternalTracer(rt.tagTracer))
201	return NewPubSub(ctx, h, rt, opts...)
202}
203
204// WithPeerScore is a gossipsub router option that enables peer scoring.
205func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option {
206	return func(ps *PubSub) error {
207		gs, ok := ps.rt.(*GossipSubRouter)
208		if !ok {
209			return fmt.Errorf("pubsub router is not gossipsub")
210		}
211
212		// sanity check: validate the score parameters
213		err := params.validate()
214		if err != nil {
215			return err
216		}
217
218		// sanity check: validate the threshold values
219		err = thresholds.validate()
220		if err != nil {
221			return err
222		}
223
224		gs.score = newPeerScore(params)
225		gs.gossipThreshold = thresholds.GossipThreshold
226		gs.publishThreshold = thresholds.PublishThreshold
227		gs.graylistThreshold = thresholds.GraylistThreshold
228		gs.acceptPXThreshold = thresholds.AcceptPXThreshold
229		gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold
230
231		gs.gossipTracer = newGossipTracer()
232
233		// hook the tracer
234		if ps.tracer != nil {
235			ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer)
236		} else {
237			ps.tracer = &pubsubTracer{
238				internal: []internalTracer{gs.score, gs.gossipTracer},
239				pid:      ps.host.ID(),
240				msgID:    ps.msgID,
241			}
242		}
243
244		return nil
245	}
246}
247
248// WithFloodPublish is a gossipsub router option that enables flood publishing.
249// When this is enabled, published messages are forwarded to all peers with score >=
250// to publishThreshold
251func WithFloodPublish(floodPublish bool) Option {
252	return func(ps *PubSub) error {
253		gs, ok := ps.rt.(*GossipSubRouter)
254		if !ok {
255			return fmt.Errorf("pubsub router is not gossipsub")
256		}
257
258		gs.floodPublish = floodPublish
259
260		return nil
261	}
262}
263
264// WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE.
265// This should generally be enabled in bootstrappers and well connected/trusted nodes
266// used for bootstrapping.
267func WithPeerExchange(doPX bool) Option {
268	return func(ps *PubSub) error {
269		gs, ok := ps.rt.(*GossipSubRouter)
270		if !ok {
271			return fmt.Errorf("pubsub router is not gossipsub")
272		}
273
274		gs.doPX = doPX
275
276		return nil
277	}
278}
279
280// WithDirectPeers is a gossipsub router option that specifies peers with direct
281// peering agreements. These peers are connected outside of the mesh, with all (valid)
282// message unconditionally forwarded to them. The router will maintain open connections
283// to these peers. Note that the peering agreement should be reciprocal with direct peers
284// symmetrically configured at both ends.
285func WithDirectPeers(pis []peer.AddrInfo) Option {
286	return func(ps *PubSub) error {
287		gs, ok := ps.rt.(*GossipSubRouter)
288		if !ok {
289			return fmt.Errorf("pubsub router is not gossipsub")
290		}
291
292		direct := make(map[peer.ID]struct{})
293		for _, pi := range pis {
294			direct[pi.ID] = struct{}{}
295			ps.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
296		}
297
298		gs.direct = direct
299
300		if gs.tagTracer != nil {
301			gs.tagTracer.direct = direct
302		}
303
304		return nil
305	}
306}
307
308// WithDirectConnectTicks is a gossipsub router option that sets the number of
309// heartbeat ticks between attempting to reconnect direct peers that are not
310// currently connected. A "tick" is based on the heartbeat interval, which is
311// 1s by default. The default value for direct connect ticks is 300.
312func WithDirectConnectTicks(t uint64) Option {
313	return func(ps *PubSub) error {
314		gs, ok := ps.rt.(*GossipSubRouter)
315		if !ok {
316			return fmt.Errorf("pubsub router is not gossipsub")
317		}
318		gs.directConnectTicks = t
319		return nil
320	}
321}
322
// GossipSubRouter is a router that implements the gossipsub protocol.
// For each topic we have joined, we maintain an overlay through which
// messages flow; this is the mesh map.
// For each topic we publish to without joining, we maintain a list of peers
// to use for injecting our messages in the overlay with stable routes; this
// is the fanout map. Fanout peer lists are expired if we don't publish any
// messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct {
	// the PubSub instance this router is attached to; set in Attach
	p        *PubSub
	peers    map[peer.ID]protocol.ID          // peer protocols
	direct   map[peer.ID]struct{}             // direct peers
	mesh     map[string]map[peer.ID]struct{}  // topic meshes
	fanout   map[string]map[peer.ID]struct{}  // topic fanout
	lastpub  map[string]int64                 // last publish time for fanout topics
	gossip   map[peer.ID][]*pb.ControlIHave   // pending gossip
	control  map[peer.ID]*pb.ControlMessage   // pending control messages
	peerhave map[peer.ID]int                  // number of IHAVEs received from peer in the last heartbeat
	iasked   map[peer.ID]int                  // number of messages we have asked from peer in the last heartbeat
	outbound map[peer.ID]bool                 // connection direction cache, marks peers with outbound connections
	backoff  map[string]map[peer.ID]time.Time // prune backoff
	connect  chan connectInfo                 // px connection requests

	// mcache holds recently published messages and is consulted when answering IWANT.
	// tracer is copied from the PubSub instance in Attach.
	// score and gossipTracer are installed by the WithPeerScore option.
	// tagTracer is created in NewGossipSub from the host's connection manager.
	// gate is consulted in AcceptFrom for peers that are neither direct nor graylisted.
	mcache       *MessageCache
	tracer       *pubsubTracer
	score        *peerScore
	gossipTracer *gossipTracer
	tagTracer    *tagTracer
	gate         *peerGater

	// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
	// nodes.
	doPX bool

	// threshold for accepting PX from a peer; this should be positive and limited to scores
	// attainable by bootstrappers and trusted nodes
	acceptPXThreshold float64

	// threshold for peer score to emit/accept gossip
	// If the peer score is below this threshold, we won't emit or accept gossip from the peer.
	// When there is no score, this value is 0.
	gossipThreshold float64

	// flood publish score threshold; we only publish to peers with score >= to the threshold
	// when using flood publishing or the peer is a fanout or floodsub peer.
	publishThreshold float64

	// threshold for peer score before we graylist the peer and silently ignore its RPCs
	graylistThreshold float64

	// threshold for median peer score before triggering opportunistic grafting
	opportunisticGraftThreshold float64

	// whether to use flood publishing
	floodPublish bool

	// number of heartbeats since the beginning of time; this allows us to amortize some resource
	// clean up -- eg backoff clean up.
	heartbeatTicks uint64

	// overlay parameter "constants"
	// these are pulled from their global value or else the race detector is angry on travis
	// it also allows us to change them per peer in tests, which is a plus
	D, Dlo, Dhi, Dscore, Dout, Dlazy int

	// tick "constants" for triggering direct connect and opportunistic grafting
	// these are pulled from their global value or else the race detector is angry on travis
	directConnectTicks, opportunisticGraftTicks uint64

	// fanout expiry ttl "constant"
	// this is pulled from its global value or else the race detector is angry on travis
	fanoutTTL time.Duration
}
395
// connectInfo is a connection request queued on GossipSubRouter.connect and
// consumed by the connector goroutines.
type connectInfo struct {
	// p is the peer to connect to.
	p   peer.ID
	// spr is the peer's signed peer record obtained through PX; it is nil when
	// no record is available (e.g. for direct peers).
	spr *record.Envelope
}
400
401func (gs *GossipSubRouter) Protocols() []protocol.ID {
402	return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
403}
404
405func (gs *GossipSubRouter) Attach(p *PubSub) {
406	gs.p = p
407	gs.tracer = p.tracer
408
409	// start the scoring
410	gs.score.Start(gs)
411
412	// and the gossip tracing
413	gs.gossipTracer.Start(gs)
414
415	// and the tracer for connmgr tags
416	gs.tagTracer.Start(gs)
417
418	// start using the same msg ID function as PubSub for caching messages.
419	gs.mcache.SetMsgIdFn(p.msgID)
420
421	// start the heartbeat
422	go gs.heartbeatTimer()
423
424	// start the PX connectors
425	for i := 0; i < GossipSubConnectors; i++ {
426		go gs.connector()
427	}
428
429	// connect to direct peers
430	if len(gs.direct) > 0 {
431		go func() {
432			if GossipSubDirectConnectInitialDelay > 0 {
433				time.Sleep(GossipSubDirectConnectInitialDelay)
434			}
435			for p := range gs.direct {
436				gs.connect <- connectInfo{p: p}
437			}
438		}()
439	}
440}
441
442func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
443	log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
444	gs.tracer.AddPeer(p, proto)
445	gs.peers[p] = proto
446
447	// track the connection direction
448	outbound := false
449	conns := gs.p.host.Network().ConnsToPeer(p)
450loop:
451	for _, c := range conns {
452		if c.Stat().Direction == network.DirOutbound {
453			// only count the connection if it has a pubsub stream
454			for _, s := range c.GetStreams() {
455				if s.Protocol() == proto {
456					outbound = true
457					break loop
458				}
459			}
460		}
461	}
462	gs.outbound[p] = outbound
463}
464
465func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
466	log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
467	gs.tracer.RemovePeer(p)
468	delete(gs.peers, p)
469	for _, peers := range gs.mesh {
470		delete(peers, p)
471	}
472	for _, peers := range gs.fanout {
473		delete(peers, p)
474	}
475	delete(gs.gossip, p)
476	delete(gs.control, p)
477	delete(gs.outbound, p)
478}
479
480func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
481	// check all peers in the topic
482	tmap, ok := gs.p.topics[topic]
483	if !ok {
484		return false
485	}
486
487	fsPeers, gsPeers := 0, 0
488	// floodsub peers
489	for p := range tmap {
490		if gs.peers[p] == FloodSubID {
491			fsPeers++
492		}
493	}
494
495	// gossipsub peers
496	gsPeers = len(gs.mesh[topic])
497
498	if suggested == 0 {
499		suggested = gs.Dlo
500	}
501
502	if fsPeers+gsPeers >= suggested || gsPeers >= gs.Dhi {
503		return true
504	}
505
506	return false
507}
508
509func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
510	_, direct := gs.direct[p]
511	if direct {
512		return AcceptAll
513	}
514
515	if gs.score.Score(p) < gs.graylistThreshold {
516		return AcceptNone
517	}
518
519	return gs.gate.AcceptFrom(p)
520}
521
522func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
523	ctl := rpc.GetControl()
524	if ctl == nil {
525		return
526	}
527
528	iwant := gs.handleIHave(rpc.from, ctl)
529	ihave := gs.handleIWant(rpc.from, ctl)
530	prune := gs.handleGraft(rpc.from, ctl)
531	gs.handlePrune(rpc.from, ctl)
532
533	if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 {
534		return
535	}
536
537	out := rpcWithControl(ihave, nil, iwant, nil, prune)
538	gs.sendRPC(rpc.from, out)
539}
540
541func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
542	// we ignore IHAVE gossip from any peer whose score is below the gossip threshold
543	score := gs.score.Score(p)
544	if score < gs.gossipThreshold {
545		log.Debugf("IHAVE: ignoring peer %s with score below threshold [score = %f]", p, score)
546		return nil
547	}
548
549	// IHAVE flood protection
550	gs.peerhave[p]++
551	if gs.peerhave[p] > GossipSubMaxIHaveMessages {
552		log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerhave[p])
553		return nil
554	}
555
556	if gs.iasked[p] >= GossipSubMaxIHaveLength {
557		log.Debugf("IHAVE: peer %s has already advertised too many messages (%d); ignoring", p, gs.iasked[p])
558		return nil
559	}
560
561	iwant := make(map[string]struct{})
562	for _, ihave := range ctl.GetIhave() {
563		topic := ihave.GetTopicID()
564		_, ok := gs.mesh[topic]
565		if !ok {
566			continue
567		}
568
569		for _, mid := range ihave.GetMessageIDs() {
570			if gs.p.seenMessage(mid) {
571				continue
572			}
573			iwant[mid] = struct{}{}
574		}
575	}
576
577	if len(iwant) == 0 {
578		return nil
579	}
580
581	iask := len(iwant)
582	if iask+gs.iasked[p] > GossipSubMaxIHaveLength {
583		iask = GossipSubMaxIHaveLength - gs.iasked[p]
584	}
585
586	log.Debugf("IHAVE: Asking for %d out of %d messages from %s", iask, len(iwant), p)
587
588	iwantlst := make([]string, 0, len(iwant))
589	for mid := range iwant {
590		iwantlst = append(iwantlst, mid)
591	}
592
593	// ask in random order
594	shuffleStrings(iwantlst)
595
596	// truncate to the messages we are actually asking for and update the iasked counter
597	iwantlst = iwantlst[:iask]
598	gs.iasked[p] += iask
599
600	gs.gossipTracer.AddPromise(p, iwantlst)
601
602	return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
603}
604
605func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
606	// we don't respond to IWANT requests from any peer whose score is below the gossip threshold
607	score := gs.score.Score(p)
608	if score < gs.gossipThreshold {
609		log.Debugf("IWANT: ignoring peer %s with score below threshold [score = %f]", p, score)
610		return nil
611	}
612
613	ihave := make(map[string]*pb.Message)
614	for _, iwant := range ctl.GetIwant() {
615		for _, mid := range iwant.GetMessageIDs() {
616			msg, count, ok := gs.mcache.GetForPeer(mid, p)
617			if !ok {
618				continue
619			}
620
621			if count > GossipSubGossipRetransmission {
622				log.Debugf("IWANT: Peer %s has asked for message %s too many times; ignoring request", p, mid)
623				continue
624			}
625
626			ihave[mid] = msg
627		}
628	}
629
630	if len(ihave) == 0 {
631		return nil
632	}
633
634	log.Debugf("IWANT: Sending %d messages to %s", len(ihave), p)
635
636	msgs := make([]*pb.Message, 0, len(ihave))
637	for _, msg := range ihave {
638		msgs = append(msgs, msg)
639	}
640
641	return msgs
642}
643
644func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
645	var prune []string
646
647	doPX := gs.doPX
648	score := gs.score.Score(p)
649	now := time.Now()
650
651	for _, graft := range ctl.GetGraft() {
652		topic := graft.GetTopicID()
653		peers, ok := gs.mesh[topic]
654		if !ok {
655			// don't do PX when there is an unknown topic to avoid leaking our peers
656			doPX = false
657			// spam hardening: ignore GRAFTs for unknown topics
658			continue
659		}
660
661		// check if it is already in the mesh; if so do nothing (we might have concurrent grafting)
662		_, inMesh := peers[p]
663		if inMesh {
664			continue
665		}
666
667		// we don't GRAFT to/from direct peers; complain loudly if this happens
668		_, direct := gs.direct[p]
669		if direct {
670			log.Warnf("GRAFT: ignoring request from direct peer %s", p)
671			// this is possibly a bug from non-reciprocal configuration; send a PRUNE
672			prune = append(prune, topic)
673			// but don't PX
674			doPX = false
675			continue
676		}
677
678		// make sure we are not backing off that peer
679		expire, backoff := gs.backoff[topic][p]
680		if backoff && now.Before(expire) {
681			log.Debugf("GRAFT: ignoring backed off peer %s", p)
682			// add behavioural penalty
683			gs.score.AddPenalty(p, 1)
684			// no PX
685			doPX = false
686			// check the flood cutoff -- is the GRAFT coming too fast?
687			floodCutoff := expire.Add(GossipSubGraftFloodThreshold - GossipSubPruneBackoff)
688			if now.Before(floodCutoff) {
689				// extra penalty
690				gs.score.AddPenalty(p, 1)
691			}
692			// refresh the backoff
693			gs.addBackoff(p, topic)
694			prune = append(prune, topic)
695			continue
696		}
697
698		// check the score
699		if score < 0 {
700			// we don't GRAFT peers with negative score
701			log.Debugf("GRAFT: ignoring peer %s with negative score [score = %f, topic = %s]", p, score, topic)
702			// we do send them PRUNE however, because it's a matter of protocol correctness
703			prune = append(prune, topic)
704			// but we won't PX to them
705			doPX = false
706			// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
707			gs.addBackoff(p, topic)
708			continue
709		}
710
711		// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
712		// from peers with outbound connections; this is a defensive check to restrict potential
713		// mesh takeover attacks combined with love bombing
714		if len(peers) >= gs.Dhi && !gs.outbound[p] {
715			prune = append(prune, topic)
716			gs.addBackoff(p, topic)
717			continue
718		}
719
720		log.Debugf("GRAFT: add mesh link from %s in %s", p, topic)
721		gs.tracer.Graft(p, topic)
722		peers[p] = struct{}{}
723	}
724
725	if len(prune) == 0 {
726		return nil
727	}
728
729	cprune := make([]*pb.ControlPrune, 0, len(prune))
730	for _, topic := range prune {
731		cprune = append(cprune, gs.makePrune(p, topic, doPX))
732	}
733
734	return cprune
735}
736
737func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
738	score := gs.score.Score(p)
739
740	for _, prune := range ctl.GetPrune() {
741		topic := prune.GetTopicID()
742		peers, ok := gs.mesh[topic]
743		if !ok {
744			continue
745		}
746
747		log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
748		gs.tracer.Prune(p, topic)
749		delete(peers, p)
750		// is there a backoff specified by the peer? if so obey it.
751		backoff := prune.GetBackoff()
752		if backoff > 0 {
753			gs.doAddBackoff(p, topic, time.Duration(backoff)*time.Second)
754		} else {
755			gs.addBackoff(p, topic)
756		}
757
758		px := prune.GetPeers()
759		if len(px) > 0 {
760			// we ignore PX from peers with insufficient score
761			if score < gs.acceptPXThreshold {
762				log.Debugf("PRUNE: ignoring PX from peer %s with insufficient score [score = %f, topic = %s]", p, score, topic)
763				continue
764			}
765
766			gs.pxConnect(px)
767		}
768	}
769}
770
771func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
772	gs.doAddBackoff(p, topic, GossipSubPruneBackoff)
773}
774
775func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
776	backoff, ok := gs.backoff[topic]
777	if !ok {
778		backoff = make(map[peer.ID]time.Time)
779		gs.backoff[topic] = backoff
780	}
781	expire := time.Now().Add(interval)
782	if backoff[p].Before(expire) {
783		backoff[p] = expire
784	}
785}
786
787func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
788	if len(peers) > GossipSubPrunePeers {
789		shufflePeerInfo(peers)
790		peers = peers[:GossipSubPrunePeers]
791	}
792
793	toconnect := make([]connectInfo, 0, len(peers))
794
795	for _, pi := range peers {
796		p := peer.ID(pi.PeerID)
797
798		_, connected := gs.peers[p]
799		if connected {
800			continue
801		}
802
803		var spr *record.Envelope
804		if pi.SignedPeerRecord != nil {
805			// the peer sent us a signed record; ensure that it is valid
806			envelope, r, err := record.ConsumeEnvelope(pi.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
807			if err != nil {
808				log.Warnf("error unmarshalling peer record obtained through px: %s", err)
809				continue
810			}
811			rec, ok := r.(*peer.PeerRecord)
812			if !ok {
813				log.Warnf("bogus peer record obtained through px: envelope payload is not PeerRecord")
814				continue
815			}
816			if rec.PeerID != p {
817				log.Warnf("bogus peer record obtained through px: peer ID %s doesn't match expected peer %s", rec.PeerID, p)
818				continue
819			}
820			spr = envelope
821		}
822
823		toconnect = append(toconnect, connectInfo{p, spr})
824	}
825
826	if len(toconnect) == 0 {
827		return
828	}
829
830	for _, ci := range toconnect {
831		select {
832		case gs.connect <- ci:
833		default:
834			log.Debugf("ignoring peer connection attempt; too many pending connections")
835			break
836		}
837	}
838}
839
840func (gs *GossipSubRouter) connector() {
841	for {
842		select {
843		case ci := <-gs.connect:
844			if gs.p.host.Network().Connectedness(ci.p) == network.Connected {
845				continue
846			}
847
848			log.Debugf("connecting to %s", ci.p)
849			cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
850			if ok && ci.spr != nil {
851				_, err := cab.ConsumePeerRecord(ci.spr, peerstore.TempAddrTTL)
852				if err != nil {
853					log.Debugf("error processing peer record: %s", err)
854				}
855			}
856
857			ctx, cancel := context.WithTimeout(gs.p.ctx, GossipSubConnectionTimeout)
858			err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p})
859			cancel()
860			if err != nil {
861				log.Debugf("error connecting to %s: %s", ci.p, err)
862			}
863
864		case <-gs.p.ctx.Done():
865			return
866		}
867	}
868}
869
870func (gs *GossipSubRouter) Publish(msg *Message) {
871	gs.mcache.Put(msg.Message)
872
873	from := msg.ReceivedFrom
874	topic := msg.GetTopic()
875
876	tosend := make(map[peer.ID]struct{})
877
878	// any peers in the topic?
879	tmap, ok := gs.p.topics[topic]
880	if !ok {
881		return
882	}
883
884	if gs.floodPublish && from == gs.p.host.ID() {
885		for p := range tmap {
886			_, direct := gs.direct[p]
887			if direct || gs.score.Score(p) >= gs.publishThreshold {
888				tosend[p] = struct{}{}
889			}
890		}
891	} else {
892		// direct peers
893		for p := range gs.direct {
894			_, inTopic := tmap[p]
895			if inTopic {
896				tosend[p] = struct{}{}
897			}
898		}
899
900		// floodsub peers
901		for p := range tmap {
902			if gs.peers[p] == FloodSubID && gs.score.Score(p) >= gs.publishThreshold {
903				tosend[p] = struct{}{}
904			}
905		}
906
907		// gossipsub peers
908		gmap, ok := gs.mesh[topic]
909		if !ok {
910			// we are not in the mesh for topic, use fanout peers
911			gmap, ok = gs.fanout[topic]
912			if !ok || len(gmap) == 0 {
913				// we don't have any, pick some with score above the publish threshold
914				peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
915					_, direct := gs.direct[p]
916					return !direct && gs.score.Score(p) >= gs.publishThreshold
917				})
918
919				if len(peers) > 0 {
920					gmap = peerListToMap(peers)
921					gs.fanout[topic] = gmap
922				}
923			}
924			gs.lastpub[topic] = time.Now().UnixNano()
925		}
926
927		for p := range gmap {
928			tosend[p] = struct{}{}
929		}
930	}
931
932	out := rpcWithMessages(msg.Message)
933	for pid := range tosend {
934		if pid == from || pid == peer.ID(msg.GetFrom()) {
935			continue
936		}
937
938		gs.sendRPC(pid, out)
939	}
940}
941
942func (gs *GossipSubRouter) Join(topic string) {
943	gmap, ok := gs.mesh[topic]
944	if ok {
945		return
946	}
947
948	log.Debugf("JOIN %s", topic)
949	gs.tracer.Join(topic)
950
951	gmap, ok = gs.fanout[topic]
952	if ok {
953		// these peers have a score above the publish threshold, which may be negative
954		// so drop the ones with a negative score
955		for p := range gmap {
956			if gs.score.Score(p) < 0 {
957				delete(gmap, p)
958			}
959		}
960
961		if len(gmap) < gs.D {
962			// we need more peers; eager, as this would get fixed in the next heartbeat
963			more := gs.getPeers(topic, gs.D-len(gmap), func(p peer.ID) bool {
964				// filter our current peers, direct peers, and peers with negative scores
965				_, inMesh := gmap[p]
966				_, direct := gs.direct[p]
967				return !inMesh && !direct && gs.score.Score(p) >= 0
968			})
969			for _, p := range more {
970				gmap[p] = struct{}{}
971			}
972		}
973		gs.mesh[topic] = gmap
974		delete(gs.fanout, topic)
975		delete(gs.lastpub, topic)
976	} else {
977		peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
978			// filter direct peers and peers with negative score
979			_, direct := gs.direct[p]
980			return !direct && gs.score.Score(p) >= 0
981		})
982		gmap = peerListToMap(peers)
983		gs.mesh[topic] = gmap
984	}
985
986	for p := range gmap {
987		log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
988		gs.tracer.Graft(p, topic)
989		gs.sendGraft(p, topic)
990	}
991}
992
993func (gs *GossipSubRouter) Leave(topic string) {
994	gmap, ok := gs.mesh[topic]
995	if !ok {
996		return
997	}
998
999	log.Debugf("LEAVE %s", topic)
1000	gs.tracer.Leave(topic)
1001
1002	delete(gs.mesh, topic)
1003
1004	for p := range gmap {
1005		log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
1006		gs.tracer.Prune(p, topic)
1007		gs.sendPrune(p, topic)
1008	}
1009}
1010
1011func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
1012	graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
1013	out := rpcWithControl(nil, nil, nil, graft, nil)
1014	gs.sendRPC(p, out)
1015}
1016
1017func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
1018	prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX)}
1019	out := rpcWithControl(nil, nil, nil, nil, prune)
1020	gs.sendRPC(p, out)
1021}
1022
1023func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
1024	// do we own the RPC?
1025	own := false
1026
1027	// piggyback control message retries
1028	ctl, ok := gs.control[p]
1029	if ok {
1030		out = copyRPC(out)
1031		own = true
1032		gs.piggybackControl(p, out, ctl)
1033		delete(gs.control, p)
1034	}
1035
1036	// piggyback gossip
1037	ihave, ok := gs.gossip[p]
1038	if ok {
1039		if !own {
1040			out = copyRPC(out)
1041			own = true
1042		}
1043		gs.piggybackGossip(p, out, ihave)
1044		delete(gs.gossip, p)
1045	}
1046
1047	mch, ok := gs.p.peers[p]
1048	if !ok {
1049		return
1050	}
1051
1052	// If we're below the max message size, go ahead and send
1053	if out.Size() < gs.p.maxMessageSize {
1054		gs.doSendRPC(out, p, mch)
1055		return
1056	}
1057
1058	// If we're too big, fragment into multiple RPCs and send each sequentially
1059	outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize)
1060	if err != nil {
1061		gs.doDropRPC(out, p, fmt.Sprintf("unable to fragment RPC: %s", err))
1062		return
1063	}
1064
1065	for _, rpc := range outRPCs {
1066		gs.doSendRPC(rpc, p, mch)
1067	}
1068}
1069
1070func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
1071	log.Debugf("dropping message to peer %s: %s", p.Pretty(), reason)
1072	gs.tracer.DropRPC(rpc, p)
1073	// push control messages that need to be retried
1074	ctl := rpc.GetControl()
1075	if ctl != nil {
1076		gs.pushControl(p, ctl)
1077	}
1078}
1079
1080func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) {
1081	select {
1082	case mch <- rpc:
1083		gs.tracer.SendRPC(rpc, p)
1084	default:
1085		gs.doDropRPC(rpc, p, "queue full")
1086	}
1087}
1088
1089func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
1090	if rpc.Size() < limit {
1091		return []*RPC{rpc}, nil
1092	}
1093
1094	c := (rpc.Size() / limit) + 1
1095	rpcs := make([]*RPC, 1, c)
1096	rpcs[0] = &RPC{RPC: pb.RPC{}, from: rpc.from}
1097
1098	// outRPC returns the current RPC message if it will fit sizeToAdd more bytes
1099	// otherwise, it will create a new RPC message and add it to the list.
1100	// if withCtl is true, the returned message will have a non-nil empty Control message.
1101	outRPC := func(sizeToAdd int, withCtl bool) *RPC {
1102		current := rpcs[len(rpcs)-1]
1103		// check if we can fit the new data, plus an extra byte for the protobuf field tag
1104		if current.Size()+sizeToAdd+1 < limit {
1105			if withCtl && current.Control == nil {
1106				current.Control = &pb.ControlMessage{}
1107			}
1108			return current
1109		}
1110		var ctl *pb.ControlMessage
1111		if withCtl {
1112			ctl = &pb.ControlMessage{}
1113		}
1114		next := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
1115		rpcs = append(rpcs, next)
1116		return next
1117	}
1118
1119	for _, msg := range rpc.GetPublish() {
1120		s := msg.Size()
1121		// if an individual message is too large, we can't fragment it and have to fail entirely
1122		if s > limit {
1123			return nil, fmt.Errorf("message with len=%d exceeds limit %d", s, limit)
1124		}
1125		out := outRPC(s, false)
1126		out.Publish = append(out.Publish, msg)
1127	}
1128
1129	for _, sub := range rpc.GetSubscriptions() {
1130		out := outRPC(sub.Size(), false)
1131		out.Subscriptions = append(out.Subscriptions, sub)
1132	}
1133
1134	ctl := rpc.GetControl()
1135	if ctl == nil {
1136		// if there were no control messages, we're done
1137		return rpcs, nil
1138	}
1139	// if all the control messages fit into one RPC, we just add it to the end and return
1140	ctlOut := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
1141	if ctlOut.Size() < limit {
1142		rpcs = append(rpcs, ctlOut)
1143		return rpcs, nil
1144	}
1145
1146	// we need to split up the control messages into multiple RPCs
1147	for _, graft := range ctl.Graft {
1148		out := outRPC(graft.Size(), true)
1149		out.Control.Graft = append(out.Control.Graft, graft)
1150	}
1151	for _, prune := range ctl.Prune {
1152		out := outRPC(prune.Size(), true)
1153		out.Control.Prune = append(out.Control.Prune, prune)
1154	}
1155
1156	// An individual IWANT or IHAVE message could be larger than the limit if we have
1157	// a lot of message IDs. fragmentMessageIds will split them into buckets that
1158	// fit within the limit, with some overhead for the control messages themselves
1159	for _, iwant := range ctl.Iwant {
1160		const protobufOverhead = 6
1161		idBuckets := fragmentMessageIds(iwant.MessageIDs, limit-protobufOverhead)
1162		for _, ids := range idBuckets {
1163			iwant := &pb.ControlIWant{MessageIDs: ids}
1164			out := outRPC(iwant.Size(), true)
1165			out.Control.Iwant = append(out.Control.Iwant, iwant)
1166		}
1167	}
1168	for _, ihave := range ctl.Ihave {
1169		const protobufOverhead = 6
1170		idBuckets := fragmentMessageIds(ihave.MessageIDs, limit-protobufOverhead)
1171		for _, ids := range idBuckets {
1172			ihave := &pb.ControlIHave{MessageIDs: ids}
1173			out := outRPC(ihave.Size(), true)
1174			out.Control.Ihave = append(out.Control.Ihave, ihave)
1175		}
1176	}
1177	return rpcs, nil
1178}
1179
1180func fragmentMessageIds(msgIds []string, limit int) [][]string {
1181	// account for two bytes of protobuf overhead per array element
1182	const protobufOverhead = 2
1183
1184	out := [][]string{{}}
1185	var currentBucket int
1186	var bucketLen int
1187	for i := 0; i < len(msgIds); i++ {
1188		size := len(msgIds[i]) + protobufOverhead
1189		if size > limit {
1190			// pathological case where a single message ID exceeds the limit.
1191			log.Warnf("message ID length %d exceeds limit %d, removing from outgoing gossip", size, limit)
1192			continue
1193		}
1194		bucketLen += size
1195		if bucketLen > limit {
1196			out = append(out, []string{})
1197			currentBucket++
1198			bucketLen = size
1199		}
1200		out[currentBucket] = append(out[currentBucket], msgIds[i])
1201	}
1202	return out
1203}
1204
1205func (gs *GossipSubRouter) heartbeatTimer() {
1206	time.Sleep(GossipSubHeartbeatInitialDelay)
1207	select {
1208	case gs.p.eval <- gs.heartbeat:
1209	case <-gs.p.ctx.Done():
1210		return
1211	}
1212
1213	ticker := time.NewTicker(GossipSubHeartbeatInterval)
1214	defer ticker.Stop()
1215
1216	for {
1217		select {
1218		case <-ticker.C:
1219			select {
1220			case gs.p.eval <- gs.heartbeat:
1221			case <-gs.p.ctx.Done():
1222				return
1223			}
1224		case <-gs.p.ctx.Done():
1225			return
1226		}
1227	}
1228}
1229
// heartbeat performs one round of periodic router maintenance. In order, it:
//   - clears expired backoffs and the per-heartbeat IHAVE counters, applies
//     IWANT penalties and reconnects direct peers;
//   - for every joined topic, prunes negatively scored mesh peers, grafts new
//     peers when the mesh is below Dlo, prunes down to D when it is above Dhi,
//     tops up outbound peers to Dout, optionally performs opportunistic
//     grafting, and emits gossip;
//   - expires fanouts not published to within fanoutTTL and maintains the
//     remaining ones;
//   - sends the coalesced GRAFT/PRUNE messages, flushes pending gossip and
//     control messages, and shifts the message cache.
func (gs *GossipSubRouter) heartbeat() {
	defer log.EventBegin(gs.p.ctx, "heartbeat").Done()

	gs.heartbeatTicks++

	// GRAFTs and PRUNEs are collected per peer and sent in one go at the end;
	// noPX marks the peers whose PRUNE must not carry peer exchange
	tograft := make(map[peer.ID][]string)
	toprune := make(map[peer.ID][]string)
	noPX := make(map[peer.ID]bool)

	// clean up expired backoffs
	gs.clearBackoff()

	// clean up iasked counters
	gs.clearIHaveCounters()

	// apply IWANT request penalties
	gs.applyIwantPenalties()

	// ensure direct peers are connected
	gs.directConnect()

	// cache scores throughout the heartbeat
	scores := make(map[peer.ID]float64)
	score := func(p peer.ID) float64 {
		s, ok := scores[p]
		if !ok {
			s = gs.score.Score(p)
			scores[p] = s
		}
		return s
	}

	// maintain the mesh for topics we have joined
	for topic, peers := range gs.mesh {
		// prunePeer removes p from this topic's mesh, starts a backoff for it and
		// records the PRUNE to send
		prunePeer := func(p peer.ID) {
			gs.tracer.Prune(p, topic)
			delete(peers, p)
			gs.addBackoff(p, topic)
			topics := toprune[p]
			toprune[p] = append(topics, topic)
		}

		// graftPeer adds p to this topic's mesh and records the GRAFT to send
		graftPeer := func(p peer.ID) {
			log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
			gs.tracer.Graft(p, topic)
			peers[p] = struct{}{}
			topics := tograft[p]
			tograft[p] = append(topics, topic)
		}

		// drop all peers with negative score, without PX
		for p := range peers {
			if score(p) < 0 {
				log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, score(p), topic)
				prunePeer(p)
				noPX[p] = true
			}
		}

		// do we have enough peers?
		if l := len(peers); l < gs.Dlo {
			backoff := gs.backoff[topic]
			// below Dlo we refill up to D, not just up to Dlo
			ineed := gs.D - l
			plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
				// filter our current and direct peers, peers we are backing off, and peers with negative score
				_, inMesh := peers[p]
				_, doBackoff := backoff[p]
				_, direct := gs.direct[p]
				return !inMesh && !doBackoff && !direct && score(p) >= 0
			})

			for _, p := range plst {
				graftPeer(p)
			}
		}

		// do we have too many peers?
		if len(peers) > gs.Dhi {
			plst := peerMapToList(peers)

			// sort by score (but shuffle first for the case we don't use the score)
			shufflePeers(plst)
			sort.Slice(plst, func(i, j int) bool {
				return score(plst[i]) > score(plst[j])
			})

			// We keep the first D_score peers by score and the remaining up to D randomly
			// under the constraint that we keep D_out peers in the mesh (if we have that many)
			shufflePeers(plst[gs.Dscore:])

			// count the outbound peers we are keeping
			outbound := 0
			for _, p := range plst[:gs.D] {
				if gs.outbound[p] {
					outbound++
				}
			}

			// if it's less than D_out, bubble up some outbound peers from the random selection
			if outbound < gs.Dout {
				rotate := func(i int) {
					// rotate the plst to the right and put the ith peer in the front
					p := plst[i]
					for j := i; j > 0; j-- {
						plst[j] = plst[j-1]
					}
					plst[0] = p
				}

				// first bubble up all outbound peers already in the selection to the front
				if outbound > 0 {
					ihave := outbound
					for i := 1; i < gs.D && ihave > 0; i++ {
						p := plst[i]
						if gs.outbound[p] {
							rotate(i)
							ihave--
						}
					}
				}

				// now bubble up enough outbound peers outside the selection to the front
				ineed := gs.Dout - outbound
				for i := gs.D; i < len(plst) && ineed > 0; i++ {
					p := plst[i]
					if gs.outbound[p] {
						rotate(i)
						ineed--
					}
				}
			}

			// prune the excess peers
			for _, p := range plst[gs.D:] {
				log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
				prunePeer(p)
			}
		}

		// do we have enough outbound peers?
		if len(peers) >= gs.Dlo {
			// count the outbound peers we have
			outbound := 0
			for p := range peers {
				if gs.outbound[p] {
					outbound++
				}
			}

			// if it's less than D_out, select some peers with outbound connections and graft them
			if outbound < gs.Dout {
				ineed := gs.Dout - outbound
				backoff := gs.backoff[topic]
				plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
					// filter our current and direct peers, peers we are backing off, and peers with negative score
					_, inMesh := peers[p]
					_, doBackoff := backoff[p]
					_, direct := gs.direct[p]
					return !inMesh && !doBackoff && !direct && gs.outbound[p] && score(p) >= 0
				})

				for _, p := range plst {
					graftPeer(p)
				}
			}
		}

		// should we try to improve the mesh with opportunistic grafting?
		if gs.heartbeatTicks%gs.opportunisticGraftTicks == 0 && len(peers) > 1 {
			// Opportunistic grafting works as follows: we check the median score of peers in the
			// mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
			// random with score over the median.
			// The intention is to (slowly) improve an underperforming mesh by introducing good
			// scoring peers that may have been gossiping at us. This allows us to get out of sticky
			// situations where we are stuck with poor peers and also recover from churn of good peers.

			// now compute the median peer score in the mesh
			plst := peerMapToList(peers)
			sort.Slice(plst, func(i, j int) bool {
				return score(plst[i]) < score(plst[j])
			})
			medianIndex := len(peers) / 2
			// read straight from the cache, which the score() calls made while
			// sorting above are expected to have filled for every mesh peer
			medianScore := scores[plst[medianIndex]]

			// if the median score is below the threshold, select a better peer (if any) and GRAFT
			if medianScore < gs.opportunisticGraftThreshold {
				backoff := gs.backoff[topic]
				plst = gs.getPeers(topic, GossipSubOpportunisticGraftPeers, func(p peer.ID) bool {
					_, inMesh := peers[p]
					_, doBackoff := backoff[p]
					_, direct := gs.direct[p]
					return !inMesh && !doBackoff && !direct && score(p) > medianScore
				})

				for _, p := range plst {
					log.Debugf("HEARTBEAT: Opportunistically graft peer %s on topic %s", p, topic)
					graftPeer(p)
				}
			}
		}

		// 2nd arg are mesh peers excluded from gossip. We already push
		// messages to them, so it's redundant to gossip IHAVEs.
		gs.emitGossip(topic, peers)
	}

	// expire fanout for topics we haven't published to in a while
	now := time.Now().UnixNano()
	for topic, lastpub := range gs.lastpub {
		if lastpub+int64(gs.fanoutTTL) < now {
			delete(gs.fanout, topic)
			delete(gs.lastpub, topic)
		}
	}

	// maintain our fanout for topics we are publishing but we have not joined
	for topic, peers := range gs.fanout {
		// check whether our peers are still in the topic and have a score above the publish threshold
		for p := range peers {
			_, ok := gs.p.topics[topic][p]
			if !ok || score(p) < gs.publishThreshold {
				delete(peers, p)
			}
		}

		// do we need more peers?
		if len(peers) < gs.D {
			ineed := gs.D - len(peers)
			plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
				// filter out our current and direct peers, keeping only peers
				// whose score is at or above the publish threshold
				_, inFanout := peers[p]
				_, direct := gs.direct[p]
				return !inFanout && !direct && score(p) >= gs.publishThreshold
			})

			for _, p := range plst {
				peers[p] = struct{}{}
			}
		}

		// 2nd arg are fanout peers excluded from gossip. We already push
		// messages to them, so it's redundant to gossip IHAVEs.
		gs.emitGossip(topic, peers)
	}

	// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
	gs.sendGraftPrune(tograft, toprune, noPX)

	// flush all pending gossip that wasn't piggybacked above
	gs.flush()

	// advance the message history window
	gs.mcache.Shift()
}
1484
1485func (gs *GossipSubRouter) clearIHaveCounters() {
1486	if len(gs.peerhave) > 0 {
1487		// throw away the old map and make a new one
1488		gs.peerhave = make(map[peer.ID]int)
1489	}
1490
1491	if len(gs.iasked) > 0 {
1492		// throw away the old map and make a new one
1493		gs.iasked = make(map[peer.ID]int)
1494	}
1495}
1496
1497func (gs *GossipSubRouter) applyIwantPenalties() {
1498	for p, count := range gs.gossipTracer.GetBrokenPromises() {
1499		log.Infof("peer %s didn't follow up in %d IWANT requests; adding penalty", p, count)
1500		gs.score.AddPenalty(p, count)
1501	}
1502}
1503
1504func (gs *GossipSubRouter) clearBackoff() {
1505	// we only clear once every 15 ticks to avoid iterating over the map(s) too much
1506	if gs.heartbeatTicks%15 != 0 {
1507		return
1508	}
1509
1510	now := time.Now()
1511	for topic, backoff := range gs.backoff {
1512		for p, expire := range backoff {
1513			// add some slack time to the expiration
1514			// https://github.com/libp2p/specs/pull/289
1515			if expire.Add(2 * GossipSubHeartbeatInterval).Before(now) {
1516				delete(backoff, p)
1517			}
1518		}
1519		if len(backoff) == 0 {
1520			delete(gs.backoff, topic)
1521		}
1522	}
1523}
1524
1525func (gs *GossipSubRouter) directConnect() {
1526	// we donly do this every some ticks to allow pending connections to complete and account
1527	// for restarts/downtime
1528	if gs.heartbeatTicks%gs.directConnectTicks != 0 {
1529		return
1530	}
1531
1532	var toconnect []peer.ID
1533	for p := range gs.direct {
1534		_, connected := gs.peers[p]
1535		if !connected {
1536			toconnect = append(toconnect, p)
1537		}
1538	}
1539
1540	if len(toconnect) > 0 {
1541		go func() {
1542			for _, p := range toconnect {
1543				gs.connect <- connectInfo{p: p}
1544			}
1545		}()
1546	}
1547}
1548
1549func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) {
1550	for p, topics := range tograft {
1551		graft := make([]*pb.ControlGraft, 0, len(topics))
1552		for _, topic := range topics {
1553			// copy topic string here since
1554			// the reference to the string
1555			// topic here changes with every
1556			// iteration of the slice.
1557			copiedID := topic
1558			graft = append(graft, &pb.ControlGraft{TopicID: &copiedID})
1559		}
1560
1561		var prune []*pb.ControlPrune
1562		pruning, ok := toprune[p]
1563		if ok {
1564			delete(toprune, p)
1565			prune = make([]*pb.ControlPrune, 0, len(pruning))
1566			for _, topic := range pruning {
1567				prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
1568			}
1569		}
1570
1571		out := rpcWithControl(nil, nil, nil, graft, prune)
1572		gs.sendRPC(p, out)
1573	}
1574
1575	for p, topics := range toprune {
1576		prune := make([]*pb.ControlPrune, 0, len(topics))
1577		for _, topic := range topics {
1578			prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
1579		}
1580
1581		out := rpcWithControl(nil, nil, nil, nil, prune)
1582		gs.sendRPC(p, out)
1583	}
1584
1585}
1586
1587// emitGossip emits IHAVE gossip advertising items in the message cache window
1588// of this topic.
1589func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}) {
1590	mids := gs.mcache.GetGossipIDs(topic)
1591	if len(mids) == 0 {
1592		return
1593	}
1594
1595	// shuffle to emit in random order
1596	shuffleStrings(mids)
1597
1598	// if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list
1599	if len(mids) > GossipSubMaxIHaveLength {
1600		// we do the truncation (with shuffling) per peer below
1601		log.Debugf("too many messages for gossip; will truncate IHAVE list (%d messages)", len(mids))
1602	}
1603
1604	// Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy.
1605	// First we collect the peers above gossipThreshold that are not in the exclude set
1606	// and then randomly select from that set.
1607	// We also exclude direct peers, as there is no reason to emit gossip to them.
1608	peers := make([]peer.ID, 0, len(gs.p.topics[topic]))
1609	for p := range gs.p.topics[topic] {
1610		_, inExclude := exclude[p]
1611		_, direct := gs.direct[p]
1612		if !inExclude && !direct && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold {
1613			peers = append(peers, p)
1614		}
1615	}
1616
1617	target := gs.Dlazy
1618	factor := int(GossipSubGossipFactor * float64(len(peers)))
1619	if factor > target {
1620		target = factor
1621	}
1622
1623	if target > len(peers) {
1624		target = len(peers)
1625	} else {
1626		shufflePeers(peers)
1627	}
1628	peers = peers[:target]
1629
1630	// Emit the IHAVE gossip to the selected peers.
1631	for _, p := range peers {
1632		peerMids := mids
1633		if len(mids) > GossipSubMaxIHaveLength {
1634			// we do this per peer so that we emit a different set for each peer.
1635			// we have enough redundancy in the system that this will significantly increase the message
1636			// coverage when we do truncate.
1637			peerMids = make([]string, GossipSubMaxIHaveLength)
1638			shuffleStrings(mids)
1639			copy(peerMids, mids)
1640		}
1641		gs.enqueueGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: peerMids})
1642	}
1643}
1644
1645func (gs *GossipSubRouter) flush() {
1646	// send gossip first, which will also piggyback pending control
1647	for p, ihave := range gs.gossip {
1648		delete(gs.gossip, p)
1649		out := rpcWithControl(nil, ihave, nil, nil, nil)
1650		gs.sendRPC(p, out)
1651	}
1652
1653	// send the remaining control messages that wasn't merged with gossip
1654	for p, ctl := range gs.control {
1655		delete(gs.control, p)
1656		out := rpcWithControl(nil, nil, nil, ctl.Graft, ctl.Prune)
1657		gs.sendRPC(p, out)
1658	}
1659}
1660
1661func (gs *GossipSubRouter) enqueueGossip(p peer.ID, ihave *pb.ControlIHave) {
1662	gossip := gs.gossip[p]
1663	gossip = append(gossip, ihave)
1664	gs.gossip[p] = gossip
1665}
1666
1667func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) {
1668	ctl := out.GetControl()
1669	if ctl == nil {
1670		ctl = &pb.ControlMessage{}
1671		out.Control = ctl
1672	}
1673
1674	ctl.Ihave = ihave
1675}
1676
1677func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
1678	// remove IHAVE/IWANT from control message, gossip is not retried
1679	ctl.Ihave = nil
1680	ctl.Iwant = nil
1681	if ctl.Graft != nil || ctl.Prune != nil {
1682		gs.control[p] = ctl
1683	}
1684}
1685
1686func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) {
1687	// check control message for staleness first
1688	var tograft []*pb.ControlGraft
1689	var toprune []*pb.ControlPrune
1690
1691	for _, graft := range ctl.GetGraft() {
1692		topic := graft.GetTopicID()
1693		peers, ok := gs.mesh[topic]
1694		if !ok {
1695			continue
1696		}
1697		_, ok = peers[p]
1698		if ok {
1699			tograft = append(tograft, graft)
1700		}
1701	}
1702
1703	for _, prune := range ctl.GetPrune() {
1704		topic := prune.GetTopicID()
1705		peers, ok := gs.mesh[topic]
1706		if !ok {
1707			toprune = append(toprune, prune)
1708			continue
1709		}
1710		_, ok = peers[p]
1711		if !ok {
1712			toprune = append(toprune, prune)
1713		}
1714	}
1715
1716	if len(tograft) == 0 && len(toprune) == 0 {
1717		return
1718	}
1719
1720	xctl := out.Control
1721	if xctl == nil {
1722		xctl = &pb.ControlMessage{}
1723		out.Control = xctl
1724	}
1725
1726	if len(tograft) > 0 {
1727		xctl.Graft = append(xctl.Graft, tograft...)
1728	}
1729	if len(toprune) > 0 {
1730		xctl.Prune = append(xctl.Prune, toprune...)
1731	}
1732}
1733
1734func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
1735	if gs.peers[p] == GossipSubID_v10 {
1736		// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
1737		return &pb.ControlPrune{TopicID: &topic}
1738	}
1739
1740	backoff := uint64(GossipSubPruneBackoff / time.Second)
1741	var px []*pb.PeerInfo
1742	if doPX {
1743		// select peers for Peer eXchange
1744		peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool {
1745			return p != xp && gs.score.Score(xp) >= 0
1746		})
1747
1748		cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
1749		px = make([]*pb.PeerInfo, 0, len(peers))
1750		for _, p := range peers {
1751			// see if we have a signed peer record to send back; if we don't, just send
1752			// the peer ID and let the pruned peer find them in the DHT -- we can't trust
1753			// unsigned address records through px anyway.
1754			var recordBytes []byte
1755			if ok {
1756				spr := cab.GetPeerRecord(p)
1757				var err error
1758				if spr != nil {
1759					recordBytes, err = spr.Marshal()
1760					if err != nil {
1761						log.Warnf("error marshaling signed peer record for %s: %s", p, err)
1762					}
1763				}
1764			}
1765			px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes})
1766		}
1767	}
1768
1769	return &pb.ControlPrune{TopicID: &topic, Peers: px, Backoff: &backoff}
1770}
1771
1772func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
1773	tmap, ok := gs.p.topics[topic]
1774	if !ok {
1775		return nil
1776	}
1777
1778	peers := make([]peer.ID, 0, len(tmap))
1779	for p := range tmap {
1780		if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) {
1781			peers = append(peers, p)
1782		}
1783	}
1784
1785	shufflePeers(peers)
1786
1787	if count > 0 && len(peers) > count {
1788		peers = peers[:count]
1789	}
1790
1791	return peers
1792}
1793
1794func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
1795	pmap := make(map[peer.ID]struct{})
1796	for _, p := range peers {
1797		pmap[p] = struct{}{}
1798	}
1799	return pmap
1800}
1801
1802func peerMapToList(peers map[peer.ID]struct{}) []peer.ID {
1803	plst := make([]peer.ID, 0, len(peers))
1804	for p := range peers {
1805		plst = append(plst, p)
1806	}
1807	return plst
1808}
1809
1810func shufflePeers(peers []peer.ID) {
1811	for i := range peers {
1812		j := rand.Intn(i + 1)
1813		peers[i], peers[j] = peers[j], peers[i]
1814	}
1815}
1816
1817func shufflePeerInfo(peers []*pb.PeerInfo) {
1818	for i := range peers {
1819		j := rand.Intn(i + 1)
1820		peers[i], peers[j] = peers[j], peers[i]
1821	}
1822}
1823
// shuffleStrings randomly permutes lst in place, swapping each element with
// one at a random index no greater than its own.
func shuffleStrings(lst []string) {
	for i := 0; i < len(lst); i++ {
		j := rand.Intn(i + 1)
		tmp := lst[i]
		lst[i] = lst[j]
		lst[j] = tmp
	}
}
1830