1package pubsub
2
3import (
4	"context"
5
6	"github.com/libp2p/go-libp2p-core/host"
7	"github.com/libp2p/go-libp2p-core/peer"
8	"github.com/libp2p/go-libp2p-core/protocol"
9)
10
11const (
12	FloodSubID              = protocol.ID("/floodsub/1.0.0")
13	FloodSubTopicSearchSize = 5
14)
15
16// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps.
17func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error) {
18	rt := &FloodSubRouter{
19		protocols: ps,
20	}
21	return NewPubSub(ctx, h, rt, opts...)
22}
23
24// NewFloodSub returns a new PubSub object using the FloodSubRouter.
25func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
26	return NewFloodsubWithProtocols(ctx, h, []protocol.ID{FloodSubID}, opts...)
27}
28
29type FloodSubRouter struct {
30	p         *PubSub
31	protocols []protocol.ID
32	tracer    *pubsubTracer
33}
34
35func (fs *FloodSubRouter) Protocols() []protocol.ID {
36	return fs.protocols
37}
38
39func (fs *FloodSubRouter) Attach(p *PubSub) {
40	fs.p = p
41	fs.tracer = p.tracer
42}
43
44func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
45	fs.tracer.AddPeer(p, proto)
46}
47
48func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
49	fs.tracer.RemovePeer(p)
50}
51
52func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
53	// check all peers in the topic
54	tmap, ok := fs.p.topics[topic]
55	if !ok {
56		return false
57	}
58
59	if suggested == 0 {
60		suggested = FloodSubTopicSearchSize
61	}
62
63	if len(tmap) >= suggested {
64		return true
65	}
66
67	return false
68}
69
70func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus {
71	return AcceptAll
72}
73
74func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
75
76func (fs *FloodSubRouter) Publish(msg *Message) {
77	from := msg.ReceivedFrom
78	topic := msg.GetTopic()
79
80	out := rpcWithMessages(msg.Message)
81	for pid := range fs.p.topics[topic] {
82		if pid == from || pid == peer.ID(msg.GetFrom()) {
83			continue
84		}
85
86		mch, ok := fs.p.peers[pid]
87		if !ok {
88			continue
89		}
90
91		select {
92		case mch <- out:
93			fs.tracer.SendRPC(out, pid)
94		default:
95			log.Infof("dropping message to peer %s: queue full", pid)
96			fs.tracer.DropRPC(out, pid)
97			// Drop it. The peer is too slow.
98		}
99	}
100}
101
102func (fs *FloodSubRouter) Join(topic string) {
103	fs.tracer.Join(topic)
104}
105
106func (fs *FloodSubRouter) Leave(topic string) {
107	fs.tracer.Leave(topic)
108}
109