package discovery

import (
	"context"
	"math/rand"
	"testing"
	"time"

	bhost "github.com/libp2p/go-libp2p-blankhost"
	"github.com/libp2p/go-libp2p-core/discovery"
	"github.com/libp2p/go-libp2p-core/peer"
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
)

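// delayedDiscovery wraps a discovery.Discovery and pauses for a fixed delay
// after each peer emitted by FindPeers, simulating a slow discovery source.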
type delayedDiscovery struct {
	disc  discovery.Discovery
	delay time.Duration
}

func (d *delayedDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
	return d.disc.Advertise(ctx, ns, opts...)
}

func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
	dch, err := d.disc.FindPeers(ctx, ns, opts...)
	if err != nil {
		return nil, err
	}

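	// forward each discovered peer, then sleep for d.delay to slow down delivery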
	ch := make(chan peer.AddrInfo, 32)
	go func() {
		defer close(ch)
		for ai := range dch {
			ch <- ai
			time.Sleep(d.delay)
		}
	}()

	return ch, nil
}

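// assertNumPeers queries d for ns and fails the test if the number of unique
// peers found does not equal count.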
func assertNumPeers(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, count int) {
	t.Helper()
	peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(10))
	if err != nil {
		t.Fatal(err)
	}

	peerset := make(map[peer.ID]struct{})
	for p := range peerCh {
		peerset[p.ID] = struct{}{}
	}

	if len(peerset) != count {
		t.Fatalf("expected to find %d peers, found %d instead", count, len(peerset))
	}
}

func TestBackoffDiscoverySingleBackoff(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	discServer := newDiscoveryServer()

	h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
	h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
	d1 := &mockDiscoveryClient{h1, discServer}
	d2 := &mockDiscoveryClient{h2, discServer}

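	// same parameters as in TestBackoffDiscoveryMultipleBackoff below:
	// no startup delay, first backoff after finding data is 100ms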
	bkf := NewExponentialBackoff(time.Millisecond*100, time.Second*10, NoJitter,
		time.Millisecond*100, 2.5, 0, rand.NewSource(0))
	dCache, err := NewBackoffDiscovery(d1, bkf)
	if err != nil {
		t.Fatal(err)
	}

	const ns = "test"

	// try adding a peer then find it
	d1.Advertise(ctx, ns, discovery.TTL(time.Hour))
	assertNumPeers(t, ctx, dCache, ns, 1)

	// add a new peer and make sure it is still hidden by the caching layer
	d2.Advertise(ctx, ns, discovery.TTL(time.Hour))
	assertNumPeers(t, ctx, dCache, ns, 1)

	// wait for cache to expire and check for the new peer
	time.Sleep(time.Millisecond * 110)
	assertNumPeers(t, ctx, dCache, ns, 2)
}

func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	discServer := newDiscoveryServer()

	h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
	h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
	d1 := &mockDiscoveryClient{h1, discServer}
	d2 := &mockDiscoveryClient{h2, discServer}

	// Startup delay is 0ms. First backoff after finding data is 100ms, second backoff is 250ms.
	bkf := NewExponentialBackoff(time.Millisecond*100, time.Second*10, NoJitter,
		time.Millisecond*100, 2.5, 0, rand.NewSource(0))
	dCache, err := NewBackoffDiscovery(d1, bkf)
	if err != nil {
		t.Fatal(err)
	}

	const ns = "test"

	// try adding a peer then find it
	d1.Advertise(ctx, ns, discovery.TTL(time.Hour))
	assertNumPeers(t, ctx, dCache, ns, 1)

	// wait a little to make sure the extra request doesn't modify the backoff
	time.Sleep(time.Millisecond * 50) // 50 < 100
	assertNumPeers(t, ctx, dCache, ns, 1)

	// wait for the backoff to expire and check that it increases
	time.Sleep(time.Millisecond * 60) // 50 + 60 > 100
	assertNumPeers(t, ctx, dCache, ns, 1)

	d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400))

	time.Sleep(time.Millisecond * 150) // 150 < 250
	assertNumPeers(t, ctx, dCache, ns, 1)

	time.Sleep(time.Millisecond * 150) // 150 + 150 > 250
	assertNumPeers(t, ctx, dCache, ns, 2)

	// check that the backoff has been reset
	// also checks that we can decrease our peer count (i.e. not just a growing set)
	time.Sleep(time.Millisecond * 110) // 110 > 100; also 150+150+110 > 400
	assertNumPeers(t, ctx, dCache, ns, 1)
}

func TestBackoffDiscoverySimultaneousQuery(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	discServer := newDiscoveryServer()

	// Testing with n larger than most internal buffer sizes (32)
	n := 40
	advertisers := make([]discovery.Discovery, n)

	for i := 0; i < n; i++ {
		h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
		advertisers[i] = &mockDiscoveryClient{h, discServer}
	}

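	// wrap the first advertiser so results arrive slowly and the first query
	// is still in flight when the second one starts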
	d1 := &delayedDiscovery{advertisers[0], time.Millisecond * 10}

	bkf := NewFixedBackoff(time.Millisecond * 200)
	dCache, err := NewBackoffDiscovery(d1, bkf)
	if err != nil {
		t.Fatal(err)
	}

	const ns = "test"

	for _, a := range advertisers {
		if _, err := a.Advertise(ctx, ns, discovery.TTL(time.Hour)); err != nil {
			t.Fatal(err)
		}
	}

	ch1, err := dCache.FindPeers(ctx, ns)
	if err != nil {
		t.Fatal(err)
	}

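	// read a single result so the first query is known to be in progress
	// before issuing a second, simultaneous query against the cache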
	<-ch1
	ch2, err := dCache.FindPeers(ctx, ns)
	if err != nil {
		t.Fatal(err)
	}

	szCh2 := 0
	for range ch2 {
		szCh2++
	}

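	// start at 1 to account for the element already read from ch1 above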
	szCh1 := 1
	for range ch1 {
		szCh1++
	}

	if szCh1 != n || szCh2 != n {
		t.Fatalf("Channels returned %d, %d elements instead of %d", szCh1, szCh2, n)
	}
}