1package pubsub
2
3import (
4	"bytes"
5	"context"
6	"fmt"
7	"io"
8	"math/rand"
9	"sync"
10	"testing"
11	"time"
12
13	pb "github.com/libp2p/go-libp2p-pubsub/pb"
14
15	"github.com/libp2p/go-libp2p-core/host"
16	"github.com/libp2p/go-libp2p-core/network"
17	"github.com/libp2p/go-libp2p-core/peer"
18	"github.com/libp2p/go-libp2p-core/peerstore"
19	"github.com/libp2p/go-libp2p-core/record"
20
21	bhost "github.com/libp2p/go-libp2p-blankhost"
22	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
23
24	"github.com/libp2p/go-msgio/protoio"
25)
26
27func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub {
28	ps, err := NewGossipSub(ctx, h, opts...)
29	if err != nil {
30		panic(err)
31	}
32	return ps
33}
34
35func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub {
36	var psubs []*PubSub
37	for _, h := range hs {
38		psubs = append(psubs, getGossipsub(ctx, h, opts...))
39	}
40	return psubs
41}
42
43func TestSparseGossipsub(t *testing.T) {
44	ctx, cancel := context.WithCancel(context.Background())
45	defer cancel()
46	hosts := getNetHosts(t, ctx, 20)
47
48	psubs := getGossipsubs(ctx, hosts)
49
50	var msgs []*Subscription
51	for _, ps := range psubs {
52		subch, err := ps.Subscribe("foobar")
53		if err != nil {
54			t.Fatal(err)
55		}
56
57		msgs = append(msgs, subch)
58	}
59
60	sparseConnect(t, hosts)
61
62	// wait for heartbeats to build mesh
63	time.Sleep(time.Second * 2)
64
65	for i := 0; i < 100; i++ {
66		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
67
68		owner := rand.Intn(len(psubs))
69
70		psubs[owner].Publish("foobar", msg)
71
72		for _, sub := range msgs {
73			got, err := sub.Next(ctx)
74			if err != nil {
75				t.Fatal(sub.err)
76			}
77			if !bytes.Equal(msg, got.Data) {
78				t.Fatal("got wrong message!")
79			}
80		}
81	}
82}
83
84func TestDenseGossipsub(t *testing.T) {
85	ctx, cancel := context.WithCancel(context.Background())
86	defer cancel()
87	hosts := getNetHosts(t, ctx, 20)
88
89	psubs := getGossipsubs(ctx, hosts)
90
91	var msgs []*Subscription
92	for _, ps := range psubs {
93		subch, err := ps.Subscribe("foobar")
94		if err != nil {
95			t.Fatal(err)
96		}
97
98		msgs = append(msgs, subch)
99	}
100
101	denseConnect(t, hosts)
102
103	// wait for heartbeats to build mesh
104	time.Sleep(time.Second * 2)
105
106	for i := 0; i < 100; i++ {
107		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
108
109		owner := rand.Intn(len(psubs))
110
111		psubs[owner].Publish("foobar", msg)
112
113		for _, sub := range msgs {
114			got, err := sub.Next(ctx)
115			if err != nil {
116				t.Fatal(sub.err)
117			}
118			if !bytes.Equal(msg, got.Data) {
119				t.Fatal("got wrong message!")
120			}
121		}
122	}
123}
124
125func TestGossipsubFanout(t *testing.T) {
126	ctx, cancel := context.WithCancel(context.Background())
127	defer cancel()
128	hosts := getNetHosts(t, ctx, 20)
129
130	psubs := getGossipsubs(ctx, hosts)
131
132	var msgs []*Subscription
133	for _, ps := range psubs[1:] {
134		subch, err := ps.Subscribe("foobar")
135		if err != nil {
136			t.Fatal(err)
137		}
138
139		msgs = append(msgs, subch)
140	}
141
142	denseConnect(t, hosts)
143
144	// wait for heartbeats to build mesh
145	time.Sleep(time.Second * 2)
146
147	for i := 0; i < 100; i++ {
148		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
149
150		owner := 0
151
152		psubs[owner].Publish("foobar", msg)
153
154		for _, sub := range msgs {
155			got, err := sub.Next(ctx)
156			if err != nil {
157				t.Fatal(sub.err)
158			}
159			if !bytes.Equal(msg, got.Data) {
160				t.Fatal("got wrong message!")
161			}
162		}
163	}
164
165	// subscribe the owner
166	subch, err := psubs[0].Subscribe("foobar")
167	if err != nil {
168		t.Fatal(err)
169	}
170	msgs = append(msgs, subch)
171
172	// wait for a heartbeat
173	time.Sleep(time.Second * 1)
174
175	for i := 0; i < 100; i++ {
176		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
177
178		owner := 0
179
180		psubs[owner].Publish("foobar", msg)
181
182		for _, sub := range msgs {
183			got, err := sub.Next(ctx)
184			if err != nil {
185				t.Fatal(sub.err)
186			}
187			if !bytes.Equal(msg, got.Data) {
188				t.Fatal("got wrong message!")
189			}
190		}
191	}
192}
193
194func TestGossipsubFanoutMaintenance(t *testing.T) {
195	ctx, cancel := context.WithCancel(context.Background())
196	defer cancel()
197	hosts := getNetHosts(t, ctx, 20)
198
199	psubs := getGossipsubs(ctx, hosts)
200
201	var msgs []*Subscription
202	for _, ps := range psubs[1:] {
203		subch, err := ps.Subscribe("foobar")
204		if err != nil {
205			t.Fatal(err)
206		}
207
208		msgs = append(msgs, subch)
209	}
210
211	denseConnect(t, hosts)
212
213	// wait for heartbeats to build mesh
214	time.Sleep(time.Second * 2)
215
216	for i := 0; i < 100; i++ {
217		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
218
219		owner := 0
220
221		psubs[owner].Publish("foobar", msg)
222
223		for _, sub := range msgs {
224			got, err := sub.Next(ctx)
225			if err != nil {
226				t.Fatal(sub.err)
227			}
228			if !bytes.Equal(msg, got.Data) {
229				t.Fatal("got wrong message!")
230			}
231		}
232	}
233
234	// unsubscribe all peers to exercise fanout maintenance
235	for _, sub := range msgs {
236		sub.Cancel()
237	}
238	msgs = nil
239
240	// wait for heartbeats
241	time.Sleep(time.Second * 2)
242
243	// resubscribe and repeat
244	for _, ps := range psubs[1:] {
245		subch, err := ps.Subscribe("foobar")
246		if err != nil {
247			t.Fatal(err)
248		}
249
250		msgs = append(msgs, subch)
251	}
252
253	time.Sleep(time.Second * 2)
254
255	for i := 0; i < 100; i++ {
256		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
257
258		owner := 0
259
260		psubs[owner].Publish("foobar", msg)
261
262		for _, sub := range msgs {
263			got, err := sub.Next(ctx)
264			if err != nil {
265				t.Fatal(sub.err)
266			}
267			if !bytes.Equal(msg, got.Data) {
268				t.Fatal("got wrong message!")
269			}
270		}
271	}
272}
273
274func TestGossipsubFanoutExpiry(t *testing.T) {
275	GossipSubFanoutTTL = 1 * time.Second
276	defer func() {
277		GossipSubFanoutTTL = 60 * time.Second
278	}()
279
280	ctx, cancel := context.WithCancel(context.Background())
281	defer cancel()
282	hosts := getNetHosts(t, ctx, 10)
283
284	psubs := getGossipsubs(ctx, hosts)
285
286	var msgs []*Subscription
287	for _, ps := range psubs[1:] {
288		subch, err := ps.Subscribe("foobar")
289		if err != nil {
290			t.Fatal(err)
291		}
292
293		msgs = append(msgs, subch)
294	}
295
296	denseConnect(t, hosts)
297
298	// wait for heartbeats to build mesh
299	time.Sleep(time.Second * 2)
300
301	for i := 0; i < 5; i++ {
302		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
303
304		owner := 0
305
306		psubs[owner].Publish("foobar", msg)
307
308		for _, sub := range msgs {
309			got, err := sub.Next(ctx)
310			if err != nil {
311				t.Fatal(sub.err)
312			}
313			if !bytes.Equal(msg, got.Data) {
314				t.Fatal("got wrong message!")
315			}
316		}
317	}
318
319	psubs[0].eval <- func() {
320		if len(psubs[0].rt.(*GossipSubRouter).fanout) == 0 {
321			t.Fatal("owner has no fanout")
322		}
323	}
324
325	// wait for TTL to expire fanout peers in owner
326	time.Sleep(time.Second * 2)
327
328	psubs[0].eval <- func() {
329		if len(psubs[0].rt.(*GossipSubRouter).fanout) > 0 {
330			t.Fatal("fanout hasn't expired")
331		}
332	}
333
334	// wait for it to run in the event loop
335	time.Sleep(10 * time.Millisecond)
336}
337
338func TestGossipsubGossip(t *testing.T) {
339	ctx, cancel := context.WithCancel(context.Background())
340	defer cancel()
341	hosts := getNetHosts(t, ctx, 20)
342
343	psubs := getGossipsubs(ctx, hosts)
344
345	var msgs []*Subscription
346	for _, ps := range psubs {
347		subch, err := ps.Subscribe("foobar")
348		if err != nil {
349			t.Fatal(err)
350		}
351
352		msgs = append(msgs, subch)
353	}
354
355	denseConnect(t, hosts)
356
357	// wait for heartbeats to build mesh
358	time.Sleep(time.Second * 2)
359
360	for i := 0; i < 100; i++ {
361		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
362
363		owner := rand.Intn(len(psubs))
364
365		psubs[owner].Publish("foobar", msg)
366
367		for _, sub := range msgs {
368			got, err := sub.Next(ctx)
369			if err != nil {
370				t.Fatal(sub.err)
371			}
372			if !bytes.Equal(msg, got.Data) {
373				t.Fatal("got wrong message!")
374			}
375		}
376
377		// wait a bit to have some gossip interleaved
378		time.Sleep(time.Millisecond * 100)
379	}
380
381	// and wait for some gossip flushing
382	time.Sleep(time.Second * 2)
383}
384
385func TestGossipsubGossipPiggyback(t *testing.T) {
386	t.Skip("test no longer relevant; gossip propagation has become eager")
387	ctx, cancel := context.WithCancel(context.Background())
388	defer cancel()
389	hosts := getNetHosts(t, ctx, 20)
390
391	psubs := getGossipsubs(ctx, hosts)
392
393	var msgs []*Subscription
394	for _, ps := range psubs {
395		subch, err := ps.Subscribe("foobar")
396		if err != nil {
397			t.Fatal(err)
398		}
399
400		msgs = append(msgs, subch)
401	}
402
403	var xmsgs []*Subscription
404	for _, ps := range psubs {
405		subch, err := ps.Subscribe("bazcrux")
406		if err != nil {
407			t.Fatal(err)
408		}
409
410		xmsgs = append(xmsgs, subch)
411	}
412
413	denseConnect(t, hosts)
414
415	// wait for heartbeats to build mesh
416	time.Sleep(time.Second * 2)
417
418	for i := 0; i < 100; i++ {
419		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
420
421		owner := rand.Intn(len(psubs))
422
423		psubs[owner].Publish("foobar", msg)
424		psubs[owner].Publish("bazcrux", msg)
425
426		for _, sub := range msgs {
427			got, err := sub.Next(ctx)
428			if err != nil {
429				t.Fatal(sub.err)
430			}
431			if !bytes.Equal(msg, got.Data) {
432				t.Fatal("got wrong message!")
433			}
434		}
435
436		for _, sub := range xmsgs {
437			got, err := sub.Next(ctx)
438			if err != nil {
439				t.Fatal(sub.err)
440			}
441			if !bytes.Equal(msg, got.Data) {
442				t.Fatal("got wrong message!")
443			}
444		}
445
446		// wait a bit to have some gossip interleaved
447		time.Sleep(time.Millisecond * 100)
448	}
449
450	// and wait for some gossip flushing
451	time.Sleep(time.Second * 2)
452}
453
454func TestGossipsubGossipPropagation(t *testing.T) {
455	ctx, cancel := context.WithCancel(context.Background())
456	defer cancel()
457
458	hosts := getNetHosts(t, ctx, 20)
459	psubs := getGossipsubs(ctx, hosts)
460
461	hosts1 := hosts[:GossipSubD+1]
462	hosts2 := append(hosts[GossipSubD+1:], hosts[0])
463
464	denseConnect(t, hosts1)
465	denseConnect(t, hosts2)
466
467	var msgs1 []*Subscription
468	for _, ps := range psubs[1 : GossipSubD+1] {
469		subch, err := ps.Subscribe("foobar")
470		if err != nil {
471			t.Fatal(err)
472		}
473
474		msgs1 = append(msgs1, subch)
475	}
476
477	time.Sleep(time.Second * 1)
478
479	for i := 0; i < 10; i++ {
480		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
481
482		owner := 0
483
484		psubs[owner].Publish("foobar", msg)
485
486		for _, sub := range msgs1 {
487			got, err := sub.Next(ctx)
488			if err != nil {
489				t.Fatal(sub.err)
490			}
491			if !bytes.Equal(msg, got.Data) {
492				t.Fatal("got wrong message!")
493			}
494		}
495	}
496
497	time.Sleep(time.Millisecond * 100)
498
499	var msgs2 []*Subscription
500	for _, ps := range psubs[GossipSubD+1:] {
501		subch, err := ps.Subscribe("foobar")
502		if err != nil {
503			t.Fatal(err)
504		}
505
506		msgs2 = append(msgs2, subch)
507	}
508
509	var collect [][]byte
510	for i := 0; i < 10; i++ {
511		for _, sub := range msgs2 {
512			got, err := sub.Next(ctx)
513			if err != nil {
514				t.Fatal(sub.err)
515			}
516			collect = append(collect, got.Data)
517		}
518	}
519
520	for i := 0; i < 10; i++ {
521		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
522		gotit := false
523		for j := 0; j < len(collect); j++ {
524			if bytes.Equal(msg, collect[j]) {
525				gotit = true
526				break
527			}
528		}
529		if !gotit {
530			t.Fatalf("Didn't get message %s", string(msg))
531		}
532	}
533}
534
535func TestGossipsubPrune(t *testing.T) {
536	ctx, cancel := context.WithCancel(context.Background())
537	defer cancel()
538	hosts := getNetHosts(t, ctx, 20)
539
540	psubs := getGossipsubs(ctx, hosts)
541
542	var msgs []*Subscription
543	for _, ps := range psubs {
544		subch, err := ps.Subscribe("foobar")
545		if err != nil {
546			t.Fatal(err)
547		}
548
549		msgs = append(msgs, subch)
550	}
551
552	denseConnect(t, hosts)
553
554	// wait for heartbeats to build mesh
555	time.Sleep(time.Second * 2)
556
557	// disconnect some peers from the mesh to get some PRUNEs
558	for _, sub := range msgs[:5] {
559		sub.Cancel()
560	}
561
562	// wait a bit to take effect
563	time.Sleep(time.Millisecond * 100)
564
565	for i := 0; i < 10; i++ {
566		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
567
568		owner := rand.Intn(len(psubs))
569
570		psubs[owner].Publish("foobar", msg)
571
572		for _, sub := range msgs[5:] {
573			got, err := sub.Next(ctx)
574			if err != nil {
575				t.Fatal(sub.err)
576			}
577			if !bytes.Equal(msg, got.Data) {
578				t.Fatal("got wrong message!")
579			}
580		}
581	}
582}
583
584func TestGossipsubGraft(t *testing.T) {
585	ctx, cancel := context.WithCancel(context.Background())
586	defer cancel()
587	hosts := getNetHosts(t, ctx, 20)
588
589	psubs := getGossipsubs(ctx, hosts)
590
591	sparseConnect(t, hosts)
592
593	time.Sleep(time.Second * 1)
594
595	var msgs []*Subscription
596	for _, ps := range psubs {
597		subch, err := ps.Subscribe("foobar")
598		if err != nil {
599			t.Fatal(err)
600		}
601
602		msgs = append(msgs, subch)
603
604		// wait for announce to propagate
605		time.Sleep(time.Millisecond * 100)
606	}
607
608	time.Sleep(time.Second * 1)
609
610	for i := 0; i < 100; i++ {
611		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
612
613		owner := rand.Intn(len(psubs))
614
615		psubs[owner].Publish("foobar", msg)
616
617		for _, sub := range msgs {
618			got, err := sub.Next(ctx)
619			if err != nil {
620				t.Fatal(sub.err)
621			}
622			if !bytes.Equal(msg, got.Data) {
623				t.Fatal("got wrong message!")
624			}
625		}
626	}
627}
628
629func TestGossipsubRemovePeer(t *testing.T) {
630	ctx, cancel := context.WithCancel(context.Background())
631	defer cancel()
632	hosts := getNetHosts(t, ctx, 20)
633
634	psubs := getGossipsubs(ctx, hosts)
635
636	var msgs []*Subscription
637	for _, ps := range psubs {
638		subch, err := ps.Subscribe("foobar")
639		if err != nil {
640			t.Fatal(err)
641		}
642
643		msgs = append(msgs, subch)
644	}
645
646	denseConnect(t, hosts)
647
648	// wait for heartbeats to build mesh
649	time.Sleep(time.Second * 2)
650
651	// disconnect some peers to exercise RemovePeer paths
652	for _, host := range hosts[:5] {
653		host.Close()
654	}
655
656	// wait a heartbeat
657	time.Sleep(time.Second * 1)
658
659	for i := 0; i < 10; i++ {
660		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
661
662		owner := 5 + rand.Intn(len(psubs)-5)
663
664		psubs[owner].Publish("foobar", msg)
665
666		for _, sub := range msgs[5:] {
667			got, err := sub.Next(ctx)
668			if err != nil {
669				t.Fatal(sub.err)
670			}
671			if !bytes.Equal(msg, got.Data) {
672				t.Fatal("got wrong message!")
673			}
674		}
675	}
676}
677
678func TestGossipsubGraftPruneRetry(t *testing.T) {
679	ctx, cancel := context.WithCancel(context.Background())
680	defer cancel()
681
682	hosts := getNetHosts(t, ctx, 10)
683	psubs := getGossipsubs(ctx, hosts)
684	denseConnect(t, hosts)
685
686	var topics []string
687	var msgs [][]*Subscription
688	for i := 0; i < 35; i++ {
689		topic := fmt.Sprintf("topic%d", i)
690		topics = append(topics, topic)
691
692		var subs []*Subscription
693		for _, ps := range psubs {
694			subch, err := ps.Subscribe(topic)
695			if err != nil {
696				t.Fatal(err)
697			}
698
699			subs = append(subs, subch)
700		}
701		msgs = append(msgs, subs)
702	}
703
704	// wait for heartbeats to build meshes
705	time.Sleep(time.Second * 5)
706
707	for i, topic := range topics {
708		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
709
710		owner := rand.Intn(len(psubs))
711
712		psubs[owner].Publish(topic, msg)
713
714		for _, sub := range msgs[i] {
715			got, err := sub.Next(ctx)
716			if err != nil {
717				t.Fatal(sub.err)
718			}
719			if !bytes.Equal(msg, got.Data) {
720				t.Fatal("got wrong message!")
721			}
722		}
723	}
724}
725
726func TestGossipsubControlPiggyback(t *testing.T) {
727	t.Skip("travis regularly fails on this test")
728
729	ctx, cancel := context.WithCancel(context.Background())
730	defer cancel()
731
732	hosts := getNetHosts(t, ctx, 10)
733	psubs := getGossipsubs(ctx, hosts)
734	denseConnect(t, hosts)
735
736	for _, ps := range psubs {
737		subch, err := ps.Subscribe("flood")
738		if err != nil {
739			t.Fatal(err)
740		}
741		go func(sub *Subscription) {
742			for {
743				_, err := sub.Next(ctx)
744				if err != nil {
745					break
746				}
747			}
748		}(subch)
749	}
750
751	time.Sleep(time.Second * 1)
752
753	// create a background flood of messages that overloads the queues
754	done := make(chan struct{})
755	go func() {
756		owner := rand.Intn(len(psubs))
757		for i := 0; i < 10000; i++ {
758			msg := []byte("background flooooood")
759			psubs[owner].Publish("flood", msg)
760		}
761		done <- struct{}{}
762	}()
763
764	time.Sleep(time.Millisecond * 20)
765
766	// and subscribe to a bunch of topics in the meantime -- this should
767	// result in some dropped control messages, with subsequent piggybacking
768	// in the background flood
769	var topics []string
770	var msgs [][]*Subscription
771	for i := 0; i < 5; i++ {
772		topic := fmt.Sprintf("topic%d", i)
773		topics = append(topics, topic)
774
775		var subs []*Subscription
776		for _, ps := range psubs {
777			subch, err := ps.Subscribe(topic)
778			if err != nil {
779				t.Fatal(err)
780			}
781
782			subs = append(subs, subch)
783		}
784		msgs = append(msgs, subs)
785	}
786
787	// wait for the flood to stop
788	<-done
789
790	// and test that we have functional overlays
791	for i, topic := range topics {
792		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
793
794		owner := rand.Intn(len(psubs))
795
796		psubs[owner].Publish(topic, msg)
797
798		for _, sub := range msgs[i] {
799			got, err := sub.Next(ctx)
800			if err != nil {
801				t.Fatal(sub.err)
802			}
803			if !bytes.Equal(msg, got.Data) {
804				t.Fatal("got wrong message!")
805			}
806		}
807	}
808}
809
810func TestMixedGossipsub(t *testing.T) {
811	ctx, cancel := context.WithCancel(context.Background())
812	defer cancel()
813	hosts := getNetHosts(t, ctx, 30)
814
815	gsubs := getGossipsubs(ctx, hosts[:20])
816	fsubs := getPubsubs(ctx, hosts[20:])
817	psubs := append(gsubs, fsubs...)
818
819	var msgs []*Subscription
820	for _, ps := range psubs {
821		subch, err := ps.Subscribe("foobar")
822		if err != nil {
823			t.Fatal(err)
824		}
825
826		msgs = append(msgs, subch)
827	}
828
829	sparseConnect(t, hosts)
830
831	// wait for heartbeats to build mesh
832	time.Sleep(time.Second * 2)
833
834	for i := 0; i < 100; i++ {
835		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
836
837		owner := rand.Intn(len(psubs))
838
839		psubs[owner].Publish("foobar", msg)
840
841		for _, sub := range msgs {
842			got, err := sub.Next(ctx)
843			if err != nil {
844				t.Fatal(sub.err)
845			}
846			if !bytes.Equal(msg, got.Data) {
847				t.Fatal("got wrong message!")
848			}
849		}
850	}
851}
852
853func TestGossipsubMultihops(t *testing.T) {
854	ctx, cancel := context.WithCancel(context.Background())
855	defer cancel()
856
857	hosts := getNetHosts(t, ctx, 6)
858
859	psubs := getGossipsubs(ctx, hosts)
860
861	connect(t, hosts[0], hosts[1])
862	connect(t, hosts[1], hosts[2])
863	connect(t, hosts[2], hosts[3])
864	connect(t, hosts[3], hosts[4])
865	connect(t, hosts[4], hosts[5])
866
867	var subs []*Subscription
868	for i := 1; i < 6; i++ {
869		ch, err := psubs[i].Subscribe("foobar")
870		if err != nil {
871			t.Fatal(err)
872		}
873		subs = append(subs, ch)
874	}
875
876	// wait for heartbeats to build mesh
877	time.Sleep(time.Second * 2)
878
879	msg := []byte("i like cats")
880	err := psubs[0].Publish("foobar", msg)
881	if err != nil {
882		t.Fatal(err)
883	}
884
885	// last node in the chain should get the message
886	select {
887	case out := <-subs[4].ch:
888		if !bytes.Equal(out.GetData(), msg) {
889			t.Fatal("got wrong data")
890		}
891	case <-time.After(time.Second * 5):
892		t.Fatal("timed out waiting for message")
893	}
894}
895
896func TestGossipsubTreeTopology(t *testing.T) {
897	ctx, cancel := context.WithCancel(context.Background())
898	defer cancel()
899
900	hosts := getNetHosts(t, ctx, 10)
901	psubs := getGossipsubs(ctx, hosts)
902
903	connect(t, hosts[0], hosts[1])
904	connect(t, hosts[1], hosts[2])
905	connect(t, hosts[1], hosts[4])
906	connect(t, hosts[2], hosts[3])
907	connect(t, hosts[0], hosts[5])
908	connect(t, hosts[5], hosts[6])
909	connect(t, hosts[5], hosts[8])
910	connect(t, hosts[6], hosts[7])
911	connect(t, hosts[8], hosts[9])
912
913	/*
914		[0] -> [1] -> [2] -> [3]
915		 |      L->[4]
916		 v
917		[5] -> [6] -> [7]
918		 |
919		 v
920		[8] -> [9]
921	*/
922
923	var chs []*Subscription
924	for _, ps := range psubs {
925		ch, err := ps.Subscribe("fizzbuzz")
926		if err != nil {
927			t.Fatal(err)
928		}
929
930		chs = append(chs, ch)
931	}
932
933	// wait for heartbeats to build mesh
934	time.Sleep(time.Second * 2)
935
936	assertPeerLists(t, hosts, psubs[0], 1, 5)
937	assertPeerLists(t, hosts, psubs[1], 0, 2, 4)
938	assertPeerLists(t, hosts, psubs[2], 1, 3)
939
940	checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs)
941}
942
943// this tests overlay bootstrapping through px in Gossipsub v1.1
944// we start with a star topology and rely on px through prune to build the mesh
945func TestGossipsubStarTopology(t *testing.T) {
946	originalGossipSubD := GossipSubD
947	GossipSubD = 4
948	originalGossipSubDhi := GossipSubDhi
949	GossipSubDhi = GossipSubD + 1
950	originalGossipSubDlo := GossipSubDlo
951	GossipSubDlo = GossipSubD - 1
952	originalGossipSubDscore := GossipSubDscore
953	GossipSubDscore = GossipSubDlo
954	defer func() {
955		GossipSubD = originalGossipSubD
956		GossipSubDhi = originalGossipSubDhi
957		GossipSubDlo = originalGossipSubDlo
958		GossipSubDscore = originalGossipSubDscore
959	}()
960
961	ctx, cancel := context.WithCancel(context.Background())
962	defer cancel()
963
964	hosts := getNetHosts(t, ctx, 20)
965	psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true))
966
967	// configure the center of the star with a very low D
968	psubs[0].eval <- func() {
969		gs := psubs[0].rt.(*GossipSubRouter)
970		gs.D = 0
971		gs.Dlo = 0
972		gs.Dhi = 0
973		gs.Dscore = 0
974	}
975
976	// add all peer addresses to the peerstores
977	// this is necessary because we can't have signed address records witout identify
978	// pushing them
979	for i := range hosts {
980		for j := range hosts {
981			if i == j {
982				continue
983			}
984			hosts[i].Peerstore().AddAddrs(hosts[j].ID(), hosts[j].Addrs(), peerstore.PermanentAddrTTL)
985		}
986	}
987
988	// build the star
989	for i := 1; i < 20; i++ {
990		connect(t, hosts[0], hosts[i])
991	}
992
993	time.Sleep(time.Second)
994
995	// build the mesh
996	var subs []*Subscription
997	for _, ps := range psubs {
998		sub, err := ps.Subscribe("test")
999		if err != nil {
1000			t.Fatal(err)
1001		}
1002		subs = append(subs, sub)
1003	}
1004
1005	// wait a bit for the mesh to build
1006	time.Sleep(10 * time.Second)
1007
1008	// check that all peers have > 1 connection
1009	for i, h := range hosts {
1010		if len(h.Network().Conns()) == 1 {
1011			t.Errorf("peer %d has ony a single connection", i)
1012		}
1013	}
1014
1015	// send a message from each peer and assert it was propagated
1016	for i := 0; i < 20; i++ {
1017		msg := []byte(fmt.Sprintf("message %d", i))
1018		psubs[i].Publish("test", msg)
1019
1020		for _, sub := range subs {
1021			assertReceive(t, sub, msg)
1022		}
1023	}
1024}
1025
1026// this tests overlay bootstrapping through px in Gossipsub v1.1, with addresses
1027// exchanged in signed peer records.
1028// we start with a star topology and rely on px through prune to build the mesh
1029func TestGossipsubStarTopologyWithSignedPeerRecords(t *testing.T) {
1030	originalGossipSubD := GossipSubD
1031	GossipSubD = 4
1032	originalGossipSubDhi := GossipSubDhi
1033	GossipSubDhi = GossipSubD + 1
1034	originalGossipSubDlo := GossipSubDlo
1035	GossipSubDlo = GossipSubD - 1
1036	originalGossipSubDscore := GossipSubDscore
1037	GossipSubDscore = GossipSubDlo
1038	defer func() {
1039		GossipSubD = originalGossipSubD
1040		GossipSubDhi = originalGossipSubDhi
1041		GossipSubDlo = originalGossipSubDlo
1042		GossipSubDscore = originalGossipSubDscore
1043	}()
1044
1045	ctx, cancel := context.WithCancel(context.Background())
1046	defer cancel()
1047
1048	hosts := getNetHosts(t, ctx, 20)
1049	psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true))
1050
1051	// configure the center of the star with a very low D
1052	psubs[0].eval <- func() {
1053		gs := psubs[0].rt.(*GossipSubRouter)
1054		gs.D = 0
1055		gs.Dlo = 0
1056		gs.Dhi = 0
1057		gs.Dscore = 0
1058	}
1059
1060	// manually create signed peer records for each host and add them to the
1061	// peerstore of the center of the star, which is doing the bootstrapping
1062	for i := range hosts[1:] {
1063		privKey := hosts[i].Peerstore().PrivKey(hosts[i].ID())
1064		if privKey == nil {
1065			t.Fatalf("unable to get private key for host %s", hosts[i].ID().Pretty())
1066		}
1067		ai := host.InfoFromHost(hosts[i])
1068		rec := peer.PeerRecordFromAddrInfo(*ai)
1069		signedRec, err := record.Seal(rec, privKey)
1070		if err != nil {
1071			t.Fatalf("error creating signed peer record: %s", err)
1072		}
1073
1074		cab, ok := peerstore.GetCertifiedAddrBook(hosts[0].Peerstore())
1075		if !ok {
1076			t.Fatal("peerstore does not implement CertifiedAddrBook")
1077		}
1078		_, err = cab.ConsumePeerRecord(signedRec, peerstore.PermanentAddrTTL)
1079		if err != nil {
1080			t.Fatalf("error adding signed peer record: %s", err)
1081		}
1082	}
1083
1084	// build the star
1085	for i := 1; i < 20; i++ {
1086		connect(t, hosts[0], hosts[i])
1087	}
1088
1089	time.Sleep(time.Second)
1090
1091	// build the mesh
1092	var subs []*Subscription
1093	for _, ps := range psubs {
1094		sub, err := ps.Subscribe("test")
1095		if err != nil {
1096			t.Fatal(err)
1097		}
1098		subs = append(subs, sub)
1099	}
1100
1101	// wait a bit for the mesh to build
1102	time.Sleep(10 * time.Second)
1103
1104	// check that all peers have > 1 connection
1105	for i, h := range hosts {
1106		if len(h.Network().Conns()) == 1 {
1107			t.Errorf("peer %d has ony a single connection", i)
1108		}
1109	}
1110
1111	// send a message from each peer and assert it was propagated
1112	for i := 0; i < 20; i++ {
1113		msg := []byte(fmt.Sprintf("message %d", i))
1114		psubs[i].Publish("test", msg)
1115
1116		for _, sub := range subs {
1117			assertReceive(t, sub, msg)
1118		}
1119	}
1120}
1121
1122func TestGossipsubDirectPeers(t *testing.T) {
1123	ctx, cancel := context.WithCancel(context.Background())
1124	defer cancel()
1125
1126	h := getNetHosts(t, ctx, 3)
1127	psubs := []*PubSub{
1128		getGossipsub(ctx, h[0], WithDirectConnectTicks(2)),
1129		getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[2].ID(), h[2].Addrs()}}), WithDirectConnectTicks(2)),
1130		getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[1].ID(), h[1].Addrs()}}), WithDirectConnectTicks(2)),
1131	}
1132
1133	connect(t, h[0], h[1])
1134	connect(t, h[0], h[2])
1135
1136	// verify that the direct peers connected
1137	time.Sleep(2 * time.Second)
1138	if len(h[1].Network().ConnsToPeer(h[2].ID())) == 0 {
1139		t.Fatal("expected a connection between direct peers")
1140	}
1141
1142	// build the mesh
1143	var subs []*Subscription
1144	for _, ps := range psubs {
1145		sub, err := ps.Subscribe("test")
1146		if err != nil {
1147			t.Fatal(err)
1148		}
1149		subs = append(subs, sub)
1150	}
1151
1152	time.Sleep(time.Second)
1153
1154	// publish some messages
1155	for i := 0; i < 3; i++ {
1156		msg := []byte(fmt.Sprintf("message %d", i))
1157		psubs[i].Publish("test", msg)
1158
1159		for _, sub := range subs {
1160			assertReceive(t, sub, msg)
1161		}
1162	}
1163
1164	// disconnect the direct peers to test reconnection
1165	for _, c := range h[1].Network().ConnsToPeer(h[2].ID()) {
1166		c.Close()
1167	}
1168
1169	time.Sleep(5 * time.Second)
1170
1171	if len(h[1].Network().ConnsToPeer(h[2].ID())) == 0 {
1172		t.Fatal("expected a connection between direct peers")
1173	}
1174
1175	// publish some messages
1176	for i := 0; i < 3; i++ {
1177		msg := []byte(fmt.Sprintf("message %d", i))
1178		psubs[i].Publish("test", msg)
1179
1180		for _, sub := range subs {
1181			assertReceive(t, sub, msg)
1182		}
1183	}
1184}
1185
1186func TestGossipsubDirectPeersFanout(t *testing.T) {
1187	// regression test for #371
1188	ctx, cancel := context.WithCancel(context.Background())
1189	defer cancel()
1190
1191	h := getNetHosts(t, ctx, 3)
1192	psubs := []*PubSub{
1193		getGossipsub(ctx, h[0]),
1194		getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[2].ID(), h[2].Addrs()}})),
1195		getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[1].ID(), h[1].Addrs()}})),
1196	}
1197
1198	connect(t, h[0], h[1])
1199	connect(t, h[0], h[2])
1200
1201	// Join all peers except h2
1202	var subs []*Subscription
1203	for _, ps := range psubs[:2] {
1204		sub, err := ps.Subscribe("test")
1205		if err != nil {
1206			t.Fatal(err)
1207		}
1208		subs = append(subs, sub)
1209	}
1210
1211	time.Sleep(time.Second)
1212
1213	// h2 publishes some messages to build a fanout
1214	for i := 0; i < 3; i++ {
1215		msg := []byte(fmt.Sprintf("message %d", i))
1216		psubs[2].Publish("test", msg)
1217
1218		for _, sub := range subs {
1219			assertReceive(t, sub, msg)
1220		}
1221	}
1222
1223	// verify that h0 is in the fanout of h2, but not h1 who is a direct peer
1224	result := make(chan bool, 2)
1225	psubs[2].eval <- func() {
1226		rt := psubs[2].rt.(*GossipSubRouter)
1227		fanout := rt.fanout["test"]
1228		_, ok := fanout[h[0].ID()]
1229		result <- ok
1230		_, ok = fanout[h[1].ID()]
1231		result <- ok
1232	}
1233
1234	inFanout := <-result
1235	if !inFanout {
1236		t.Fatal("expected peer 0 to be in fanout")
1237	}
1238
1239	inFanout = <-result
1240	if inFanout {
1241		t.Fatal("expected peer 1 to not be in fanout")
1242	}
1243
1244	// now subscribe h2 too and verify tht h0 is in the mesh but not h1
1245	_, err := psubs[2].Subscribe("test")
1246	if err != nil {
1247		t.Fatal(err)
1248	}
1249
1250	time.Sleep(2 * time.Second)
1251
1252	psubs[2].eval <- func() {
1253		rt := psubs[2].rt.(*GossipSubRouter)
1254		mesh := rt.mesh["test"]
1255		_, ok := mesh[h[0].ID()]
1256		result <- ok
1257		_, ok = mesh[h[1].ID()]
1258		result <- ok
1259	}
1260
1261	inMesh := <-result
1262	if !inMesh {
1263		t.Fatal("expected peer 0 to be in mesh")
1264	}
1265
1266	inMesh = <-result
1267	if inMesh {
1268		t.Fatal("expected peer 1 to not be in mesh")
1269	}
1270}
1271
1272func TestGossipsubFloodPublish(t *testing.T) {
1273	// uses a star topology without PX and publishes from the star to verify that all
1274	// messages get received
1275	ctx, cancel := context.WithCancel(context.Background())
1276	defer cancel()
1277
1278	hosts := getNetHosts(t, ctx, 20)
1279	psubs := getGossipsubs(ctx, hosts, WithFloodPublish(true))
1280
1281	// build the star
1282	for i := 1; i < 20; i++ {
1283		connect(t, hosts[0], hosts[i])
1284	}
1285
1286	// build the (partial, unstable) mesh
1287	var subs []*Subscription
1288	for _, ps := range psubs {
1289		sub, err := ps.Subscribe("test")
1290		if err != nil {
1291			t.Fatal(err)
1292		}
1293		subs = append(subs, sub)
1294	}
1295
1296	time.Sleep(time.Second)
1297
1298	// send a message from the star and assert it was received
1299	for i := 0; i < 20; i++ {
1300		msg := []byte(fmt.Sprintf("message %d", i))
1301		psubs[0].Publish("test", msg)
1302
1303		for _, sub := range subs {
1304			assertReceive(t, sub, msg)
1305		}
1306	}
1307}
1308
1309func TestGossipsubEnoughPeers(t *testing.T) {
1310	ctx, cancel := context.WithCancel(context.Background())
1311	defer cancel()
1312
1313	hosts := getNetHosts(t, ctx, 20)
1314	psubs := getGossipsubs(ctx, hosts)
1315
1316	var subs []*Subscription
1317	for _, ps := range psubs {
1318		sub, err := ps.Subscribe("test")
1319		if err != nil {
1320			t.Fatal(err)
1321		}
1322		subs = append(subs, sub)
1323	}
1324
1325	// at this point we have no connections and no mesh, so EnoughPeers should return false
1326	res := make(chan bool, 1)
1327	psubs[0].eval <- func() {
1328		res <- psubs[0].rt.EnoughPeers("test", 0)
1329	}
1330	enough := <-res
1331	if enough {
1332		t.Fatal("should not have enough peers")
1333	}
1334
1335	// connect them densly to build up the mesh
1336	denseConnect(t, hosts)
1337
1338	time.Sleep(3 * time.Second)
1339
1340	psubs[0].eval <- func() {
1341		res <- psubs[0].rt.EnoughPeers("test", 0)
1342	}
1343	enough = <-res
1344	if !enough {
1345		t.Fatal("should have enough peers")
1346	}
1347}
1348
1349func TestGossipsubNegativeScore(t *testing.T) {
1350	// in this test we score sinkhole a peer to exercise code paths relative to negative scores
1351	ctx, cancel := context.WithCancel(context.Background())
1352	defer cancel()
1353
1354	hosts := getNetHosts(t, ctx, 20)
1355	psubs := getGossipsubs(ctx, hosts,
1356		WithPeerScore(
1357			&PeerScoreParams{
1358				AppSpecificScore: func(p peer.ID) float64 {
1359					if p == hosts[0].ID() {
1360						return -1000
1361					} else {
1362						return 0
1363					}
1364				},
1365				AppSpecificWeight: 1,
1366				DecayInterval:     time.Second,
1367				DecayToZero:       0.01,
1368			},
1369			&PeerScoreThresholds{
1370				GossipThreshold:   -10,
1371				PublishThreshold:  -100,
1372				GraylistThreshold: -10000,
1373			}))
1374
1375	denseConnect(t, hosts)
1376
1377	var subs []*Subscription
1378	for _, ps := range psubs {
1379		sub, err := ps.Subscribe("test")
1380		if err != nil {
1381			t.Fatal(err)
1382		}
1383		subs = append(subs, sub)
1384	}
1385
1386	time.Sleep(3 * time.Second)
1387
1388	for i := 0; i < 20; i++ {
1389		msg := []byte(fmt.Sprintf("message %d", i))
1390		psubs[i%20].Publish("test", msg)
1391		time.Sleep(20 * time.Millisecond)
1392	}
1393
1394	// let the sinkholed peer try to emit gossip as well
1395	time.Sleep(2 * time.Second)
1396
1397	// checks:
1398	// 1. peer 0 should only receive its own message
1399	// 2. peers 1-20 should not receive a message from peer 0, because it's not part of the mesh
1400	//    and its gossip is rejected
1401	collectAll := func(sub *Subscription) []*Message {
1402		var res []*Message
1403		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
1404		defer cancel()
1405
1406		for {
1407			msg, err := sub.Next(ctx)
1408			if err != nil {
1409				break
1410			}
1411
1412			res = append(res, msg)
1413		}
1414
1415		return res
1416	}
1417
1418	count := len(collectAll(subs[0]))
1419	if count != 1 {
1420		t.Fatalf("expected 1 message but got %d instead", count)
1421	}
1422
1423	for _, sub := range subs[1:] {
1424		all := collectAll(sub)
1425		for _, m := range all {
1426			if m.ReceivedFrom == hosts[0].ID() {
1427				t.Fatal("received message from sinkholed peer")
1428			}
1429		}
1430	}
1431}
1432
1433func TestGossipsubScoreValidatorEx(t *testing.T) {
1434	// this is a test that of the two message drop responses from a validator
1435	ctx, cancel := context.WithCancel(context.Background())
1436	defer cancel()
1437
1438	hosts := getNetHosts(t, ctx, 3)
1439	psubs := getGossipsubs(ctx, hosts,
1440		WithPeerScore(
1441			&PeerScoreParams{
1442				AppSpecificScore: func(p peer.ID) float64 { return 0 },
1443				DecayInterval:    time.Second,
1444				DecayToZero:      0.01,
1445				Topics: map[string]*TopicScoreParams{
1446					"test": &TopicScoreParams{
1447						TopicWeight:                    1,
1448						TimeInMeshQuantum:              time.Second,
1449						InvalidMessageDeliveriesWeight: -1,
1450						InvalidMessageDeliveriesDecay:  0.9999,
1451					},
1452				},
1453			},
1454			&PeerScoreThresholds{
1455				GossipThreshold:   -10,
1456				PublishThreshold:  -100,
1457				GraylistThreshold: -10000,
1458			}))
1459
1460	connectAll(t, hosts)
1461
1462	err := psubs[0].RegisterTopicValidator("test", func(ctx context.Context, p peer.ID, msg *Message) ValidationResult {
1463		// we ignore host1 and reject host2
1464		if p == hosts[1].ID() {
1465			return ValidationIgnore
1466		}
1467		if p == hosts[2].ID() {
1468			return ValidationReject
1469		}
1470
1471		return ValidationAccept
1472	})
1473	if err != nil {
1474		t.Fatal(err)
1475	}
1476
1477	sub, err := psubs[0].Subscribe("test")
1478	if err != nil {
1479		t.Fatal(err)
1480	}
1481
1482	time.Sleep(100 * time.Millisecond)
1483
1484	expectNoMessage := func(sub *Subscription) {
1485		ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
1486		defer cancel()
1487
1488		m, err := sub.Next(ctx)
1489		if err == nil {
1490			t.Fatal("expected no message, but got ", string(m.Data))
1491		}
1492	}
1493
1494	psubs[1].Publish("test", []byte("i am not a walrus"))
1495	psubs[2].Publish("test", []byte("i am not a walrus either"))
1496
1497	// assert no messages
1498	expectNoMessage(sub)
1499
1500	// assert that peer1's score is still 0 (its message was ignored) while peer2 should have
1501	// a negative score (its message got rejected)
1502	res := make(chan float64, 1)
1503	psubs[0].eval <- func() {
1504		res <- psubs[0].rt.(*GossipSubRouter).score.Score(hosts[1].ID())
1505	}
1506	score := <-res
1507	if score != 0 {
1508		t.Fatalf("expected 0 score for peer1, but got %f", score)
1509	}
1510
1511	psubs[0].eval <- func() {
1512		res <- psubs[0].rt.(*GossipSubRouter).score.Score(hosts[2].ID())
1513	}
1514	score = <-res
1515	if score >= 0 {
1516		t.Fatalf("expected negative score for peer2, but got %f", score)
1517	}
1518}
1519
1520func TestGossipsubPiggybackControl(t *testing.T) {
1521	// this is a direct test of the piggybackControl function as we can't reliably
1522	// trigger it on travis
1523	ctx, cancel := context.WithCancel(context.Background())
1524	defer cancel()
1525
1526	h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
1527	ps := getGossipsub(ctx, h)
1528
1529	blah := peer.ID("bogotr0n")
1530
1531	res := make(chan *RPC, 1)
1532	ps.eval <- func() {
1533		gs := ps.rt.(*GossipSubRouter)
1534		test1 := "test1"
1535		test2 := "test2"
1536		test3 := "test3"
1537		gs.mesh[test1] = make(map[peer.ID]struct{})
1538		gs.mesh[test2] = make(map[peer.ID]struct{})
1539		gs.mesh[test1][blah] = struct{}{}
1540
1541		rpc := &RPC{RPC: pb.RPC{}}
1542		gs.piggybackControl(blah, rpc, &pb.ControlMessage{
1543			Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: &test1}, &pb.ControlGraft{TopicID: &test2}, &pb.ControlGraft{TopicID: &test3}},
1544			Prune: []*pb.ControlPrune{&pb.ControlPrune{TopicID: &test1}, &pb.ControlPrune{TopicID: &test2}, &pb.ControlPrune{TopicID: &test3}},
1545		})
1546		res <- rpc
1547	}
1548
1549	rpc := <-res
1550	if rpc.Control == nil {
1551		t.Fatal("expected non-nil control message")
1552	}
1553	if len(rpc.Control.Graft) != 1 {
1554		t.Fatal("expected 1 GRAFT")
1555	}
1556	if rpc.Control.Graft[0].GetTopicID() != "test1" {
1557		t.Fatal("expected test1 as graft topic ID")
1558	}
1559	if len(rpc.Control.Prune) != 2 {
1560		t.Fatal("expected 2 PRUNEs")
1561	}
1562	if rpc.Control.Prune[0].GetTopicID() != "test2" {
1563		t.Fatal("expected test2 as prune topic ID")
1564	}
1565	if rpc.Control.Prune[1].GetTopicID() != "test3" {
1566		t.Fatal("expected test3 as prune topic ID")
1567	}
1568}
1569
1570func TestGossipsubMultipleGraftTopics(t *testing.T) {
1571	ctx, cancel := context.WithCancel(context.Background())
1572	defer cancel()
1573
1574	hosts := getNetHosts(t, ctx, 2)
1575	psubs := getGossipsubs(ctx, hosts)
1576	sparseConnect(t, hosts)
1577
1578	time.Sleep(time.Second * 1)
1579
1580	firstTopic := "topic1"
1581	secondTopic := "topic2"
1582	thirdTopic := "topic3"
1583
1584	firstPeer := hosts[0].ID()
1585	secondPeer := hosts[1].ID()
1586
1587	p2Sub := psubs[1]
1588	p1Router := psubs[0].rt.(*GossipSubRouter)
1589	p2Router := psubs[1].rt.(*GossipSubRouter)
1590
1591	finChan := make(chan struct{})
1592
1593	p2Sub.eval <- func() {
1594		// Add topics to second peer
1595		p2Router.mesh[firstTopic] = map[peer.ID]struct{}{}
1596		p2Router.mesh[secondTopic] = map[peer.ID]struct{}{}
1597		p2Router.mesh[thirdTopic] = map[peer.ID]struct{}{}
1598
1599		finChan <- struct{}{}
1600	}
1601	<-finChan
1602
1603	// Send multiple GRAFT messages to second peer from
1604	// 1st peer
1605	p1Router.sendGraftPrune(map[peer.ID][]string{
1606		secondPeer: []string{firstTopic, secondTopic, thirdTopic},
1607	}, map[peer.ID][]string{}, map[peer.ID]bool{})
1608
1609	time.Sleep(time.Second * 1)
1610
1611	p2Sub.eval <- func() {
1612		if _, ok := p2Router.mesh[firstTopic][firstPeer]; !ok {
1613			t.Errorf("First peer wasnt added to mesh of the second peer for the topic %s", firstTopic)
1614		}
1615		if _, ok := p2Router.mesh[secondTopic][firstPeer]; !ok {
1616			t.Errorf("First peer wasnt added to mesh of the second peer for the topic %s", secondTopic)
1617		}
1618		if _, ok := p2Router.mesh[thirdTopic][firstPeer]; !ok {
1619			t.Errorf("First peer wasnt added to mesh of the second peer for the topic %s", thirdTopic)
1620		}
1621		finChan <- struct{}{}
1622	}
1623	<-finChan
1624}
1625
1626func TestGossipsubOpportunisticGrafting(t *testing.T) {
1627	originalGossipSubPruneBackoff := GossipSubPruneBackoff
1628	GossipSubPruneBackoff = 500 * time.Millisecond
1629	originalGossipSubGraftFloodThreshold := GossipSubGraftFloodThreshold
1630	GossipSubGraftFloodThreshold = 100 * time.Millisecond
1631	originalGossipSubOpportunisticGraftTicks := GossipSubOpportunisticGraftTicks
1632	GossipSubOpportunisticGraftTicks = 2
1633	defer func() {
1634		GossipSubPruneBackoff = originalGossipSubPruneBackoff
1635		GossipSubGraftFloodThreshold = originalGossipSubGraftFloodThreshold
1636		GossipSubOpportunisticGraftTicks = originalGossipSubOpportunisticGraftTicks
1637	}()
1638
1639	ctx, cancel := context.WithCancel(context.Background())
1640	defer cancel()
1641
1642	hosts := getNetHosts(t, ctx, 50)
1643	// pubsubs for the first 10 hosts
1644	psubs := getGossipsubs(ctx, hosts[:10],
1645		WithFloodPublish(true),
1646		WithPeerScore(
1647			&PeerScoreParams{
1648				AppSpecificScore:  func(peer.ID) float64 { return 0 },
1649				AppSpecificWeight: 0,
1650				DecayInterval:     time.Second,
1651				DecayToZero:       0.01,
1652				Topics: map[string]*TopicScoreParams{
1653					"test": &TopicScoreParams{
1654						TopicWeight:                   1,
1655						TimeInMeshWeight:              0.0002777,
1656						TimeInMeshQuantum:             time.Second,
1657						TimeInMeshCap:                 3600,
1658						FirstMessageDeliveriesWeight:  1,
1659						FirstMessageDeliveriesDecay:   0.9997,
1660						FirstMessageDeliveriesCap:     100,
1661						InvalidMessageDeliveriesDecay: 0.99997,
1662					},
1663				},
1664			},
1665			&PeerScoreThresholds{
1666				GossipThreshold:             -10,
1667				PublishThreshold:            -100,
1668				GraylistThreshold:           -10000,
1669				OpportunisticGraftThreshold: 1,
1670			}))
1671
1672	// connect the real hosts with degree 5
1673	connectSome(t, hosts[:10], 5)
1674
1675	// sybil squatters for the remaining 40 hosts
1676	squatters := make([]*sybilSquatter, 0, 40)
1677	for _, h := range hosts[10:] {
1678		squatter := &sybilSquatter{h: h}
1679		h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
1680		squatters = append(squatters, squatter)
1681	}
1682
1683	// connect all squatters to every real host
1684	for _, squatter := range hosts[10:] {
1685		for _, real := range hosts[:10] {
1686			connect(t, squatter, real)
1687		}
1688	}
1689
1690	// wait a bit for the connections to propagate events to the pubsubs
1691	time.Sleep(time.Second)
1692
1693	// ask the real pubsus to join the topic
1694	for _, ps := range psubs {
1695		sub, err := ps.Subscribe("test")
1696		if err != nil {
1697			t.Fatal(err)
1698		}
1699		// consume the messages
1700		go func(sub *Subscription) {
1701			for {
1702				_, err := sub.Next(ctx)
1703				if err != nil {
1704					return
1705				}
1706			}
1707		}(sub)
1708	}
1709
1710	// publish a bunch of messages from the real hosts
1711	for i := 0; i < 1000; i++ {
1712		msg := []byte(fmt.Sprintf("message %d", i))
1713		psubs[i%10].Publish("test", msg)
1714		time.Sleep(20 * time.Millisecond)
1715	}
1716
1717	// now wait a few of oppgraft cycles
1718	time.Sleep(7 * time.Second)
1719
1720	// check the honest peer meshes, they should have at least 3 honest peers each
1721	res := make(chan int, 1)
1722	for _, ps := range psubs {
1723		ps.eval <- func() {
1724			gs := ps.rt.(*GossipSubRouter)
1725			count := 0
1726			for _, h := range hosts[:10] {
1727				_, ok := gs.mesh["test"][h.ID()]
1728				if ok {
1729					count++
1730				}
1731			}
1732			res <- count
1733		}
1734
1735		count := <-res
1736		if count < 3 {
1737			t.Fatalf("expected at least 3 honest peers, got %d", count)
1738		}
1739	}
1740}
1741
1742type sybilSquatter struct {
1743	h host.Host
1744}
1745
1746func (sq *sybilSquatter) handleStream(s network.Stream) {
1747	defer s.Close()
1748
1749	os, err := sq.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10)
1750	if err != nil {
1751		panic(err)
1752	}
1753
1754	// send a subscription for test in the output stream to become candidate for GRAFT
1755	// and then just read and ignore the incoming RPCs
1756	r := protoio.NewDelimitedReader(s, 1<<20)
1757	w := protoio.NewDelimitedWriter(os)
1758	truth := true
1759	topic := "test"
1760	err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}})
1761	if err != nil {
1762		panic(err)
1763	}
1764
1765	var rpc pb.RPC
1766	for {
1767		rpc.Reset()
1768		err = r.ReadMsg(&rpc)
1769		if err != nil {
1770			if err != io.EOF {
1771				s.Reset()
1772			}
1773			return
1774		}
1775	}
1776}
1777
1778func TestGossipsubPeerScoreInspect(t *testing.T) {
1779	// this test exercises the code path sof peer score inspection
1780	ctx, cancel := context.WithCancel(context.Background())
1781	defer cancel()
1782
1783	hosts := getNetHosts(t, ctx, 2)
1784
1785	inspector := &mockPeerScoreInspector{}
1786	psub1 := getGossipsub(ctx, hosts[0],
1787		WithPeerScore(
1788			&PeerScoreParams{
1789				Topics: map[string]*TopicScoreParams{
1790					"test": &TopicScoreParams{
1791						TopicWeight:                    1,
1792						TimeInMeshQuantum:              time.Second,
1793						FirstMessageDeliveriesWeight:   1,
1794						FirstMessageDeliveriesDecay:    0.999,
1795						FirstMessageDeliveriesCap:      100,
1796						InvalidMessageDeliveriesWeight: -1,
1797						InvalidMessageDeliveriesDecay:  0.9999,
1798					},
1799				},
1800				AppSpecificScore: func(peer.ID) float64 { return 0 },
1801				DecayInterval:    time.Second,
1802				DecayToZero:      0.01,
1803			},
1804			&PeerScoreThresholds{
1805				GossipThreshold:   -1,
1806				PublishThreshold:  -10,
1807				GraylistThreshold: -1000,
1808			}),
1809		WithPeerScoreInspect(inspector.inspect, time.Second))
1810	psub2 := getGossipsub(ctx, hosts[1])
1811	psubs := []*PubSub{psub1, psub2}
1812
1813	connect(t, hosts[0], hosts[1])
1814
1815	var subs []*Subscription
1816	for _, ps := range psubs {
1817		sub, err := ps.Subscribe("test")
1818		if err != nil {
1819			t.Fatal(err)
1820		}
1821		subs = append(subs, sub)
1822	}
1823
1824	time.Sleep(time.Second)
1825
1826	for i := 0; i < 20; i++ {
1827		msg := []byte(fmt.Sprintf("message %d", i))
1828		psubs[i%2].Publish("test", msg)
1829		time.Sleep(20 * time.Millisecond)
1830	}
1831
1832	time.Sleep(time.Second + 200*time.Millisecond)
1833
1834	score2 := inspector.score(hosts[1].ID())
1835	if score2 < 9 {
1836		t.Fatalf("expected score to be at least 9, instead got %f", score2)
1837	}
1838}
1839
1840func TestGossipsubPeerScoreResetTopicParams(t *testing.T) {
1841	// this test exercises the code path sof peer score inspection
1842	ctx, cancel := context.WithCancel(context.Background())
1843	defer cancel()
1844
1845	hosts := getNetHosts(t, ctx, 1)
1846
1847	ps := getGossipsub(ctx, hosts[0],
1848		WithPeerScore(
1849			&PeerScoreParams{
1850				Topics: map[string]*TopicScoreParams{
1851					"test": &TopicScoreParams{
1852						TopicWeight:                    1,
1853						TimeInMeshQuantum:              time.Second,
1854						FirstMessageDeliveriesWeight:   1,
1855						FirstMessageDeliveriesDecay:    0.999,
1856						FirstMessageDeliveriesCap:      100,
1857						InvalidMessageDeliveriesWeight: -1,
1858						InvalidMessageDeliveriesDecay:  0.9999,
1859					},
1860				},
1861				AppSpecificScore: func(peer.ID) float64 { return 0 },
1862				DecayInterval:    time.Second,
1863				DecayToZero:      0.01,
1864			},
1865			&PeerScoreThresholds{
1866				GossipThreshold:   -1,
1867				PublishThreshold:  -10,
1868				GraylistThreshold: -1000,
1869			}))
1870
1871	topic, err := ps.Join("test")
1872	if err != nil {
1873		t.Fatal(err)
1874	}
1875
1876	err = topic.SetScoreParams(
1877		&TopicScoreParams{
1878			TopicWeight:                    1,
1879			TimeInMeshQuantum:              time.Second,
1880			FirstMessageDeliveriesWeight:   1,
1881			FirstMessageDeliveriesDecay:    0.999,
1882			FirstMessageDeliveriesCap:      200,
1883			InvalidMessageDeliveriesWeight: -1,
1884			InvalidMessageDeliveriesDecay:  0.9999,
1885		})
1886	if err != nil {
1887		t.Fatal(err)
1888	}
1889}
1890
1891type mockPeerScoreInspector struct {
1892	mx     sync.Mutex
1893	scores map[peer.ID]float64
1894}
1895
1896func (ps *mockPeerScoreInspector) inspect(scores map[peer.ID]float64) {
1897	ps.mx.Lock()
1898	defer ps.mx.Unlock()
1899	ps.scores = scores
1900}
1901
1902func (ps *mockPeerScoreInspector) score(p peer.ID) float64 {
1903	ps.mx.Lock()
1904	defer ps.mx.Unlock()
1905	return ps.scores[p]
1906}
1907
1908func TestGossipsubRPCFragmentation(t *testing.T) {
1909	ctx, cancel := context.WithCancel(context.Background())
1910	defer cancel()
1911
1912	hosts := getNetHosts(t, ctx, 2)
1913	ps := getGossipsub(ctx, hosts[0])
1914
1915	// make a fake peer that requests everything through IWANT gossip
1916	iwe := iwantEverything{h: hosts[1]}
1917	iwe.h.SetStreamHandler(GossipSubID_v10, iwe.handleStream)
1918
1919	connect(t, hosts[0], hosts[1])
1920
1921	// have the real pubsub join the test topic
1922	_, err := ps.Subscribe("test")
1923	if err != nil {
1924		t.Fatal(err)
1925	}
1926
1927	// wait for the real pubsub to connect and try to graft to the faker
1928	time.Sleep(time.Second)
1929
1930	// publish a bunch of fairly large messages from the real host
1931	nMessages := 1000
1932	msgSize := 20000
1933	for i := 0; i < nMessages; i++ {
1934		msg := make([]byte, msgSize)
1935		rand.Read(msg)
1936		ps.Publish("test", msg)
1937		time.Sleep(20 * time.Millisecond)
1938	}
1939
1940	// wait a bit for them to be received via gossip by the fake peer
1941	time.Sleep(5 * time.Second)
1942	iwe.lk.Lock()
1943	defer iwe.lk.Unlock()
1944
1945	// we should have received all the messages
1946	if iwe.msgsReceived != nMessages {
1947		t.Fatalf("expected fake gossipsub peer to receive all messages, got %d / %d", iwe.msgsReceived, nMessages)
1948	}
1949
1950	// and we should have seen an IHAVE message for each of them
1951	if iwe.ihavesReceived != nMessages {
1952		t.Fatalf("expected to get IHAVEs for every message, got %d / %d", iwe.ihavesReceived, nMessages)
1953	}
1954
1955	// If everything were fragmented with maximum efficiency, we would expect to get
1956	// (nMessages * msgSize) / ps.maxMessageSize total RPCs containing the messages we sent IWANTs for.
1957	// The actual number will probably be larger, since there's some overhead for the RPC itself, and
1958	// we probably aren't packing each RPC to it's maximum size
1959	minExpectedRPCS := (nMessages * msgSize) / ps.maxMessageSize
1960	if iwe.rpcsWithMessages < minExpectedRPCS {
1961		t.Fatalf("expected to receive at least %d RPCs containing messages, got %d", minExpectedRPCS, iwe.rpcsWithMessages)
1962	}
1963}
1964
1965// iwantEverything is a simple gossipsub client that never grafts onto a mesh,
1966// instead requesting everything through IWANT gossip messages. It is used to
1967// test that large responses to IWANT requests are fragmented into multiple RPCs.
1968type iwantEverything struct {
1969	h                host.Host
1970	lk               sync.Mutex
1971	rpcsWithMessages int
1972	msgsReceived     int
1973	ihavesReceived   int
1974}
1975
1976func (iwe *iwantEverything) handleStream(s network.Stream) {
1977	defer s.Close()
1978
1979	os, err := iwe.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10)
1980	if err != nil {
1981		panic(err)
1982	}
1983
1984	msgIdsReceived := make(map[string]struct{})
1985	gossipMsgIdsReceived := make(map[string]struct{})
1986
1987	// send a subscription for test in the output stream to become candidate for gossip
1988	r := protoio.NewDelimitedReader(s, 1<<20)
1989	w := protoio.NewDelimitedWriter(os)
1990	truth := true
1991	topic := "test"
1992	err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}})
1993
1994	var rpc pb.RPC
1995	for {
1996		rpc.Reset()
1997		err = r.ReadMsg(&rpc)
1998		if err != nil {
1999			if err != io.EOF {
2000				s.Reset()
2001			}
2002			return
2003		}
2004
2005		iwe.lk.Lock()
2006		if len(rpc.Publish) != 0 {
2007			iwe.rpcsWithMessages++
2008		}
2009		// keep track of unique message ids received
2010		for _, msg := range rpc.Publish {
2011			id := string(msg.Seqno)
2012			if _, seen := msgIdsReceived[id]; !seen {
2013				iwe.msgsReceived++
2014			}
2015			msgIdsReceived[id] = struct{}{}
2016		}
2017
2018		if rpc.Control != nil {
2019			// send a PRUNE for all grafts, so we don't get direct message deliveries
2020			var prunes []*pb.ControlPrune
2021			for _, graft := range rpc.Control.Graft {
2022				prunes = append(prunes, &pb.ControlPrune{TopicID: graft.TopicID})
2023			}
2024
2025			var iwants []*pb.ControlIWant
2026			for _, ihave := range rpc.Control.Ihave {
2027				iwants = append(iwants, &pb.ControlIWant{MessageIDs: ihave.MessageIDs})
2028				for _, msgId := range ihave.MessageIDs {
2029					if _, seen := gossipMsgIdsReceived[msgId]; !seen {
2030						iwe.ihavesReceived++
2031					}
2032					gossipMsgIdsReceived[msgId] = struct{}{}
2033				}
2034			}
2035
2036			out := rpcWithControl(nil, nil, iwants, nil, prunes)
2037			err = w.WriteMsg(out)
2038			if err != nil {
2039				panic(err)
2040			}
2041		}
2042		iwe.lk.Unlock()
2043	}
2044}
2045
2046func TestFragmentRPCFunction(t *testing.T) {
2047	p := peer.ID("some-peer")
2048	topic := "test"
2049	rpc := &RPC{from: p}
2050	limit := 1024
2051
2052	mkMsg := func(size int) *pb.Message {
2053		msg := &pb.Message{}
2054		msg.Data = make([]byte, size-4) // subtract the protobuf overhead, so msg.Size() returns requested size
2055		rand.Read(msg.Data)
2056		return msg
2057	}
2058
2059	ensureBelowLimit := func(rpcs []*RPC) {
2060		for _, r := range rpcs {
2061			if r.Size() > limit {
2062				t.Fatalf("expected fragmented RPC to be below %d bytes, was %d", limit, r.Size())
2063			}
2064		}
2065	}
2066
2067	// it should not fragment if everything fits in one RPC
2068	rpc.Publish = []*pb.Message{}
2069	rpc.Publish = []*pb.Message{mkMsg(10), mkMsg(10)}
2070	results, err := fragmentRPC(rpc, limit)
2071	if err != nil {
2072		t.Fatal(err)
2073	}
2074	if len(results) != 1 {
2075		t.Fatalf("expected single RPC if input is < limit, got %d", len(results))
2076	}
2077
2078	// if there's a message larger than the limit, we should fail
2079	rpc.Publish = []*pb.Message{mkMsg(10), mkMsg(limit * 2)}
2080	results, err = fragmentRPC(rpc, limit)
2081	if err == nil {
2082		t.Fatalf("expected an error if a message exceeds limit, got %d RPCs instead", len(results))
2083	}
2084
2085	// if the individual messages are below the limit, but the RPC as a whole is larger, we should fragment
2086	nMessages := 100
2087	msgSize := 200
2088	truth := true
2089	rpc.Subscriptions = []*pb.RPC_SubOpts{
2090		{
2091			Subscribe: &truth,
2092			Topicid:   &topic,
2093		},
2094	}
2095	rpc.Publish = make([]*pb.Message, nMessages)
2096	for i := 0; i < nMessages; i++ {
2097		rpc.Publish[i] = mkMsg(msgSize)
2098	}
2099	results, err = fragmentRPC(rpc, limit)
2100	if err != nil {
2101		t.Fatal(err)
2102	}
2103	ensureBelowLimit(results)
2104	msgsPerRPC := limit / msgSize
2105	expectedRPCs := nMessages / msgsPerRPC
2106	if len(results) != expectedRPCs {
2107		t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results))
2108	}
2109	var nMessagesFragmented int
2110	var nSubscriptions int
2111	for _, r := range results {
2112		nMessagesFragmented += len(r.Publish)
2113		nSubscriptions += len(r.Subscriptions)
2114	}
2115	if nMessagesFragmented != nMessages {
2116		t.Fatalf("expected fragemented RPCs to contain same number of messages as input, got %d / %d", nMessagesFragmented, nMessages)
2117	}
2118	if nSubscriptions != 1 {
2119		t.Fatal("expected subscription to be present in one of the fragmented messages, but not found")
2120	}
2121
2122	// if we're fragmenting, and the input RPC has control messages,
2123	// the control messages should be in a separate RPC at the end
2124	// reuse RPC from prev test, but add a control message
2125	rpc.Control = &pb.ControlMessage{
2126		Graft: []*pb.ControlGraft{{TopicID: &topic}},
2127		Prune: []*pb.ControlPrune{{TopicID: &topic}},
2128		Ihave: []*pb.ControlIHave{{MessageIDs: []string{"foo"}}},
2129		Iwant: []*pb.ControlIWant{{MessageIDs: []string{"bar"}}},
2130	}
2131	results, err = fragmentRPC(rpc, limit)
2132	if err != nil {
2133		t.Fatal(err)
2134	}
2135	ensureBelowLimit(results)
2136	// we expect one more RPC than last time, with the final one containing the control messages
2137	expectedCtrl := 1
2138	expectedRPCs = (nMessages / msgsPerRPC) + expectedCtrl
2139	if len(results) != expectedRPCs {
2140		t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results))
2141	}
2142	ctl := results[len(results)-1].Control
2143	if ctl == nil {
2144		t.Fatal("expected final fragmented RPC to contain control messages, but .Control was nil")
2145	}
2146	// since it was not altered, the original control message should be identical to the output control message
2147	originalBytes, err := rpc.Control.Marshal()
2148	if err != nil {
2149		t.Fatal(err)
2150	}
2151	receivedBytes, err := ctl.Marshal()
2152	if err != nil {
2153		t.Fatal(err)
2154	}
2155	if !bytes.Equal(originalBytes, receivedBytes) {
2156		t.Fatal("expected control message to be unaltered if it fits within one RPC message")
2157	}
2158
2159	// if the control message is too large to fit into a single RPC, it should be split into multiple RPCs
2160	nTopics := 5 // pretend we're subscribed to multiple topics and sending IHAVE / IWANTs for each
2161	messageIdSize := 32
2162	msgsPerTopic := 100 // enough that a single IHAVE or IWANT will exceed the limit
2163	rpc.Control.Ihave = make([]*pb.ControlIHave, nTopics)
2164	rpc.Control.Iwant = make([]*pb.ControlIWant, nTopics)
2165	for i := 0; i < nTopics; i++ {
2166		messageIds := make([]string, msgsPerTopic)
2167		for m := 0; m < msgsPerTopic; m++ {
2168			mid := make([]byte, messageIdSize)
2169			rand.Read(mid)
2170			messageIds[m] = string(mid)
2171		}
2172		rpc.Control.Ihave[i] = &pb.ControlIHave{MessageIDs: messageIds}
2173		rpc.Control.Iwant[i] = &pb.ControlIWant{MessageIDs: messageIds}
2174	}
2175	results, err = fragmentRPC(rpc, limit)
2176	if err != nil {
2177		t.Fatal(err)
2178	}
2179	ensureBelowLimit(results)
2180	minExpectedCtl := rpc.Control.Size() / limit
2181	minExpectedRPCs := (nMessages / msgsPerRPC) + minExpectedCtl
2182	if len(results) < minExpectedRPCs {
2183		t.Fatalf("expected at least %d total RPCs (at least %d with control messages), got %d total", expectedRPCs, expectedCtrl, len(results))
2184	}
2185
2186	// Test the pathological case where a single gossip message ID exceeds the limit.
2187	// It should not be present in the fragmented messages, but smaller IDs should be
2188	rpc.Reset()
2189	giantIdBytes := make([]byte, limit*2)
2190	rand.Read(giantIdBytes)
2191	rpc.Control = &pb.ControlMessage{
2192		Iwant: []*pb.ControlIWant{
2193			{MessageIDs: []string{"hello", string(giantIdBytes)}},
2194		},
2195	}
2196	results, err = fragmentRPC(rpc, limit)
2197	if err != nil {
2198		t.Fatal(err)
2199	}
2200	if len(results) != 1 {
2201		t.Fatalf("expected 1 RPC, got %d", len(results))
2202	}
2203	if len(results[0].Control.Iwant) != 1 {
2204		t.Fatalf("expected 1 IWANT, got %d", len(results[0].Control.Iwant))
2205	}
2206	if results[0].Control.Iwant[0].MessageIDs[0] != "hello" {
2207		t.Fatalf("expected small message ID to be included unaltered, got %s instead",
2208			results[0].Control.Iwant[0].MessageIDs[0])
2209	}
2210}
2211