1package discovery 2 3import ( 4 "context" 5 "math/rand" 6 "testing" 7 "time" 8 9 bhost "github.com/libp2p/go-libp2p-blankhost" 10 "github.com/libp2p/go-libp2p-core/discovery" 11 "github.com/libp2p/go-libp2p-core/peer" 12 swarmt "github.com/libp2p/go-libp2p-swarm/testing" 13) 14 15type delayedDiscovery struct { 16 disc discovery.Discovery 17 delay time.Duration 18} 19 20func (d *delayedDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { 21 return d.disc.Advertise(ctx, ns, opts...) 22} 23 24func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { 25 dch, err := d.disc.FindPeers(ctx, ns, opts...) 26 if err != nil { 27 return nil, err 28 } 29 30 ch := make(chan peer.AddrInfo, 32) 31 go func() { 32 defer close(ch) 33 for ai := range dch { 34 ch <- ai 35 time.Sleep(d.delay) 36 } 37 }() 38 39 return ch, nil 40} 41 42func assertNumPeers(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, count int) { 43 t.Helper() 44 peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(10)) 45 if err != nil { 46 t.Fatal(err) 47 } 48 49 peerset := make(map[peer.ID]struct{}) 50 for p := range peerCh { 51 peerset[p.ID] = struct{}{} 52 } 53 54 if len(peerset) != count { 55 t.Fatalf("Was supposed to find %d, found %d instead", count, len(peerset)) 56 } 57} 58 59func TestBackoffDiscoverySingleBackoff(t *testing.T) { 60 ctx, cancel := context.WithCancel(context.Background()) 61 defer cancel() 62 63 discServer := newDiscoveryServer() 64 65 h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) 66 h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) 67 d1 := &mockDiscoveryClient{h1, discServer} 68 d2 := &mockDiscoveryClient{h2, discServer} 69 70 bkf := NewExponentialBackoff(time.Millisecond*100, time.Second*10, NoJitter, 71 time.Millisecond*100, 2.5, 0, rand.NewSource(0)) 72 dCache, err := NewBackoffDiscovery(d1, bkf) 73 if err != nil { 74 t.Fatal(err) 75 } 76 77 const ns = "test" 78 79 // try adding a peer then find it 80 d1.Advertise(ctx, ns, discovery.TTL(time.Hour)) 81 assertNumPeers(t, ctx, dCache, ns, 1) 82 83 // add a new peer and make sure it is still hidden by the caching layer 84 d2.Advertise(ctx, ns, discovery.TTL(time.Hour)) 85 assertNumPeers(t, ctx, dCache, ns, 1) 86 87 // wait for cache to expire and check for the new peer 88 time.Sleep(time.Millisecond * 110) 89 assertNumPeers(t, ctx, dCache, ns, 2) 90} 91 92func TestBackoffDiscoveryMultipleBackoff(t *testing.T) { 93 ctx, cancel := context.WithCancel(context.Background()) 94 defer cancel() 95 96 discServer := newDiscoveryServer() 97 98 h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) 99 h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) 100 d1 := &mockDiscoveryClient{h1, discServer} 101 d2 := &mockDiscoveryClient{h2, discServer} 102 103 // Startup delay is 0ms. First backoff after finding data is 100ms, second backoff is 250ms. 104 bkf := NewExponentialBackoff(time.Millisecond*100, time.Second*10, NoJitter, 105 time.Millisecond*100, 2.5, 0, rand.NewSource(0)) 106 dCache, err := NewBackoffDiscovery(d1, bkf) 107 if err != nil { 108 t.Fatal(err) 109 } 110 111 const ns = "test" 112 113 // try adding a peer then find it 114 d1.Advertise(ctx, ns, discovery.TTL(time.Hour)) 115 assertNumPeers(t, ctx, dCache, ns, 1) 116 117 // wait a little to make sure the extra request doesn't modify the backoff 118 time.Sleep(time.Millisecond * 50) //50 < 100 119 assertNumPeers(t, ctx, dCache, ns, 1) 120 121 // wait for backoff to expire and check if we increase it 122 time.Sleep(time.Millisecond * 60) // 50+60 > 100 123 assertNumPeers(t, ctx, dCache, ns, 1) 124 125 d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400)) 126 127 time.Sleep(time.Millisecond * 150) //150 < 250 128 assertNumPeers(t, ctx, dCache, ns, 1) 129 130 time.Sleep(time.Millisecond * 150) //150 + 150 > 250 131 assertNumPeers(t, ctx, dCache, ns, 2) 132 133 // check that the backoff has been reset 134 // also checks that we can decrease our peer count (i.e. not just growing a set) 135 time.Sleep(time.Millisecond * 110) //110 > 100, also 150+150+110>400 136 assertNumPeers(t, ctx, dCache, ns, 1) 137} 138 139func TestBackoffDiscoverySimultaneousQuery(t *testing.T) { 140 ctx, cancel := context.WithCancel(context.Background()) 141 defer cancel() 142 143 discServer := newDiscoveryServer() 144 145 // Testing with n larger than most internal buffer sizes (32) 146 n := 40 147 advertisers := make([]discovery.Discovery, n) 148 149 for i := 0; i < n; i++ { 150 h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) 151 advertisers[i] = &mockDiscoveryClient{h, discServer} 152 } 153 154 d1 := &delayedDiscovery{advertisers[0], time.Millisecond * 10} 155 156 bkf := NewFixedBackoff(time.Millisecond * 200) 157 dCache, err := NewBackoffDiscovery(d1, bkf) 158 if err != nil { 159 t.Fatal(err) 160 } 161 162 const ns = "test" 163 164 for _, a := range advertisers { 165 if _, err := a.Advertise(ctx, ns, discovery.TTL(time.Hour)); err != nil { 166 t.Fatal(err) 167 } 168 } 169 170 ch1, err := dCache.FindPeers(ctx, ns) 171 if err != nil { 172 t.Fatal(err) 173 } 174 175 _ = <-ch1 176 ch2, err := dCache.FindPeers(ctx, ns) 177 if err != nil { 178 t.Fatal(err) 179 } 180 181 szCh2 := 0 182 for ai := range ch2 { 183 _ = ai 184 szCh2++ 185 } 186 187 szCh1 := 1 188 for _ = range ch1 { 189 szCh1++ 190 } 191 192 if szCh1 != n && szCh2 != n { 193 t.Fatalf("Channels returned %d, %d elements instead of %d", szCh1, szCh2, n) 194 } 195} 196