1package pubsub 2 3import ( 4 "context" 5 6 "github.com/libp2p/go-libp2p-core/host" 7 "github.com/libp2p/go-libp2p-core/peer" 8 "github.com/libp2p/go-libp2p-core/protocol" 9) 10 11const ( 12 FloodSubID = protocol.ID("/floodsub/1.0.0") 13 FloodSubTopicSearchSize = 5 14) 15 16// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps. 17func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error) { 18 rt := &FloodSubRouter{ 19 protocols: ps, 20 } 21 return NewPubSub(ctx, h, rt, opts...) 22} 23 24// NewFloodSub returns a new PubSub object using the FloodSubRouter. 25func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { 26 return NewFloodsubWithProtocols(ctx, h, []protocol.ID{FloodSubID}, opts...) 27} 28 29type FloodSubRouter struct { 30 p *PubSub 31 protocols []protocol.ID 32 tracer *pubsubTracer 33} 34 35func (fs *FloodSubRouter) Protocols() []protocol.ID { 36 return fs.protocols 37} 38 39func (fs *FloodSubRouter) Attach(p *PubSub) { 40 fs.p = p 41 fs.tracer = p.tracer 42} 43 44func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) { 45 fs.tracer.AddPeer(p, proto) 46} 47 48func (fs *FloodSubRouter) RemovePeer(p peer.ID) { 49 fs.tracer.RemovePeer(p) 50} 51 52func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { 53 // check all peers in the topic 54 tmap, ok := fs.p.topics[topic] 55 if !ok { 56 return false 57 } 58 59 if suggested == 0 { 60 suggested = FloodSubTopicSearchSize 61 } 62 63 if len(tmap) >= suggested { 64 return true 65 } 66 67 return false 68} 69 70func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus { 71 return AcceptAll 72} 73 74func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} 75 76func (fs *FloodSubRouter) Publish(msg *Message) { 77 from := msg.ReceivedFrom 78 topic := msg.GetTopic() 79 80 out := rpcWithMessages(msg.Message) 81 for pid := range fs.p.topics[topic] { 82 if pid == from || pid == peer.ID(msg.GetFrom()) { 83 continue 84 } 85 86 mch, ok := fs.p.peers[pid] 87 if !ok { 88 continue 89 } 90 91 select { 92 case mch <- out: 93 fs.tracer.SendRPC(out, pid) 94 default: 95 log.Infof("dropping message to peer %s: queue full", pid) 96 fs.tracer.DropRPC(out, pid) 97 // Drop it. The peer is too slow. 98 } 99 } 100} 101 102func (fs *FloodSubRouter) Join(topic string) { 103 fs.tracer.Join(topic) 104} 105 106func (fs *FloodSubRouter) Leave(topic string) { 107 fs.tracer.Leave(topic) 108} 109